Created
July 3, 2013 14:58
-
-
Save anonymous/5918920 to your computer and use it in GitHub Desktop.
Using Enumerator interleave to combine multiple enumerators to an interee.
This is a fixed version of; https://gist.github.com/doswell/5888270
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package controllers | |
| import play.api._ | |
| import play.api.mvc._ | |
| import scala.concurrent.duration._ | |
| import akka.actor.{Actor, Props} | |
| import play.api.libs.concurrent.Akka | |
| import play.api.libs.concurrent.Execution.Implicits._ | |
| import controllers._ | |
| import akka.pattern.ask | |
| import akka.util.Timeout | |
| import scala.concurrent._ | |
| import play.api.libs.iteratee._ | |
| import play.api.libs.concurrent._ | |
| object Application extends Controller { | |
| import play.api.Play.current | |
| implicit val timeout = Timeout(1 seconds) | |
| lazy val actorA = Akka.system.actorOf(Props(new TestActorGenerator("A")),name="actorA") | |
| lazy val actorB = Akka.system.actorOf(Props(new TestActorGenerator("B")),name="actorB") | |
| lazy val started = { | |
| val consumeA = Akka.system.actorOf(Props(new TestActorConsumer("A",actorA)),name="consumeA") | |
| val consumeB = Akka.system.actorOf(Props(new TestActorConsumer("B",actorB)),name="consumeB") | |
| val patchPanel = Akka.system.actorOf(Props(new TestActorPanelConsumer("C",actorA,actorB)),name="patchPanel") | |
| val consumeC = Akka.system.actorOf(Props(new TestActorConsumer("C",patchPanel)),name="consumeC") | |
| Akka.system.scheduler.schedule(0 seconds, 1 seconds, actorA, SendMsg()) | |
| Akka.system.scheduler.schedule(0 seconds, 1 seconds, actorB, SendMsg()) | |
| Akka.system.scheduler.schedule(0 seconds, 20 seconds, patchPanel, Add(actorA)) | |
| Akka.system.scheduler.schedule(10 seconds, 20 seconds, patchPanel, Add(actorB)) | |
| patchPanel ! Add(actorA) | |
| patchPanel ! Add(actorB) | |
| true | |
| } | |
| // val consume = Iteratee.consume[String]{case s => println(s)} | |
| def index = Action { | |
| import play.api.Play.current | |
| started | |
| Ok(views.html.index("Your new application is ready.")) | |
| } | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package controllers | |
| import scala.concurrent.duration._ | |
| import scala.concurrent._ | |
| import akka.actor.{Actor, Props, ActorRef} | |
| import play.api.libs.concurrent.Akka | |
| import play.api.libs.concurrent.Execution.Implicits._ | |
| import play.api.libs.iteratee._ | |
| import play.api.libs.concurrent._ | |
| import akka.util.Timeout | |
| import akka.pattern.ask | |
| case class Join() | |
| case class SendMsg() | |
| case class Connected( | |
| enumerator: Enumerator[String] | |
| ) | |
| case class Ready() | |
| object TestActor { | |
| implicit val timeout = Timeout(1 second) | |
| import play.api.Play.current | |
| def join(actorName:String):Future[Enumerator[String]] = { | |
| val joiningActor = Akka.system.actorFor("*/actor"+actorName) | |
| (joiningActor ? Join()) map { | |
| case Connected(enumerator) => | |
| println("connectede") | |
| enumerator | |
| } | |
| } | |
| } | |
| class TestActorGenerator(actorName:String) extends Actor { | |
| val (actorEnumerator, actorChannel) = Concurrent.broadcast[String] | |
| import play.api.Play.current | |
| def receive = { | |
| case Join() => { | |
| sender ! Connected(actorEnumerator) | |
| } | |
| case SendMsg() => { | |
| actorChannel.push(actorName + " sent message ") | |
| } | |
| } | |
| } | |
| class TestActorConsumer(name:String, actorRef:ActorRef) extends Actor { | |
| import play.api.Play.current | |
| implicit val timeout = Timeout(1 seconds) | |
| val enum = (actorRef ? Join()) map { | |
| case Connected(enumerator) => | |
| println("Consumer hooked up") | |
| enumerator |>> Iteratee.foreach[String] { str => println(name + " caught " + str)} | |
| } | |
| def apply() { | |
| } | |
| def receive = { | |
| case Join() => {} | |
| } | |
| } | |
| case class Add(actor:ActorRef) | |
| class TestActorPanelConsumer(name:String, actorRefA:ActorRef, actorRefB:ActorRef) extends Actor { | |
| import play.api.Play.current | |
| import scala.collection.mutable.Seq | |
| implicit val timeout = Timeout(1 seconds) | |
| var p:Concurrent.PatchPanel[String]=null | |
| val outEnum = Concurrent.patchPanel[String]{patcher => | |
| p = patcher | |
| } | |
| var enums:Seq[Enumerator[String]] = Seq() | |
| def apply() { | |
| println (outEnum) | |
| } | |
| def receive = { | |
| case Join() => { | |
| sender ! Connected(outEnum) | |
| } | |
| case Add(actor) => { | |
| println("Adding " + actor toString) | |
| (actor ? Join()) map { | |
| case Connected(enumerator) => { | |
| enums = enums :+ enumerator | |
| p.patchIn(Enumerator.interleave[String](enums)) | |
| } | |
| } | |
| } | |
| case Connected(enumerator) => { | |
| enums = enums :+ enumerator | |
| //Combines the enumerators together, then swaps it in to the patch panel | |
| p.patchIn(Enumerator.interleave[String](enums)) | |
| } | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment