Last active: January 28, 2021 17:11
Revisions
lloydmeta revised this gist
Feb 28, 2016 · 1 changed file with 42 additions and 37 deletions.
@@ -10,60 +10,63 @@ object AkkaStreamSparkIntegration {

  /**
   * Returns an InputDStream of type FlowElementType along with a Flow map element that you can use to attach to
   * your flow.
   *
   * Requires your system to have proper Akka remote configurations set up.
   *
   * @param flowBufferSize In the event that the InputStream is not yet ready, how many elements from the Akka stream
   *                       should be buffered before dropping oldest entries
   * @param actorSystem
   * @param streamingContext
   * @tparam FlowElementType
   * @example
   *
   * Format: OFF
   * {{{
   * import akka.actor.ActorSystem
   * import akka.stream.{ActorMaterializer, ClosedShape}
   * import akka.stream.javadsl.{Sink, RunnableGraph}
   * import akka.stream.scaladsl._
   * import com.beachape.sparkka._
   * implicit val actSys = ActorSystem()
   * implicit val materializer = ActorMaterializer()
   *
   * val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
   *
   *   import GraphDSL.Implicits._
   *
   *   val source = Source(1 to 10)
   *
   *   val sink = builder.add(Sink.ignore)
   *   val bCast = builder.add(Broadcast[Int](2))
   *   val merge = builder.add(Merge[Int](2))
   *
   *   // InputDStream can then be used to build elements of the graph that require integration with Spark
   *   val (inputDStream, feedDInput) = SparkIntegration.streamConnection[Int]()
   *
   *   val add1 = Flow[Int].map(_ + 1)
   *   val times3 = Flow[Int].map(_ * 3)
   *   source ~> bCast ~> add1 ~> merge ~> sink
   *             bCast ~> times3 ~> feedDInput ~> merge
   *
   *   ClosedShape
   * })
   * }}}
   * Format: ON
   */
  def streamConnection[FlowElementType: ClassTag](actorName: String = uuid(), flowBufferSize: Int = 5000)(implicit actorSystem: ActorSystem, streamingContext: StreamingContext): (ReceiverInputDStream[FlowElementType], Flow[FlowElementType, FlowElementType, Unit]) = {
    val feederActor = actorSystem.actorOf(Props(new FlowShimFeeder[FlowElementType](flowBufferSize)))
    val remoteAddress = RemoteAddressExtension(actorSystem).address
    val feederActorPath = feederActor.path.toStringWithAddress(remoteAddress)
    val inputDStreamFromActor = streamingContext.actorStream[FlowElementType](Props(new FlowShimPublisher(feederActorPath)), actorName)
    val flow = Flow[FlowElementType].map { p =>
      feederActor ! p
      p
    }
    (inputDStreamFromActor, flow)
  }

  // Seems rather daft to need 2 actors to do this, but otherwise we run into serialisation problems with the Akka Stream
  private class FlowShimFeeder[FlowElementType: ClassTag](flowBufferSize: Int) extends Actor with ActorLogging {
    import context.become
    def receive = awaitingSubscriber(Nil)

@@ -97,6 +100,8 @@ val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>

  private sealed case class Subscribe(ref: ActorRef)
  private sealed case class UnSubscribe(ref: ActorRef)

  private def uuid() = java.util.UUID.randomUUID.toString
}

// Helper classes for resolving absolute actor address
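The Scaladoc above requires "proper Akka remote configurations" because streamConnection hands the Spark receiver an absolute, host-qualified actor path built via RemoteAddressExtension, and the receiver reaches the feeder actor through an actorSelection on that path. A minimal sketch of what enabling classic Akka remoting on the driver-side ActorSystem might look like; the system name, hostname, and port below are illustrative assumptions, not values from the gist:

import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

// Illustrative remoting settings (classic akka-remote, Netty TCP transport).
// Hostname and port are placeholders; use values reachable from the Spark receiver.
val remotingConfig = ConfigFactory.parseString(
  """
    |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
    |akka.remote.netty.tcp.hostname = "127.0.0.1"
    |akka.remote.netty.tcp.port = 2552
  """.stripMargin)

// Build the ActorSystem passed implicitly to streamConnection with remoting enabled,
// so that feederActor.path.toStringWithAddress yields a resolvable remote path.
implicit val actorSystem = ActorSystem("sparkka-driver", remotingConfig.withFallback(ConfigFactory.load()))

Equivalent settings in application.conf work just as well; the only hard requirement is that the address baked into the feeder actor's path is actually reachable from wherever the Spark receiver runs.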
lloydmeta revised this gist
Feb 28, 2016 · 1 changed file with 16 additions and 9 deletions.
@@ -63,33 +63,40 @@ val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>

    (inputDStreamFromActor, flow)
  }

  // Seems rather daft to need 2 actors to do this, but otherwise we run into serialisation problems with the Akka Stream
  private class FlowShimFeeder[FlowElementType: ClassTag](flowBufferSize: Int) extends Actor with ActorLogging {
    import context.become
    def receive = awaitingSubscriber(Nil)

    def awaitingSubscriber(toSend: Seq[FlowElementType]): Receive = {
      case d: FlowElementType => become(awaitingSubscriber(toSend.takeRight(flowBufferSize) :+ d))
      case Subscribe(ref) => {
        toSend.foreach(ref ! _)
        become(subscribed(Seq(ref)))
      }
      case other => log.error(s"Received a random message: $other")
    }

    def subscribed(subscribers: Seq[ActorRef]): Receive = {
      case p: FlowElementType => subscribers.foreach(_ ! p)
      case Subscribe(ref) => become(subscribed(subscribers :+ ref))
      case UnSubscribe(ref) => become(subscribed(subscribers.filterNot(_ == ref)))
      case other => log.error(s"Received a random message: $other")
    }
  }

  private class FlowShimPublisher[FlowElementType: ClassTag](feederAbsoluteAddress: String) extends Actor with ActorHelper {

    private lazy val feederActor = context.system.actorSelection(feederAbsoluteAddress)

    override def preStart(): Unit = feederActor ! Subscribe(self)
    override def postStop(): Unit = feederActor ! UnSubscribe(self)

    def receive = {
      case p: FlowElementType => store(p)
      case other => log.error(s"Received a random message: $other")
    }
  }

  private sealed case class Subscribe(ref: ActorRef)
  private sealed case class UnSubscribe(ref: ActorRef)
}

// Helper classes for resolving absolute actor address
lloydmeta revised this gist
Feb 28, 2016 · 1 changed file with 32 additions and 31 deletions.
@@ -16,38 +16,39 @@ object AkkaStreamSparkIntegration {

   * @tparam FlowElementType
   * @example
   *
   *
   * {{{
  import akka.actor.ActorSystem
  import akka.stream.{ActorMaterializer, ClosedShape}
  import akka.stream.javadsl.{Sink, RunnableGraph}
  import akka.stream.scaladsl._
  import com.beachape.akkautil.SparkIntegration
  implicit val actSys = ActorSystem()
  implicit val materializer = ActorMaterializer()

  val (inputDStream, feedDInput) = SparkIntegration.streamConnection[Int]

  val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>

    import GraphDSL.Implicits._

    val source = Source(1 to 10)

    val sink = builder.add(Sink.ignore)
    val bCast = builder.add(Broadcast[Int](2))
    val merge = builder.add(Merge[Int](2))

    // InputDStream can then be used to build elements of the graph that require integration with Spark
    val (inputDStream, feedDInput) = SparkIntegration.streamConnection[Int]

    val add1 = Flow[Int].map(_ + 1)
    val times3 = Flow[Int].map(_ * 3)
    source ~> bCast ~> add1 ~> merge ~> sink
              bCast ~> times3 ~> feedDInput ~> merge

    ClosedShape
  })
   * }}}
   */
  def streamConnection[FlowElementType: ClassTag](implicit actorSystem: ActorSystem, streamingContext: StreamingContext): (ReceiverInputDStream[FlowElementType], Flow[FlowElementType, FlowElementType, Unit]) = {
lloydmeta revised this gist
Feb 28, 2016 · 1 changed file with 2 additions and 2 deletions.
@@ -6,7 +6,7 @@ import akka.actor.{ ExtensionKey, Extension, ExtendedActorSystem }

import scala.reflect.ClassTag

object AkkaStreamSparkIntegration {

  /**
   * Returns an InputDStream of type FlowElementType along with a Flow map element that you can use to attach to

@@ -21,7 +21,7 @@ object SparkIntegration {

   * import akka.stream.{ActorMaterializer, ClosedShape}
   * import akka.stream.javadsl.{Sink, RunnableGraph}
   * import akka.stream.scaladsl._
   * import com.beachape.akkautil.AkkaStreamSparkIntegration
   * implicit val actSys = ActorSystem()
   * implicit val materializer = ActorMaterializer()
   *
lloydmeta created this gist
Feb 28, 2016
@@ -0,0 +1,100 @@

import akka.actor._
import akka.stream.scaladsl.Flow
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.receiver.ActorHelper
import akka.actor.{ ExtensionKey, Extension, ExtendedActorSystem }
import scala.reflect.ClassTag

object SparkIntegration {

  /**
   * Returns an InputDStream of type FlowElementType along with a Flow map element that you can use to attach to
   * your flow
   *
   *
   * @param actorSystem
   * @tparam FlowElementType
   * @example
   *
   * {{{
   * import akka.actor.ActorSystem
   * import akka.stream.{ActorMaterializer, ClosedShape}
   * import akka.stream.javadsl.{Sink, RunnableGraph}
   * import akka.stream.scaladsl._
   * import com.beachape.akkautil.SparkIntegration
   * implicit val actSys = ActorSystem()
   * implicit val materializer = ActorMaterializer()
   *
   *
   * val (inputDStream, feedDInput) = SparkIntegration.streamConnection[Int]
   *
   * val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
   *
   *   import GraphDSL.Implicits._
   *
   *   val source = Source(1 to 10)
   *
   *   val sink = builder.add(Sink.ignore)
   *   val bCast = builder.add(Broadcast[Int](2))
   *   val merge = builder.add(Merge[Int](2))
   *
   *   // InputDStream can then be used to build elements of the graph that require integration with Spark
   *   val (inputDStream, feedDInput) = SparkIntegration.streamConnection[Int]
   *
   *   val add1 = Flow[Int].map(_ + 1)
   *   val times3 = Flow[Int].map(_ * 3)
   *   source ~> bCast ~> add1 ~> merge ~> sink
   *             bCast ~> times3 ~> feedDInput ~> merge
   *
   *   ClosedShape
   * })
   * }}}
   */
  def streamConnection[FlowElementType: ClassTag](implicit actorSystem: ActorSystem, streamingContext: StreamingContext): (ReceiverInputDStream[FlowElementType], Flow[FlowElementType, FlowElementType, Unit]) = {
    val feederActor = actorSystem.actorOf(Props(new FlowShimFeeder[FlowElementType]))
    val remoteAddress = RemoteAddressExtension(actorSystem).address
    val feederActorPath = feederActor.path.toStringWithAddress(remoteAddress)
    val inputDStreamFromActor = streamingContext.actorStream[FlowElementType](Props(new FlowShimPublisher(feederActorPath)), "smiling-classifier-training-stream")
    val flow = Flow[FlowElementType].map { p =>
      feederActor ! p
      p
    }
    (inputDStreamFromActor, flow)
  }

  // Seems rather daft to need 2 actors to do this, but otherwise we run into serialisation problems with the Akka Stream source
  private class FlowShimFeeder[FlowElementType: ClassTag] extends Actor with ActorLogging {
    def receive = awaitingSubscriber(Nil)

    def awaitingSubscriber(toSend: Seq[FlowElementType]): Receive = {
      case d: FlowElementType => context.become(awaitingSubscriber(toSend :+ d))
      case ref: ActorRef => {
        toSend.foreach(ref ! _)
        context.become(subscribed(ref))
      }
      case other => log.error(s"Received a random message: $other")
    }

    def subscribed(subscriber: ActorRef): Receive = {
      case p: FlowElementType => subscriber ! p
      case other => log.error(s"Received a random message: $other")
    }
  }

  private class FlowShimPublisher[FlowElementType: ClassTag](feederAbsoluteAddress: String) extends Actor with ActorHelper {

    private val feederActor = context.system.actorSelection(feederAbsoluteAddress)

    override def preStart(): Unit = feederActor ! self

    def receive = {
      case p: FlowElementType => store(p)
      case other => log.error(s"Received a random message: $other")
    }
  }
}

// Helper classes for resolving absolute actor address
class RemoteAddressExtensionImpl(system: ExtendedActorSystem) extends Extension {
  def address = system.provider.getDefaultAddress
}

object RemoteAddressExtension extends ExtensionKey[RemoteAddressExtensionImpl]
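Tying the revisions together, here is a hedged end-to-end sketch of how this code might be driven: build a StreamingContext, obtain the DStream/Flow pair from streamConnection (using the AkkaStreamSparkIntegration object name from the latest revision), attach the flow inside a graph, then register an output operation, start streaming, and run the graph. The enclosing object, Spark master, app name, and batch interval are illustrative assumptions, not part of the gist:

import akka.actor.ActorSystem
import akka.stream.{ ActorMaterializer, ClosedShape }
import akka.stream.scaladsl.{ Broadcast, Flow, GraphDSL, Merge, RunnableGraph, Sink, Source }
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{ Seconds, StreamingContext }

object SparkkaExample extends App {
  // Illustrative Spark setup; master, app name and batch interval are placeholders.
  val sparkConf = new SparkConf().setMaster("local[2]").setAppName("sparkka-example")
  implicit val ssc = new StreamingContext(sparkConf, Seconds(1))
  implicit val actorSystem = ActorSystem("sparkka-driver") // remoting must be enabled, per the configuration sketch above
  implicit val materializer = ActorMaterializer()

  // DStream fed from the Akka stream, plus the Flow element that does the feeding.
  val (inputDStream, feedDInput) = AkkaStreamSparkIntegration.streamConnection[Int]()

  val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._
    val source = Source(1 to 10)
    val bCast = builder.add(Broadcast[Int](2))
    val merge = builder.add(Merge[Int](2))
    val sink = builder.add(Sink.ignore)

    source ~> bCast ~> Flow[Int].map(_ + 1) ~> merge ~> sink
              bCast ~> Flow[Int].map(_ * 3) ~> feedDInput ~> merge
    ClosedShape
  })

  inputDStream.print() // every element routed through feedDInput surfaces in Spark here
  ssc.start()
  graph.run()
  ssc.awaitTermination()
}

Because FlowShimFeeder buffers elements until the Spark receiver subscribes (up to flowBufferSize in the latest revision), the relative ordering of graph.run() and ssc.start() is forgiving: elements emitted before the receiver attaches are replayed once it does.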