Created
January 27, 2016 17:39
-
-
Save andreionut/98ff613e7bb1b20135eb to your computer and use it in GitHub Desktop.
Revisions
-
andreionut created this gist
Jan 27, 2016 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,83 @@ import _root_.kafka.serializer.{StringDecoder, StringEncoder} import akka.actor.SupervisorStrategy.{Restart, Resume} import akka.actor.{Props, OneForOneStrategy, SupervisorStrategy, ActorSystem} import akka.stream.actor.ActorSubscriber import akka.stream.{ActorMaterializerSettings, Supervision, ActorMaterializer} import akka.stream.scaladsl.{Sink, Source} import com.softwaremill.react.kafka.KafkaMessages.StringKafkaMessage import com.softwaremill.react.kafka.{ConsumerProperties, ProducerProperties, ReactiveKafka} import org.reactivestreams.{Publisher, Subscriber} import scala.language.postfixOps object Uppercase extends App { implicit val actorSystem = ActorSystem("ReactiveKafka") val decider: Supervision.Decider = { case e: Throwable => println("Stream Supervision Decider to the rescue!!! (case Throwable)") Supervision.Restart case _ => println("Stream Supervision Decider to the rescue!!! (case _)") Supervision.Restart } implicit val materializer = ActorMaterializer(ActorMaterializerSettings(actorSystem).withSupervisionStrategy(decider)) val kafka = new ReactiveKafka() val publisher: Publisher[StringKafkaMessage] = kafka.consume(ConsumerProperties( brokerList = "192.168.42.45:9092", zooKeeperHost = "192.168.42.45:2181/kafka", topic = "lowercaseStrings", groupId = "groupName", decoder = new StringDecoder() )) // val subscriber: Subscriber[String] = kafka.publish(ProducerProperties( // brokerList = "192.168.42.45:9092", // topic = "uppercaseStrings", // encoder = new StringEncoder() // )) val subscriber: Subscriber[String] = ??? Source.fromPublisher(publisher).map{ m => Thread.sleep(50) if (m.message() == "b") throw new IllegalArgumentException("b encountered. We can't have that!") println(m.message()) m.message().toUpperCase }.to(Sink.fromSubscriber(subscriber)).run() } import akka.actor.{Actor, ActorRef, ActorSystem, Props} import akka.stream.ActorMaterializer import com.softwaremill.react.kafka.{ConsumerProperties, ProducerProperties, ReactiveKafka} class Handler extends Actor { implicit val materializer = ActorMaterializer() override val supervisorStrategy: SupervisorStrategy = OneForOneStrategy() { case exception => println("Actor Supervision Strategy to the rescue!!! (case exception)") Restart // Your custom error handling } def createSupervisedSubscriberActor() = { val kafka = new ReactiveKafka() // subscriber val subscriberProperties = ProducerProperties( brokerList = "192.168.42.45:9092", topic = "uppercaseStrings", encoder = new StringEncoder() ) val subscriberActorProps: Props = kafka.producerActorProps(subscriberProperties) context.actorOf(subscriberActorProps) } override def receive: Receive = { case actor => println("Stuff") } }