Skip to content

Instantly share code, notes, and snippets.

@vitamon
Created July 29, 2014 13:50
Show Gist options
  • Save vitamon/0c508dea00b2be51d898 to your computer and use it in GitHub Desktop.
Save vitamon/0c508dea00b2be51d898 to your computer and use it in GitHub Desktop.

Revisions

  1. vitamon created this gist Jul 29, 2014.
    62 changes: 62 additions & 0 deletions TimeoutFuture for Akka
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,62 @@
    import akka.actor.Cancellable;
    import akka.actor.Scheduler;
    import scala.concurrent.ExecutionContext;
    import scala.concurrent.Future;
    import scala.concurrent.Promise;
    import scala.concurrent.duration.FiniteDuration;

    import java.util.concurrent.Callable;
    import java.util.concurrent.TimeoutException;
    import java.util.concurrent.atomic.AtomicReference;

    import static akka.dispatch.Futures.future;
    import static akka.dispatch.Futures.promise;

    public class TimeoutFuture
    {
    public static <A> Future<A> apply(final Callable<A> block, FiniteDuration timeout, ExecutionContext ctx, Scheduler scheduler)
    {
    final Promise<A> promise = promise();
    final AtomicReference<Thread> threadAtomicReference = new AtomicReference<>();

    // timeout logic
    final Cancellable cancellable = scheduler.scheduleOnce(timeout, new Runnable()
    {
    @Override
    public void run()
    {
    // kill the thread if the future was not completed
    if (promise.tryFailure(new TimeoutException()))
    {
    Thread thread = threadAtomicReference.getAndSet(null);
    if (thread != null)
    {
    thread.interrupt();
    }
    }
    }
    }, ctx);

    // call business logic
    Future<A> f = future(new Callable<A>()
    {
    @Override
    public A call() throws Exception
    {
    try
    {
    threadAtomicReference.set(Thread.currentThread());
    return block.call();
    }
    finally
    {
    cancellable.cancel();
    }
    }
    }, ctx);

    promise.tryCompleteWith(f);

    return promise.future();
    }
    }