package akkatest

import akka.actor._
import akka.routing.RoundRobinPool
import com.typesafe.config.{Config, ConfigFactory}
import com.typesafe.scalalogging.slf4j.LazyLogging
import iow.common.util.NetUtil
import net.iponweb.kafka.util.MiscUtil

import scala.util.Try

/**
 * Small Akka remoting throughput benchmark.
 *
 * The same entry point runs in one of two modes, selected by the number of
 * command-line arguments:
 *
 *  - one argument (`port`): start `systemA` and a round-robin pool of
 *    [[TestSystem.ActorA]] named `actorA`, which answers every `PingObj`
 *    with a `PongObj`;
 *  - two or more arguments (`port remoteActorPath [messageCount]`): start
 *    `systemB`, and send `messageCount` `PingObj` messages to the actor
 *    selection at `remoteActorPath`, using a round-robin pool of
 *    [[TestSystem.ActorB]] as the sender so the replies are routed to it.
 *
 * Both actor types log their per-batch throughput via [[MeasureInBatches]].
 */
object TestSystem extends LazyLogging {

  /** Request message sent by the client side. */
  case object PingObj

  /** Reply message sent back by [[ActorA]] for every [[PingObj]]. */
  case object PongObj

  /** Number of messages sent when the third argument is missing or not an integer. */
  private val DefaultMessageCount: Int = 1000000

  private val Usage: String =
    "Usage: TestSystem <port> [<remoteActorPath> [<messageCount>]] " +
      "(remoteActorPath e.g. akka.tcp://systemA@<host>:<port>/user/actorA)"

  /** Server-side actor: replies `PongObj` to each `PingObj` and counts it. */
  class ActorA extends Actor with MeasureInBatches {
    override def receive: Receive = {
      case PingObj =>
        sender() ! PongObj
        measure()
    }
  }

  /** Client-side actor: counts every message it receives, whatever it is. */
  class ActorB extends Actor with MeasureInBatches {
    override def receive: Receive = {
      case _ => measure()
    }
  }

  /**
   * Entry point.
   *
   * @param args `port` (required), `remoteActorPath` (optional, switches to
   *             client mode), `messageCount` (optional, client mode only;
   *             falls back to 1000000 when absent or not an integer)
   */
  def main(args: Array[String]): Unit = {
    // A missing or non-numeric port used to surface as a bare
    // ArrayIndexOutOfBoundsException / NumberFormatException; report usage instead.
    val port: Int = args.headOption.flatMap(raw => Try(raw.toInt).toOption).getOrElse {
      logger.error(Usage)
      sys.exit(1)
    }
    val baseConfig: Config = remotingConfig(port, NetUtil.localIpAddress)

    args.lift(1) match {
      case None =>
        runServer(baseConfig)
      case Some(remoteActorPath) =>
        val messages: Int =
          args.lift(2).flatMap(raw => Try(raw.toInt).toOption).getOrElse(DefaultMessageCount)
        runClient(baseConfig, remoteActorPath, messages)
    }
  }

  /**
   * Builds the HOCON definition of a dispatcher called `name` backed by a
   * fork-join executor limited to a single thread, with throughput 1000.
   */
  private def singleThreadedDispatcher(name: String): Config =
    ConfigFactory.parseString(
      s"""
         |$name {
         |  type = Dispatcher
         |  executor = "fork-join-executor"
         |  fork-join-executor {
         |    parallelism-min = 1
         |    parallelism-max = 1
         |  }
         |  throughput = 1000
         |}
       """.stripMargin)

  /**
   * Remoting settings shared by both modes: remote actor-ref provider, Netty
   * TCP transport bound to `hostname:port`, 1 MiB socket buffers, and a
   * dedicated single-threaded `network-dispatcher` for remoting and its I/O.
   */
  private def remotingConfig(port: Int, hostname: String): Config =
    ConfigFactory.parseString(
      // The hostname is quoted because unquoted HOCON strings cannot contain
      // characters such as ':' (e.g. in an IPv6 address).
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.port = $port
         |akka.remote.netty.tcp.hostname = "$hostname"
         |akka.remote.use-dispatcher = "network-dispatcher"
         |akka.remote.use-dispatcher-for-io = "network-dispatcher"
         |akka.remote.netty.tcp.receive-buffer-size = 1048576b
         |akka.remote.netty.tcp.send-buffer-size = 1048576b
       """.stripMargin
    ).withFallback(singleThreadedDispatcher("network-dispatcher"))

  /** Starts `systemA` with a pool of [[ActorA]] (one routee per available processor) named `actorA`. */
  private def runServer(baseConfig: Config): Unit = {
    val system: ActorSystem =
      ActorSystem("systemA", singleThreadedDispatcher("server-dispatcher").withFallback(baseConfig))
    val routeeProps: Props = Props(classOf[ActorA]).withDispatcher("server-dispatcher")
    system.actorOf(RoundRobinPool(Runtime.getRuntime.availableProcessors()).props(routeeProps), "actorA")
  }

  /**
   * Starts `systemB` and fires `messages` pings at `remoteActorPath`.
   *
   * The sender of every ping is a round-robin pool of [[ActorB]], so whatever
   * the remote side replies is distributed over those routees.
   */
  private def runClient(baseConfig: Config, remoteActorPath: String, messages: Int): Unit = {
    val actorSystem: ActorSystem =
      ActorSystem("systemB", singleThreadedDispatcher("client-dispatcher").withFallback(baseConfig))
    val actorAsel: ActorSelection = actorSystem.actorSelection(remoteActorPath)

    val routeeProps: Props = Props(classOf[ActorB]).withDispatcher("client-dispatcher")
    val replyReceiver: ActorRef =
      actorSystem.actorOf(RoundRobinPool(Runtime.getRuntime.availableProcessors()).props(routeeProps))

    // NOTE(review): MiscUtil.timeMs is a project helper; presumably it times the
    // block. Its result is not used here, as in the original code.
    MiscUtil.timeMs {
      (1 to messages).foreach { _ =>
        actorAsel.tell(PingObj, replyReceiver)
      }
      logger.info(s"Done sending: $messages messages from: $replyReceiver to: $actorAsel")
    }
  }
}

/**
 * Mixin that counts calls to [[measure]] and logs the throughput of each
 * completed batch of `batchSz` calls.
 *
 * State is kept in plain `var`s with no synchronization; in this file it is
 * only mixed into actors.
 */
trait MeasureInBatches extends LazyLogging {

  /** Number of `measure()` calls per reported batch. */
  var batchSz: Long = 10000

  /** Total number of `measure()` calls so far. */
  var cursor: Long = 0

  /** Timestamp (ms) taken at the last batch boundary; `None` until the first call. */
  var lastTs: Option[Long] = None

  /**
   * Records one processed message. On every `batchSz`-th call (including the
   * very first) a new timestamp is stored; from the second boundary onwards
   * the throughput of the batch that just ended is logged first.
   */
  def measure(): Unit = {
    if (cursor % batchSz == 0) {
      for (previousMs <- lastTs; nowMs <- getCurrentTimeMs) {
        val elapsedMs: Long = nowMs - previousMs
        if (elapsedMs > 0) {
          val throughput: Double = batchSz.toDouble / elapsedMs * 1000
          logger.info(s"$this cursor: $cursor, last batch throughput: $throughput")
        } else {
          // Millisecond resolution: a batch that finishes within the same
          // millisecond would otherwise be reported as "Infinity".
          logger.info(s"$this cursor: $cursor, last batch took under 1 ms, throughput not measurable")
        }
      }
      // Taken after logging, so the time spent logging is not charged to the next batch.
      lastTs = getCurrentTimeMs
    }
    cursor += 1
  }

  /** Current value of `System.nanoTime()` converted to milliseconds. */
  def getCurrentTimeMs: Some[Long] = Some(System.nanoTime() / 1000000)
}