Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save raksja/cc035db7fe9849b038910323d7901cf7 to your computer and use it in GitHub Desktop.
Save raksja/cc035db7fe9849b038910323d7901cf7 to your computer and use it in GitHub Desktop.
Monitor Scala's ExecutionContext / Akka Dispatcher lag (number of tasks in waiting queues)
import java.util.concurrent.{ScheduledExecutorService, _}
import org.slf4j.LoggerFactory
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
/**
 * Periodically samples the number of queued tasks of an execution context and
 * publishes it through the implicit `metricsService`.
 *
 * Sampling runs on a dedicated single-threaded scheduler owned by this instance;
 * call [[close]] to shut that scheduler down when monitoring is no longer needed.
 *
 * @param metricsService sink that receives one sample per monitored context per interval
 */
class ExecutionContextMonitor()(implicit metricsService: MetricsClient) {
  private val log = LoggerFactory.getLogger(this.getClass)

  /** Scheduler on which every sampling task registered via [[monitor]] runs. */
  val scheduler: ScheduledExecutorService = Executors.newSingleThreadScheduledExecutor()

  /** Initial delay and period between two consecutive samples of a context. */
  val monitorInterval: FiniteDuration = 1.second

  /**
   * Starts reporting the waiting-task count of `ec` every [[monitorInterval]].
   *
   * Only contexts that are themselves a `ThreadPoolExecutor` or a `ForkJoinPool`
   * are supported; for anything else a warning is logged and nothing is scheduled.
   *
   * @param ec     execution context to observe
   * @param ecName name under which the samples are reported
   */
  def monitor(ec: ExecutionContext, ecName: String): Unit = {
    ec match {
      case threadPoolExecutor: ThreadPoolExecutor =>
        scheduleReport(ecName) {
          val queueSize = threadPoolExecutor.getQueue.size
          metricsService.setECWaitingTasksNumber(ecName, ExecutionContextType.THREAD_POOL_EXECUTOR, queueSize)
        }
      case forkJoinPool: ForkJoinPool =>
        scheduleReport(ecName) {
          // NOTE(review): per the JDK docs this is an estimate of externally submitted
          // tasks that have not begun executing; tasks held in worker-local queues are
          // reported separately by getQueuedTaskCount — confirm this is the intended metric.
          val queueSize = forkJoinPool.getQueuedSubmissionCount
          metricsService.setECWaitingTasksNumber(ecName, ExecutionContextType.FORK_JOIN_POOL, queueSize)
        }
      case _ => log.warn(s"Can not register metrics for execution context $ec")
    }
  }

  /**
   * Schedules `report` at a fixed rate of [[monitorInterval]] on [[scheduler]].
   *
   * Non-fatal failures of `report` are logged and swallowed: `scheduleAtFixedRate`
   * cancels all subsequent executions once a run throws, so a single failing sample
   * would otherwise silently stop monitoring for this context.
   *
   * @param ecName name of the monitored context, used only in the failure log message
   * @param report action that takes one sample and publishes it
   */
  private def scheduleReport(ecName: String)(report: => Unit): Unit = {
    import scala.util.control.NonFatal

    val task = new Runnable {
      override def run(): Unit =
        try report
        catch {
          case NonFatal(e) =>
            log.warn(s"Failed to report waiting tasks for execution context $ecName", e)
        }
    }
    val periodMillis = monitorInterval.toMillis
    scheduler.scheduleAtFixedRate(task, periodMillis, periodMillis, TimeUnit.MILLISECONDS)
    ()
  }

  /** Shuts the scheduler down; sampling tasks that are already running are allowed to finish. */
  def close(): Unit = {
    scheduler.shutdown()
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment