package com.example; import java.util.concurrent.CompletionStage; import scala.concurrent.Await; import scala.concurrent.duration.Duration; import akka.Done; import akka.NotUsed; import akka.actor.ActorSystem; import akka.japi.Pair; import akka.japi.function.Function; import akka.stream.ActorMaterializer; import akka.stream.ActorMaterializerSettings; import akka.stream.FlowShape; import akka.stream.Graph; import akka.stream.KillSwitches; import akka.stream.Materializer; import akka.stream.Supervision; import akka.stream.UniqueKillSwitch; import akka.stream.Supervision.Directive; import akka.stream.javadsl.Flow; import akka.stream.javadsl.Keep; import akka.stream.javadsl.RunnableGraph; import akka.stream.javadsl.Sink; import akka.stream.javadsl.Source; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; public class ExceptionHandlingExample { public static void main(String[] args) throws Exception { Config config = ConfigFactory.parseString("akka { loglevel = \"DEBUG\" }").withFallback(ConfigFactory.load("QuickStart")); final ActorSystem system = ActorSystem.create("QuickStart", config); final Function decider = exc -> { System.out.println(exc); if (exc instanceof ArithmeticException) { return Supervision.resume(); } else if (exc instanceof Exception) { return Supervision.resume(); } return Supervision.stop(); }; final Materializer mat = ActorMaterializer.create(ActorMaterializerSettings.create(system).withDebugLogging(true).withSupervisionStrategy(decider), system); Graph, UniqueKillSwitch> killSwitch = KillSwitches.single(); //take some source Source source = Source.range(1, 100); //attach ability to stop it with kill switch Source sourceWithKillSwitch = source.viaMat(killSwitch, Keep.right()); //this is custom logic that might throw different kinds of errors Flow map = Flow.create().map(x->{ Thread.sleep(1000);//only for testing if (x == 99) { throw new java.lang.Error("Got:"+x);//error } if (x % 7 == 0) { throw new java.lang.ArithmeticException("Got:"+x);//runtime } if (x % 11 == 0) { throw new java.io.IOException("Got:"+x);//checked } return x; }) //map only those that "ok" to string .map(x -> (x * 2)+"\n"); Source sourceAfterMapping = sourceWithKillSwitch.via(map); Sink> consoleSink = Sink.foreach(System.out::print); //create graph RunnableGraph>> graph = sourceAfterMapping.toMat(consoleSink, Keep.both()); //and run it when we got materialization hooks for killing and for listening for completion Pair> switchWithCompletion = graph.run(mat); //"enrich" completion handler with propogated failures(see decider above !ArithmeticException and !Exception) switchWithCompletion.second().exceptionally(new java.util.function.Function() { @Override public Done apply(Throwable t) { System.out.println("Encountered Exception that stops stream, we are terminating everything " + t + " " + t.getCause()); system.terminate(); return null; } }); //shutdown hook for CtrlC or service restarts(i.e. graceful shutdown) Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { @Override public void run() { System.out.println("Got Cntrl-C, shutting down by kill switch"); switchWithCompletion.first().shutdown(); } })); //after all setup is done, let's wait for the system to be terminated Await.ready(system.whenTerminated(), Duration.Inf()); } }