import java.util.concurrent._ import akka.dispatch.{Dispatcher, ExecutorServiceDelegate} import config.Config import helpers.ScalaLogger class ExecutionContextMonitor()(implicit metricsService: MetricsClient, config: Config) { private val log = ScalaLogger.get(this.getClass) private val scheduler = Executors.newSingleThreadScheduledExecutor() // Reflection to access Scala protected method private val ecGetterForAkkaDispatcher = classOf[Dispatcher].getDeclaredMethod("executorService") ecGetterForAkkaDispatcher.setAccessible(true) def monitor(ec: Executor, ecName: String): Unit = ec match { case threadPoolExecutor: ThreadPoolExecutor => log.info(s"Monitoring ec $ecName of type ThreadPoolExecutor") val runnable = new Runnable { override def run(): Unit = setMetricsForThreadPoolExecutor(ecName, threadPoolExecutor) } scheduler.scheduleAtFixedRate(runnable, interval.toMillis, interval.toMillis, TimeUnit.MILLISECONDS) case forkJoinPool: ForkJoinPool => log.info(s"Monitoring ec $ecName of type ForkJoinPool") val runnable = new Runnable { override def run(): Unit = setMetricsForForkJoinPool(ecName, forkJoinPool) } scheduler.scheduleAtFixedRate(runnable, interval.toMillis, interval.toMillis, TimeUnit.MILLISECONDS) case dispatcher: Dispatcher => log.info(s"Monitoring ec $ecName of type akka.dispatch.Dispatcher") val runnable = new Runnable { override def run(): Unit = { val ec = ecGetterForAkkaDispatcher.invoke(dispatcher).asInstanceOf[ExecutorServiceDelegate].executor ec match { case threadPoolExecutor: ThreadPoolExecutor => setMetricsForThreadPoolExecutor(ecName, threadPoolExecutor) case forkJoinPool: ForkJoinPool => setMetricsForForkJoinPool(ecName, forkJoinPool) case _ => log.warn(s"Can not set metrics for ect of type ${ec.getClass} from akka dispatcher $dispatcher") } } } scheduler.scheduleAtFixedRate(runnable, interval.toMillis, interval.toMillis, TimeUnit.MILLISECONDS) case _ => log.warn(s"Can not register metrics monitoring for execution context $ec") } private def setMetricsForThreadPoolExecutor(ecName: String, threadPoolExecutor: ThreadPoolExecutor): Unit = { val queueSize = threadPoolExecutor.getQueue.size log.trace(s"Queue size for ec $ecName of type ThreadPoolExecutor: $queueSize") metricsService.setECWaitingTasksNumber(ecName, ExecutionContextType.THREAD_POOL_EXECUTOR, queueSize) } private def setMetricsForForkJoinPool(ecName: String, forkJoinPool: ForkJoinPool): Unit = { val queueSize = forkJoinPool.getQueuedSubmissionCount log.trace(s"Queue size for ec $ecName of type ForkJoinPool: $queueSize") metricsService.setECWaitingTasksNumber(ecName, ExecutionContextType.FORK_JOIN_POOL, queueSize) } private def interval = config.ecMonitorInterval def close(): Unit = { scheduler.shutdown() } }