Created
October 24, 2015 07:23
-
-
Save trautonen/a2e65f9893ef0b662f0b to your computer and use it in GitHub Desktop.
Revisions
-
trautonen created this gist
Oct 24, 2015 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,59 @@ package flows import akka.actor.ActorRef import akka.pattern.ask import akka.stream.ActorMaterializer import akka.stream.scaladsl.RunnableGraph import akka.stream.stage._ import akka.util.Timeout import flows.DetourStage.AwaitCompletion import scala.concurrent.ExecutionContext import scala.util.{Failure, Success, Try} object DetourStage { case object AwaitCompletion } class DetourStage[In, Out](g: RunnableGraph[(ActorRef, ActorRef)], timeout: Timeout) (implicit materializer: ActorMaterializer, ec: ExecutionContext) extends AsyncStage[In, Out, Try[Out]] { private var inFlight: Option[Out] = None override def onPush(elem: In, ctx: AsyncContext[Out, Try[Out]]): UpstreamDirective = { val (source, sink) = g.run() val future = ask(sink, AwaitCompletion)(timeout).map(_.asInstanceOf[Out]) val callback = ctx.getAsyncCallback() future.onComplete(callback.invoke) source ! elem ctx.holdUpstream() } override def onPull(ctx: AsyncContext[Out, Try[Out]]): DownstreamDirective = inFlight match { case Some(elem) => inFlight = None push(elem, ctx) case None => ctx.holdDownstream() } override def onAsyncInput(event: Try[Out], ctx: AsyncContext[Out, Try[Out]]): Directive = event match { case Failure(ex) => ctx.fail(ex) case Success(elem) if ctx.isHoldingDownstream => push(elem, ctx) case Success(elem) => inFlight = Some(elem) ctx.ignore() } override def onUpstreamFinish(ctx: AsyncContext[Out, Try[Out]]): TerminationDirective = { if (ctx.isHoldingUpstream) ctx.absorbTermination() else ctx.finish() } private def push(elem: Out, ctx: AsyncContext[Out, Try[Out]]): DownstreamDirective = { if (ctx.isFinishing) ctx.pushAndFinish(elem) else ctx.pushAndPull(elem) } } This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,26 @@ Trigger a 'detour' graph from Akka stream ========================================= Given the idea of the flow: ``` infinite stream | | v trigger --> runnable graph (ActorRef, ActorRef) --> ... | v join | <-- onComplete <-- | v ``` The first idea how to implement the missing block of handling the trigger and join stage in a stream was to use AsyncStage and actor ask pattern. Using https://github.com/akka/akka/blob/releasing-akka-stream-and-http-experimental-1.0/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncSpec.scala as a reference. Not sure if this is the ideal way of implementing the stage though. It lacks type safety and introduced annoying type casting.