Skip to content

Instantly share code, notes, and snippets.

@trautonen
Created October 24, 2015 07:23
Show Gist options
  • Select an option

  • Save trautonen/a2e65f9893ef0b662f0b to your computer and use it in GitHub Desktop.

Select an option

Save trautonen/a2e65f9893ef0b662f0b to your computer and use it in GitHub Desktop.

Revisions

  1. trautonen created this gist Oct 24, 2015.
    59 changes: 59 additions & 0 deletions DetourStage.scala
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,59 @@
    package flows

    import akka.actor.ActorRef
    import akka.pattern.ask
    import akka.stream.ActorMaterializer
    import akka.stream.scaladsl.RunnableGraph
    import akka.stream.stage._
    import akka.util.Timeout
    import flows.DetourStage.AwaitCompletion

    import scala.concurrent.ExecutionContext
    import scala.util.{Failure, Success, Try}

    object DetourStage {
    case object AwaitCompletion
    }

    class DetourStage[In, Out](g: RunnableGraph[(ActorRef, ActorRef)], timeout: Timeout)
    (implicit materializer: ActorMaterializer, ec: ExecutionContext) extends AsyncStage[In, Out, Try[Out]] {

    private var inFlight: Option[Out] = None

    override def onPush(elem: In, ctx: AsyncContext[Out, Try[Out]]): UpstreamDirective = {
    val (source, sink) = g.run()
    val future = ask(sink, AwaitCompletion)(timeout).map(_.asInstanceOf[Out])
    val callback = ctx.getAsyncCallback()
    future.onComplete(callback.invoke)
    source ! elem
    ctx.holdUpstream()
    }

    override def onPull(ctx: AsyncContext[Out, Try[Out]]): DownstreamDirective = inFlight match {
    case Some(elem) =>
    inFlight = None
    push(elem, ctx)
    case None =>
    ctx.holdDownstream()
    }

    override def onAsyncInput(event: Try[Out], ctx: AsyncContext[Out, Try[Out]]): Directive = event match {
    case Failure(ex) =>
    ctx.fail(ex)
    case Success(elem) if ctx.isHoldingDownstream =>
    push(elem, ctx)
    case Success(elem) =>
    inFlight = Some(elem)
    ctx.ignore()
    }

    override def onUpstreamFinish(ctx: AsyncContext[Out, Try[Out]]): TerminationDirective = {
    if (ctx.isHoldingUpstream) ctx.absorbTermination()
    else ctx.finish()
    }

    private def push(elem: Out, ctx: AsyncContext[Out, Try[Out]]): DownstreamDirective = {
    if (ctx.isFinishing) ctx.pushAndFinish(elem)
    else ctx.pushAndPull(elem)
    }
    }
    26 changes: 26 additions & 0 deletions detour.md
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,26 @@
    Trigger a 'detour' graph from Akka stream
    =========================================

    Given the idea of the flow:
    ```
    infinite
    stream
    |
    |
    v
    trigger --> runnable graph (ActorRef, ActorRef) --> ...
    |
    v
    join | <-- onComplete <--
    |
    v
    ```

    The first idea how to implement the missing block of handling the trigger and join
    stage in a stream was to use AsyncStage and actor ask pattern. Using
    https://github.com/akka/akka/blob/releasing-akka-stream-and-http-experimental-1.0/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncSpec.scala
    as a reference.

    Not sure if this is the ideal way of implementing the stage though. It lacks type safety
    and introduced annoying type casting.