package akkatest import akka.actor._ import akka.routing.RoundRobinPool import com.typesafe.config.{Config, ConfigFactory} import com.typesafe.scalalogging.slf4j.LazyLogging object TestSystem extends LazyLogging { case object PingObj case object PongObj class ActorA extends Actor with MeasureInBatches { override def receive: Receive = { /** * 2015-03-30 10:22:56,022 [INFO] - [ka.actor.default-dispatcher-19] - a.TestSystem$ActorA:154 - cursor: 360000, last batch throughput: 10152.284263959391 * 2015-03-30 10:22:56,022 [INFO] - [ka.actor.default-dispatcher-15] - a.TestSystem$ActorA:77 - cursor: 360000, last batch throughput: 10152.284263959391 */ case PingObj => sender() ! PongObj measure() } } class ActorB extends Actor with MeasureInBatches { override def receive: Receive = { /** * 2015-03-30 10:22:56,041 [INFO] - [kka.actor.default-dispatcher-2] - a.TestSystem$ActorB:77 - cursor: 360000, last batch throughput: 10131.712259371834 * 2015-03-30 10:22:56,042 [INFO] - [ka.actor.default-dispatcher-16] - a.TestSystem$ActorB:154 - cursor: 360000, last batch throughput: 10131.712259371834 */ case PongObj => measure() } } def main(args: Array[String]): Unit = { val port: Int = args(0).toInt val config: Config = ConfigFactory.parseString( s""" |akka.actor.provider = "akka.remote.RemoteActorRefProvider" |akka.remote.netty.tcp.hostname = 127.0.0.1 |akka.remote.netty.tcp.port = $port """.stripMargin) if (args.length == 1) { val actorSystem: ActorSystem = ActorSystem("systemA", config) val actorA: ActorRef = actorSystem.actorOf(RoundRobinPool(2).props(Props(classOf[ActorA])), "actorA") } else if (args.length == 2) { val remotePort: Int = args(1).toInt val actorSystem: ActorSystem = ActorSystem("systemB", config) val actorAsel: ActorSelection = actorSystem.actorSelection(s"akka.tcp://systemA@127.0.0.1:$remotePort/user/actorA") def sendFromActor(actor: ActorRef, messages: Int = 1000000): Unit = { (1 to messages) foreach { x => actorAsel.tell(PingObj, actor) } } sendFromActor(actorSystem.actorOf(RoundRobinPool(2).props(Props(classOf[ActorB])))) // sendFromActor(actorSystem.actorOf(Props(classOf[ActorB]))) } } } trait MeasureInBatches extends LazyLogging { var batchSz: Long = 10000 var cursor: Long = 0 var lastTs: Option[Long] = None def measure() = { if (cursor % batchSz == 0) { if (lastTs.isDefined) { val ms: Some[Long] = getCurrentTimeMs val tsDiff: Long = ms.get - lastTs.get val throughput: Double = batchSz.toDouble / tsDiff * 1000 logger.info(s"cursor: $cursor, last batch throughput: $throughput") } lastTs = getCurrentTimeMs } cursor += 1 } def getCurrentTimeMs: Some[Long] = { Some(System.nanoTime() / 1000000) } }