Skip to content

Instantly share code, notes, and snippets.

@andreionut
Created January 27, 2016 17:39
Show Gist options
  • Select an option

  • Save andreionut/98ff613e7bb1b20135eb to your computer and use it in GitHub Desktop.

Select an option

Save andreionut/98ff613e7bb1b20135eb to your computer and use it in GitHub Desktop.

Revisions

  1. andreionut created this gist Jan 27, 2016.
    83 changes: 83 additions & 0 deletions Uppercase.scala
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,83 @@
    import _root_.kafka.serializer.{StringDecoder, StringEncoder}
    import akka.actor.SupervisorStrategy.{Restart, Resume}
    import akka.actor.{Props, OneForOneStrategy, SupervisorStrategy, ActorSystem}
    import akka.stream.actor.ActorSubscriber
    import akka.stream.{ActorMaterializerSettings, Supervision, ActorMaterializer}
    import akka.stream.scaladsl.{Sink, Source}
    import com.softwaremill.react.kafka.KafkaMessages.StringKafkaMessage
    import com.softwaremill.react.kafka.{ConsumerProperties, ProducerProperties, ReactiveKafka}
    import org.reactivestreams.{Publisher, Subscriber}

    import scala.language.postfixOps

    object Uppercase extends App {

    implicit val actorSystem = ActorSystem("ReactiveKafka")
    val decider: Supervision.Decider = {
    case e: Throwable =>
    println("Stream Supervision Decider to the rescue!!! (case Throwable)")
    Supervision.Restart
    case _ =>
    println("Stream Supervision Decider to the rescue!!! (case _)")
    Supervision.Restart
    }

    implicit val materializer = ActorMaterializer(ActorMaterializerSettings(actorSystem).withSupervisionStrategy(decider))

    val kafka = new ReactiveKafka()
    val publisher: Publisher[StringKafkaMessage] = kafka.consume(ConsumerProperties(
    brokerList = "192.168.42.45:9092",
    zooKeeperHost = "192.168.42.45:2181/kafka",
    topic = "lowercaseStrings",
    groupId = "groupName",
    decoder = new StringDecoder()
    ))

    // val subscriber: Subscriber[String] = kafka.publish(ProducerProperties(
    // brokerList = "192.168.42.45:9092",
    // topic = "uppercaseStrings",
    // encoder = new StringEncoder()
    // ))

    val subscriber: Subscriber[String] = ???

    Source.fromPublisher(publisher).map{ m =>
    Thread.sleep(50)
    if (m.message() == "b") throw new IllegalArgumentException("b encountered. We can't have that!")
    println(m.message())
    m.message().toUpperCase
    }.to(Sink.fromSubscriber(subscriber)).run()

    }


    import akka.actor.{Actor, ActorRef, ActorSystem, Props}
    import akka.stream.ActorMaterializer
    import com.softwaremill.react.kafka.{ConsumerProperties, ProducerProperties, ReactiveKafka}

    class Handler extends Actor {
    implicit val materializer = ActorMaterializer()

    override val supervisorStrategy: SupervisorStrategy = OneForOneStrategy() {
    case exception =>
    println("Actor Supervision Strategy to the rescue!!! (case exception)")
    Restart // Your custom error handling
    }

    def createSupervisedSubscriberActor() = {
    val kafka = new ReactiveKafka()

    // subscriber
    val subscriberProperties = ProducerProperties(
    brokerList = "192.168.42.45:9092",
    topic = "uppercaseStrings",
    encoder = new StringEncoder()
    )
    val subscriberActorProps: Props = kafka.producerActorProps(subscriberProperties)
    context.actorOf(subscriberActorProps)
    }

    override def receive: Receive = {
    case actor => println("Stuff")
    }
    }