Skip to content

Instantly share code, notes, and snippets.

@lloydmeta
Last active January 28, 2021 17:11
Show Gist options
  • Save lloydmeta/1ee692226e07b0781b34 to your computer and use it in GitHub Desktop.
Save lloydmeta/1ee692226e07b0781b34 to your computer and use it in GitHub Desktop.

Revisions

  1. lloydmeta revised this gist Feb 28, 2016. 1 changed file with 42 additions and 37 deletions.
    79 changes: 42 additions & 37 deletions AkkaStreamSparkIntegration.scala
    Original file line number Diff line number Diff line change
    @@ -10,60 +10,63 @@ object AkkaStreamSparkIntegration {

    /**
    * Returns an InputDStream of type FlowElementType along with a Flow map element that you can use to attach to
    * your flow *
    * your flow.
    *
    * Requires your system to have proper Akka remote configurations set up.
    *
    * @param flowBufferSize In the event that the InputStream is not yet ready, how many elements from the Akka stream
    * should be buffered before dropping oldest entries
    * @param actorSystem
    * @param streamingContext
    * @tparam FlowElementType
    * @example
    *
    *
    * Format: OFF
    * {{{
    import akka.actor.ActorSystem
    import akka.stream.{ActorMaterializer, ClosedShape}
    import akka.stream.javadsl.{Sink, RunnableGraph}
    import akka.stream.scaladsl._
    import com.beachape.akkautil.SparkIntegration
    implicit val actSys = ActorSystem()
    implicit val materializer = ActorMaterializer()
    val (inputDStream, feedDInput) = SparkIntegration.streamConnection[Int]
    val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._
    val source = Source(1 to 10)
    val sink = builder.add(Sink.ignore)
    val bCast = builder.add(Broadcast[Int](2))
    val merge = builder.add(Merge[Int](2))
    // InputDStream can then be used to build elements of the graph that require integration with Spark
    val (inputDStream, feedDInput) = SparkIntegration.streamConnection[Int]
    val add1 = Flow[Int].map(_ + 1)
    val times3 = Flow[Int].map(_ * 3)
    source ~> bCast ~> add1 ~> merge ~> sink
    bCast ~> times3 ~> feedDInput ~> merge
    ClosedShape
    })
    * import akka.actor.ActorSystem
    * import akka.stream.{ActorMaterializer, ClosedShape}
    * import akka.stream.javadsl.{Sink, RunnableGraph}
    * import akka.stream.scaladsl._
    * import com.beachape.sparkka._
    * implicit val actSys = ActorSystem()
    * implicit val materializer = ActorMaterializer()
    *
    * val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
    *
    * import GraphDSL.Implicits._
    *
    * val source = Source(1 to 10)
    *
    * val sink = builder.add(Sink.ignore)
    * val bCast = builder.add(Broadcast[Int](2))
    * val merge = builder.add(Merge[Int](2))
    *
    * // InputDStream can then be used to build elements of the graph that require integration with Spark
    * val (inputDStream, feedDInput) = SparkIntegration.streamConnection[Int]()
    *
    * val add1 = Flow[Int].map(_ + 1)
    * val times3 = Flow[Int].map(_ * 3)
    * source ~> bCast ~> add1 ~> merge ~> sink
    * bCast ~> times3 ~> feedDInput ~> merge
    *
    * ClosedShape
    * })
    * }}}
    * Format: ON
    */
    def streamConnection[FlowElementType: ClassTag](implicit actorSystem: ActorSystem, streamingContext: StreamingContext): (ReceiverInputDStream[FlowElementType], Flow[FlowElementType, FlowElementType, Unit]) = {
    val feederActor = actorSystem.actorOf(Props(new FlowShimFeeder[FlowElementType]))
    def streamConnection[FlowElementType: ClassTag](actorName: String = uuid(), flowBufferSize: Int = 5000)(implicit actorSystem: ActorSystem, streamingContext: StreamingContext): (ReceiverInputDStream[FlowElementType], Flow[FlowElementType, FlowElementType, Unit]) = {
    val feederActor = actorSystem.actorOf(Props(new FlowShimFeeder[FlowElementType](flowBufferSize)))
    val remoteAddress = RemoteAddressExtension(actorSystem).address
    val feederActorPath = feederActor.path.toStringWithAddress(remoteAddress)
    val inputDStreamFromActor = streamingContext.actorStream[FlowElementType](Props(new FlowShimPublisher(feederActorPath)), "smiling-classifier-training-stream")
    val inputDStreamFromActor = streamingContext.actorStream[FlowElementType](Props(new FlowShimPublisher(feederActorPath)), actorName)
    val flow = Flow[FlowElementType].map { p =>
    feederActor ! p
    p
    }
    (inputDStreamFromActor, flow)
    }

    // Seems rather daft to need 2 actors to do this, but otherwise we run into serialisation problems with the Akka Stream
    // Seems rather daft to need 2 actors to do this, but otherwise we run into serialisation problems with the Akka Stream
    private class FlowShimFeeder[FlowElementType: ClassTag](flowBufferSize: Int) extends Actor with ActorLogging {
    import context.become
    def receive = awaitingSubscriber(Nil)
    @@ -97,6 +100,8 @@ val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
    private sealed case class Subscribe(ref: ActorRef)
    private sealed case class UnSubscribe(ref: ActorRef)

    private def uuid() = java.util.UUID.randomUUID.toString

    }

    // Helper classes for resolving absolute actor address
  2. lloydmeta revised this gist Feb 28, 2016. 1 changed file with 16 additions and 9 deletions.
    25 changes: 16 additions & 9 deletions AkkaStreamSparkIntegration.scala
    Original file line number Diff line number Diff line change
    @@ -63,33 +63,40 @@ val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
    (inputDStreamFromActor, flow)
    }

    // Seems rather daft to need 2 actors to do this, but otherwise we run into serialisation problems with the Akka Stream source
    private class FlowShimFeeder[FlowElementType: ClassTag] extends Actor with ActorLogging {
    // Seems rather daft to need 2 actors to do this, but otherwise we run into serialisation problems with the Akka Stream
    private class FlowShimFeeder[FlowElementType: ClassTag](flowBufferSize: Int) extends Actor with ActorLogging {
    import context.become
    def receive = awaitingSubscriber(Nil)
    def awaitingSubscriber(toSend: Seq[FlowElementType]): Receive = {
    case d: FlowElementType => context.become(awaitingSubscriber(toSend :+ d))
    case ref: ActorRef => {
    case d: FlowElementType => become(awaitingSubscriber(toSend.takeRight(flowBufferSize) :+ d))
    case Subscribe(ref) => {
    toSend.foreach(ref ! _)
    context.become(subscribed(ref))
    become(subscribed(Seq(ref)))
    }
    case other => log.error(s"Received a random message: $other")
    }

    def subscribed(subscriber: ActorRef): Receive = {
    case p: FlowElementType => subscriber ! p
    def subscribed(subscribers: Seq[ActorRef]): Receive = {
    case p: FlowElementType => subscribers.foreach(_ ! p)
    case Subscribe(ref) => become(subscribed(subscribers :+ ref))
    case UnSubscribe(ref) => become(subscribed(subscribers.filterNot(_ == ref)))
    case other => log.error(s"Received a random message: $other")
    }
    }
    private class FlowShimPublisher[FlowElementType: ClassTag](feederAbsoluteAddress: String) extends Actor with ActorHelper {
    private val feederActor = context.system.actorSelection(feederAbsoluteAddress)
    override def preStart(): Unit = feederActor ! self
    private lazy val feederActor = context.system.actorSelection(feederAbsoluteAddress)
    override def preStart(): Unit = feederActor ! Subscribe(self)
    override def postStop(): Unit = feederActor ! UnSubscribe(self)

    def receive = {
    case p: FlowElementType => store(p)
    case other => log.error(s"Received a random message: $other")
    }
    }

    private sealed case class Subscribe(ref: ActorRef)
    private sealed case class UnSubscribe(ref: ActorRef)

    }

    // Helper classes for resolving absolute actor address
  3. lloydmeta revised this gist Feb 28, 2016. 1 changed file with 32 additions and 31 deletions.
    63 changes: 32 additions & 31 deletions AkkaStreamSparkIntegration.scala
    Original file line number Diff line number Diff line change
    @@ -16,38 +16,39 @@ object AkkaStreamSparkIntegration {
    * @tparam FlowElementType
    * @example
    *
    * {{{
    * import akka.actor.ActorSystem
    * import akka.stream.{ActorMaterializer, ClosedShape}
    * import akka.stream.javadsl.{Sink, RunnableGraph}
    * import akka.stream.scaladsl._
    * import com.beachape.akkautil.AkkaStreamSparkIntegration
    * implicit val actSys = ActorSystem()
    * implicit val materializer = ActorMaterializer()
    *
    *
    * val (inputDStream, feedDInput) = SparkIntegration.streamConnection[Int]
    *
    * val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
    *
    * import GraphDSL.Implicits._
    *
    * val source = Source(1 to 10)
    *
    * val sink = builder.add(Sink.ignore)
    * val bCast = builder.add(Broadcast[Int](2))
    * val merge = builder.add(Merge[Int](2))
    *
    * // InputDStream can then be used to build elements of the graph that require integration with Spark
    * val (inputDStream, feedDInput) = SparkIntegration.streamConnection[Int]
    *
    * val add1 = Flow[Int].map(_ + 1)
    * val times3 = Flow[Int].map(_ * 3)
    * source ~> bCast ~> add1 ~> merge ~> sink
    * bCast ~> times3 ~> feedDInput ~> merge
    *
    * ClosedShape
    * })
    * {{{
    import akka.actor.ActorSystem
    import akka.stream.{ActorMaterializer, ClosedShape}
    import akka.stream.javadsl.{Sink, RunnableGraph}
    import akka.stream.scaladsl._
    import com.beachape.akkautil.SparkIntegration
    implicit val actSys = ActorSystem()
    implicit val materializer = ActorMaterializer()
    val (inputDStream, feedDInput) = SparkIntegration.streamConnection[Int]
    val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._
    val source = Source(1 to 10)
    val sink = builder.add(Sink.ignore)
    val bCast = builder.add(Broadcast[Int](2))
    val merge = builder.add(Merge[Int](2))
    // InputDStream can then be used to build elements of the graph that require integration with Spark
    val (inputDStream, feedDInput) = SparkIntegration.streamConnection[Int]
    val add1 = Flow[Int].map(_ + 1)
    val times3 = Flow[Int].map(_ * 3)
    source ~> bCast ~> add1 ~> merge ~> sink
    bCast ~> times3 ~> feedDInput ~> merge
    ClosedShape
    })
    * }}}
    */
    def streamConnection[FlowElementType: ClassTag](implicit actorSystem: ActorSystem, streamingContext: StreamingContext): (ReceiverInputDStream[FlowElementType], Flow[FlowElementType, FlowElementType, Unit]) = {
  4. lloydmeta revised this gist Feb 28, 2016. 1 changed file with 2 additions and 2 deletions.
    4 changes: 2 additions & 2 deletions AkkaStreamSparkIntegration.scala
    Original file line number Diff line number Diff line change
    @@ -6,7 +6,7 @@ import akka.actor.{ ExtensionKey, Extension, ExtendedActorSystem }

    import scala.reflect.ClassTag

    object SparkIntegration {
    object AkkaStreamSparkIntegration {

    /**
    * Returns an InputDStream of type FlowElementType along with a Flow map element that you can use to attach to
    @@ -21,7 +21,7 @@ object SparkIntegration {
    * import akka.stream.{ActorMaterializer, ClosedShape}
    * import akka.stream.javadsl.{Sink, RunnableGraph}
    * import akka.stream.scaladsl._
    * import com.beachape.akkautil.SparkIntegration
    * import com.beachape.akkautil.AkkaStreamSparkIntegration
    * implicit val actSys = ActorSystem()
    * implicit val materializer = ActorMaterializer()
    *
  5. lloydmeta created this gist Feb 28, 2016.
    100 changes: 100 additions & 0 deletions AkkaStreamSparkIntegration.scala
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,100 @@
    import akka.actor._
    import akka.stream.scaladsl.Flow
    import org.apache.spark.streaming.dstream.ReceiverInputDStream
    import org.apache.spark.streaming.receiver.ActorHelper
    import akka.actor.{ ExtensionKey, Extension, ExtendedActorSystem }

    import scala.reflect.ClassTag

    object SparkIntegration {

      // StreamingContext is referenced by streamConnection but is not among the file-level imports;
      // importing it here keeps this object self-contained.
      import org.apache.spark.streaming.StreamingContext

      /**
       * Returns an InputDStream of type FlowElementType along with a Flow map element that you can attach to
       * your flow.
       *
       * Every element that passes through the returned Flow is sent to a feeder actor created in `actorSystem`
       * and is then passed downstream unchanged. The returned DStream is backed by an actor receiver that
       * subscribes to that feeder. Elements that arrive before the receiver has subscribed are buffered
       * (without any upper bound) and replayed, in order, once it does.
       *
       * The feeder's path is rendered against the actor system's default address so the receiver can look it
       * up by its absolute address; presumably this requires Akka remoting to be configured - confirm for your
       * deployment.
       *
       * Note: the receiver actor name is currently fixed to "smiling-classifier-training-stream".
       *
       * @param actorSystem the actor system in which the feeder actor is created
       * @param streamingContext the streaming context on which the actor-backed input stream is registered
       * @tparam FlowElementType the type of element flowing through the Flow and emitted by the DStream
       * @return a pair of (the receiver input DStream, the Flow element to splice into your graph)
       * @example
       *
       * {{{
       * import akka.actor.ActorSystem
       * import akka.stream.{ActorMaterializer, ClosedShape}
       * import akka.stream.javadsl.{Sink, RunnableGraph}
       * import akka.stream.scaladsl._
       * import com.beachape.akkautil.SparkIntegration
       * implicit val actSys = ActorSystem()
       * implicit val materializer = ActorMaterializer()
       * // an implicit StreamingContext must also be in scope
       *
       * val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
       *
       *   import GraphDSL.Implicits._
       *
       *   val source = Source(1 to 10)
       *
       *   val sink = builder.add(Sink.ignore)
       *   val bCast = builder.add(Broadcast[Int](2))
       *   val merge = builder.add(Merge[Int](2))
       *
       *   // InputDStream can then be used to build elements of the graph that require integration with Spark
       *   val (inputDStream, feedDInput) = SparkIntegration.streamConnection[Int]
       *
       *   val add1 = Flow[Int].map(_ + 1)
       *   val times3 = Flow[Int].map(_ * 3)
       *   source ~> bCast ~> add1 ~> merge ~> sink
       *   bCast ~> times3 ~> feedDInput ~> merge
       *
       *   ClosedShape
       * })
       * }}}
       */
      def streamConnection[FlowElementType: ClassTag](implicit actorSystem: ActorSystem, streamingContext: StreamingContext): (ReceiverInputDStream[FlowElementType], Flow[FlowElementType, FlowElementType, Unit]) = {
        val feederActor = actorSystem.actorOf(Props(new FlowShimFeeder[FlowElementType]))
        // Render the feeder's path with the system's default address so it can be resolved from the receiver side.
        val remoteAddress = RemoteAddressExtension(actorSystem).address
        val feederActorPath = feederActor.path.toStringWithAddress(remoteAddress)
        // The element type is spelled out so the publisher's type test never depends on type inference.
        val inputDStreamFromActor = streamingContext.actorStream[FlowElementType](
          Props(new FlowShimPublisher[FlowElementType](feederActorPath)),
          "smiling-classifier-training-stream"
        )
        // Tee: forward each element to the feeder, then pass it on untouched.
        val flow = Flow[FlowElementType].map { p =>
          feederActor ! p
          p
        }
        (inputDStreamFromActor, flow)
      }

      // Seems rather daft to need 2 actors to do this, but otherwise we run into serialisation problems with the Akka Stream source
      /**
       * Receives elements from the Flow and relays them to the subscribed publisher actor.
       *
       * Starts out buffering elements until an ActorRef arrives; that ref becomes the subscriber, the buffer is
       * flushed to it, and from then on elements are forwarded directly.
       */
      private class FlowShimFeeder[FlowElementType: ClassTag] extends Actor with ActorLogging {
        // Vector rather than Nil/List: the buffer is only ever appended to, and List append is O(n).
        def receive: Receive = awaitingSubscriber(Vector.empty)

        /** Behaviour before any subscriber is known: accumulate elements in arrival order. */
        def awaitingSubscriber(toSend: Seq[FlowElementType]): Receive = {
          case d: FlowElementType => context.become(awaitingSubscriber(toSend :+ d))
          case ref: ActorRef =>
            // Replay everything buffered so far, then switch to direct forwarding.
            toSend.foreach(ref ! _)
            context.become(subscribed(ref))
          case other => log.error(s"Received a random message: $other")
        }

        /** Behaviour once a subscriber is known: forward each element as it arrives. */
        def subscribed(subscriber: ActorRef): Receive = {
          case p: FlowElementType => subscriber ! p
          // A later ActorRef (e.g. a publisher that was started again) takes over as the subscriber instead of
          // being logged as an unknown message and left without data.
          case ref: ActorRef => context.become(subscribed(ref))
          case other => log.error(s"Received a random message: $other")
        }
      }

      /**
       * Receiver-side actor: announces itself to the feeder on start and stores every element it is sent.
       *
       * @param feederAbsoluteAddress absolute path string of the feeder actor, used to build an actor selection
       */
      private class FlowShimPublisher[FlowElementType: ClassTag](feederAbsoluteAddress: String) extends Actor with ActorHelper {
        private val feederActor = context.system.actorSelection(feederAbsoluteAddress)

        // Subscribing is done by sending our own ref to the feeder.
        override def preStart(): Unit = feederActor ! self

        def receive: Receive = {
          case p: FlowElementType => store(p)
          case other => log.error(s"Received a random message: $other")
        }
      }

    }

    // Helper classes for resolving absolute actor address

    /**
     * Actor-system extension that exposes the default address reported by the system's provider.
     *
     * @param system the extended actor system whose provider is queried
     */
    class RemoteAddressExtensionImpl(system: ExtendedActorSystem) extends Extension {
      /** The provider's default address for this actor system. */
      def address: Address = system.provider.getDefaultAddress
    }

    /**
     * Extension key for [[RemoteAddressExtensionImpl]].
     *
     * Applied to an actor system, e.g. `RemoteAddressExtension(system).address`, to reach that system's
     * extension instance.
     */
    object RemoteAddressExtension extends ExtensionKey[RemoteAddressExtensionImpl]