package test.akka

import akka.actor.Actor.Receive
import akka.actor.{Props, ActorLogging, Actor, ActorSystem}
import akka.cluster.Cluster
import akka.contrib.pattern.DistributedPubSubExtension
import akka.contrib.pattern.DistributedPubSubMediator.{SubscribeAck, Subscribe, Publish}
import com.typesafe.config.ConfigFactory

/**
 * Shared settings for the distributed pub/sub demo.
 *
 * Kept in its own object (rather than inside the `App` object) so that actors
 * can read it without depending on the `App` body's delayed initialisation.
 */
object PubSubTopics {
  /** Topic that [[Subscriber]] subscribes to and [[Publisher]] publishes on. */
  val Demo: String = "t"
}

/**
 * Demo entry point: starts two actor systems (configured from the `c1` and
 * `c2` resources), creates a [[Subscriber]] on the first and a [[Publisher]]
 * on the second, then sends one string through the publisher.
 *
 * Coordination is done with fixed sleeps, which is only acceptable for a demo;
 * the delays are guesses, not guarantees that the cluster/subscription is ready.
 *
 * NOTE(review): the actor systems are never shut down, so the JVM keeps
 * running after the message is published.
 *
 * @author Basel Darvish
 */
object AkkaDistributedPubSub extends App {

  /** Time given to the two systems to start up before creating actors. */
  private val ClusterStartupDelayMillis = 10000L

  /** Time given to the subscriber to register with the mediator. */
  private val SubscriberStartupDelayMillis = 3000L

  // Both systems deliberately share the name "system1"; they differ only in
  // the configuration resource they load.
  val systemC1 = ActorSystem("system1", ConfigFactory.load("c1"))
  val systemC2 = ActorSystem("system1", ConfigFactory.load("c2"))

  Thread.sleep(ClusterStartupDelayMillis) // waiting for cluster to initiate

  systemC1.actorOf(Props[Subscriber]())

  Thread.sleep(SubscriberStartupDelayMillis) // waiting for subscriber to initiate

  val publisher = systemC2.actorOf(Props[Publisher]())
  publisher ! "Hey!"
}

/**
 * Subscribes itself to [[PubSubTopics.Demo]] through the pub/sub mediator and,
 * once the subscription is acknowledged, logs every message it receives.
 */
class Subscriber extends Actor with ActorLogging {

  val mediator = DistributedPubSubExtension(context.system).mediator

  // The request is only *sent* here; the subscription is not active until the
  // mediator answers with SubscribeAck (handled in `receive`).
  mediator ! Subscribe(PubSubTopics.Demo, self)
  log.info("subscribing to: {}", PubSubTopics.Demo)

  /**
   * Initial behaviour: waits for the acknowledgement of this actor's own
   * subscription, then switches to [[ready]]. Any other message is unhandled.
   */
  def receive = {
    case SubscribeAck(Subscribe(PubSubTopics.Demo, _, `self`)) =>
      log.info("subscribed to: {}", PubSubTopics.Demo)
      context.become(ready)
  }

  /** Behaviour after the subscription is confirmed: logs whatever arrives. */
  def ready: Receive = {
    case s: String =>
      log.info("++++++++++++++++ Received message: {}", s)
    case other =>
      log.info("Received unknown message: {}", other)
  }
}

/**
 * Forwards every `String` it receives to the pub/sub mediator as a
 * `Publish` on [[PubSubTopics.Demo]]. Non-string messages are unhandled.
 */
class Publisher extends Actor with ActorLogging {

  val mediator = DistributedPubSubExtension(context.system).mediator

  override def receive: Receive = {
    case s: String =>
      log.info("________________Publishing message: {}", s)
      mediator ! Publish(PubSubTopics.Demo, s)
  }
}