-
-
Save timperrett/1193049 to your computer and use it in GitHub Desktop.
Scatter-Gather with Akka Dataflow
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Scatter-gather aggregator.
 *
 * Every incoming `Message` is sent to all `recipients`; the individual
 * answers are collected through one promise per recipient and concatenated
 * into a single string. The reply handed back to the sender is the result
 * promise itself, not its value.
 *
 * Answers are prepended while gathering, so the final string lists them in
 * reverse recipient order.
 *
 * @param recipients actors that each receive a copy of every message
 */
class Aggregator(recipients: Iterable[ActorRef]) extends Actor {
  def receive = {
    case msg @ Message(text) =>
      println("Started processing message `%s`" format(text))

      // Bound to the concatenated answers once gathering has finished.
      val result = Promise[String]()
      // One promise per recipient; each is bound to that recipient's answer.
      val promises = List.fill(recipients.size)(Promise[String]())

      // Scatter. `foreach` rather than `map`: only the side effects matter,
      // and `map` on a lazy Iterable (Stream, view) would not be fully forced,
      // leaving some recipients without the message and the gather stuck.
      recipients.zip(promises).foreach { case (recipient, promise) =>
        (recipient !!! msg).map { response: String =>
          println("Binding recipient's response: %s" format(response))
          flow {
            promise << response
          }
        }
      }

      // Gather. Reads the promises front to back inside a dataflow block and
      // prepends each value to the accumulator.
      flow {
        def gather(pending: List[CompletableFuture[String]], acc: String = ""): String @cps[Future[Any]] =
          pending match {
            case head :: tail => gather(tail, head() + acc)
            case Nil => acc
          }
        println("Binding result...")
        result << gather(promises)
      }

      // Reply with the promise itself; the sender reads the value from it.
      self.reply(result)
  }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Declaration excerpt: the aggregator is an actor constructed with the recipients it will message.
| class Aggregator(recipients: Iterable[ActorRef]) extends Actor |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
case msg @ Message(text) =>
  // One promise per recipient; each is bound to that recipient's answer.
  val promises = List.fill(recipients.size)(Promise[String]())

  // Scatter step. `foreach` rather than `map`: the traversal is run only for
  // its side effects, and `map` on a lazy Iterable (Stream, view) would not
  // be fully forced, so some recipients would never receive the message.
  recipients.zip(promises).foreach { case (recipient, promise) =>
    (recipient !!! msg).map { response: String =>
      println("Binding recipient's response: %s" format(response))
      // Bind this recipient's promise inside a dataflow block.
      flow {
        promise << response
      }
    }
  }
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Gather step: combine every recipient promise into the single `result` promise.
flow {
  // Consumes the promises front to back, prepending each value to `acc`,
  // so the combined string lists the answers in reverse list order.
  def gather(pending: List[CompletableFuture[String]], acc: String = ""): String @cps[Future[Any]] =
    pending match {
      case Nil => acc
      case next :: rest => gather(rest, next() + acc)
    }

  println("Binding result...")
  result << gather(promises)
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Demo worker that answers every `Message` after an artificial delay.
 *
 * @param id          identifier echoed back in the reply text
 * @param delayMillis how long to sleep before replying, in milliseconds;
 *                    defaults to 1000, the delay that used to be hard-coded
 */
class Recipient(id: Int, delayMillis: Long = 1000) extends Actor {
  // Fail at construction instead of inside `receive` on the first message.
  require(delayMillis >= 0, "delayMillis must be non-negative")

  def receive = {
    case Message(msg) =>
      // Stand-in for slow work; blocks the thread processing this message.
      Thread.sleep(delayMillis)
      self.reply("%s, [%s]! ".format(msg, id))
  }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Wire up five recipients behind a single aggregator.
val recipients = (1 to 5).map(i => actorOf(new Recipient(i)).start)
val aggregator = actorOf(new Aggregator(recipients)).start

// Send both messages first; each reply carries the aggregator's result promise.
val results1 = aggregator !! Message("Hello")
val results2 = aggregator !! Message("world")

// Print the aggregated text of each reply, in the order the messages were sent.
// The reply payload is untyped, hence the cast before reading the value with `get`.
for {
  reply <- Seq(results1, results2)
  res <- reply
} println("Result: %s" format(res.asInstanceOf[Future[String]].get))
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Started processing message `Hello` | |
| Binding result... | |
| Started processing message `world` | |
| Binding result... | |
| Binding recipient's response: Hello, [3]! | |
| Binding recipient's response: Hello, [1]! | |
| Binding recipient's response: Hello, [4]! | |
| Binding recipient's response: Hello, [2]! | |
| Binding recipient's response: Hello, [5]! | |
| Result: Hello, [5]! Hello, [4]! Hello, [3]! Hello, [2]! Hello, [1]! | |
| Binding recipient's response: world, [4]! | |
| Binding recipient's response: world, [3]! | |
| Binding recipient's response: world, [5]! | |
| Binding recipient's response: world, [2]! | |
| Binding recipient's response: world, [1]! | |
| Result: world, [5]! world, [4]! world, [3]! world, [2]! world, [1]! |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment