Last active
February 27, 2018 07:57
-
-
Save IgorBerman/188614485e192d7d62df700f23dddf91 to your computer and use it in GitHub Desktop.
Revisions
-
IgorBerman revised this gist
Jan 28, 2017 . 1 changed file with 16 additions and 25 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -1,4 +1,4 @@ package com.example; import java.util.concurrent.CompletionStage; import java.util.concurrent.TimeUnit; @@ -8,28 +8,21 @@ import akka.Done; import akka.NotUsed; import akka.actor.ActorSystem; import akka.http.javadsl.ConnectHttp; import akka.http.javadsl.HostConnectionPool; import akka.http.javadsl.Http; import akka.http.javadsl.marshallers.jackson.Jackson; import akka.http.javadsl.model.HttpRequest; import akka.http.javadsl.model.HttpResponse; import akka.japi.Pair; import akka.stream.ActorMaterializer; import akka.stream.Materializer; import akka.stream.javadsl.Flow; import akka.stream.javadsl.Keep; import akka.stream.javadsl.RunnableGraph; import akka.stream.javadsl.Sink; import akka.stream.javadsl.Source; public class StreamWithHttpPoolExample { //see https://jsonplaceholder.typicode.com/posts/1 @@ -71,62 +64,60 @@ public String toString() { public static void main(String[] args) throws Exception { final ActorSystem system = ActorSystem.create(); final Materializer mat = ActorMaterializer.create(system); //take some source Source<Integer,NotUsed> source = Source.range(1, 10).named("source"); final String externalResource = "http://jsonplaceholder.typicode.com/"; //map source to requests to external resource Source<Pair<HttpRequest, Integer>, NotUsed> requestsStream = source.map(y -> { system.log().info("Creating reqest for " + y); HttpRequest req = HttpRequest.create(externalResource + "posts/"+y); return Pair.create(req, y);//we create Pair here due to API of cached host connection pool, see below! }).named("requests"); //create http pool as flow final Http http = Http.get(system); final ConnectHttp connectHttp = ConnectHttp.toHost(externalResource); Flow<Pair<HttpRequest, Integer>, Pair<Try<HttpResponse>, Integer>, HostConnectionPool> cachedHostConnectionPool = http.cachedHostConnectionPool(connectHttp, mat); //map requests to maybe responses Source<Pair<Try<HttpResponse>, Integer>, NotUsed> maybeResponsesStream = requestsStream.via(cachedHostConnectionPool).named("mayberesps"); //extract from response body of successfull requests body with json and map it to Post object Source<Post, NotUsed> postsStream = maybeResponsesStream.mapAsync(4, x -> { system.log().info("MapAsync" + x); Try<HttpResponse> maybeResponse = x.first(); if (maybeResponse.isFailure()) { throw new RuntimeException(x.first().failed().get()); } else { HttpResponse rsp = x.first().get(); return Jackson.unmarshaller(Post.class).unmarshal(rsp.entity(), system.dispatcher(), mat); } }).named("posts"); Sink<Post, CompletionStage<Done>> consoleSink = Sink.foreach(x -> system.log().info("Out:" + x)); RunnableGraph<Pair<NotUsed,CompletionStage<Done>>> graph = postsStream.toMat(consoleSink, Keep.both()); Pair<NotUsed, CompletionStage<Done>> notUsedWithCompletionHook = graph.run(mat); notUsedWithCompletionHook.second().whenComplete((x,t) -> { try { system.log().warning("Stream completed" + (t != null ? " with exception " + t + ", " + t.getCause() : "")); try { CompletionStage<BoxedUnit> shutdownAllConnectionPools = http.shutdownAllConnectionPools(); shutdownAllConnectionPools.toCompletableFuture().get(1000, TimeUnit.MILLISECONDS); } catch (Exception e) { e.printStackTrace(); } }finally { system.terminate(); } }); } } -
IgorBerman revised this gist
Jan 28, 2017 . 1 changed file with 1 addition and 1 deletion.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -119,7 +119,7 @@ public static void main(String[] args) throws Exception { //Never completes.. notUsedWithCompletionHook.second().whenComplete((x,t) -> { System.out.println("We are terminating everything " + t); try { CompletionStage<BoxedUnit> shutdownAllConnectionPools = http.shutdownAllConnectionPools(); shutdownAllConnectionPools.toCompletableFuture().get(1000, TimeUnit.MILLISECONDS); -
IgorBerman created this gist
Jan 27, 2017 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,132 @@ package com.dynamicyield.example; import java.util.concurrent.CompletionStage; import java.util.concurrent.TimeUnit; import scala.runtime.BoxedUnit; import scala.util.Try; import akka.Done; import akka.NotUsed; import akka.actor.ActorSystem; import akka.event.Logging; import akka.event.LoggingAdapter; import akka.http.javadsl.ConnectHttp; import akka.http.javadsl.HostConnectionPool; import akka.http.javadsl.Http; import akka.http.javadsl.marshallers.jackson.Jackson; import akka.http.javadsl.model.HttpRequest; import akka.http.javadsl.model.HttpResponse; import akka.http.javadsl.settings.ConnectionPoolSettings; import akka.japi.Pair; import akka.stream.ActorMaterializer; import akka.stream.ActorMaterializerSettings; import akka.stream.Materializer; import akka.stream.javadsl.Flow; import akka.stream.javadsl.Keep; import akka.stream.javadsl.RunnableGraph; import akka.stream.javadsl.Sink; import akka.stream.javadsl.Source; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; public class StreamWithHttpPoolExample { //see https://jsonplaceholder.typicode.com/posts/1 @SuppressWarnings("unused") private static class Post { private int userId; private int id; private String title; private String body; public int getUserId() { return userId; } public void setUserId(int userId) { this.userId = userId; } public int getId() { return id; } public void setId(int id) { this.id = id; } public String getTitle() { return title; } public void setTitle(String title) { this.title = title; } public String getBody() { return body; } public void setBody(String body) { this.body = body; } @Override public String toString() { return "Post [userId=" + userId + ", id=" + id + ", title=" + title + "]"; } } public static void main(String[] args) throws Exception { Config config = ConfigFactory.parseString("akka { loglevel = \"DEBUG\" }").withFallback(ConfigFactory.load("QuickStart")); final ActorSystem system = ActorSystem.create("QuickStart", config); final Materializer mat = ActorMaterializer.create(ActorMaterializerSettings.create(system).withDebugLogging(true), system); //take some source Source<Integer,NotUsed> source = Source.range(1, 10); final String externalResource = "https://jsonplaceholder.typicode.com/"; //map source to requests to external resource Source<Pair<HttpRequest, Integer>, NotUsed> requestsStream = source.map(y -> { System.out.println(Thread.currentThread() + " creating reqest for " + y); HttpRequest req = HttpRequest.create(externalResource + "posts/"+y); return Pair.create(req, y);//we create Pair here due to API of cached host connection pool, see below! }); //create http pool as flow final Http http = Http.get(system); final ConnectHttp connectHttp = ConnectHttp.toHost(externalResource); final ConnectionPoolSettings settings = ConnectionPoolSettings.create(system).withMaxConnections(4); final LoggingAdapter log = Logging.getLogger(system, "akka"); Flow<Pair<HttpRequest, Integer>, Pair<Try<HttpResponse>, Integer>, HostConnectionPool> cachedHostConnectionPool = http.cachedHostConnectionPool(connectHttp, settings, log, mat); //map requests to maybe responses Source<Pair<Try<HttpResponse>, Integer>, NotUsed> maybeResponsesStream = requestsStream.via(cachedHostConnectionPool); //extract from response body of successfull requests body with json and map it to Post object Source<Post, NotUsed> postsStream = maybeResponsesStream.mapAsync(4, x -> { System.out.println(Thread.currentThread() + " here is " + x); Try<HttpResponse> maybeResponse = x.first(); if (maybeResponse.isFailure()) { throw new RuntimeException(x.first().failed().get()); } else { HttpResponse rsp = x.first().get(); return Jackson.unmarshaller(Post.class).unmarshal(rsp.entity(), system.dispatcher(), mat); } }); Sink<Post, CompletionStage<Done>> consoleSink = Sink.foreach(x -> System.out.println(Thread.currentThread() + " out:" + x)); RunnableGraph<Pair<NotUsed,CompletionStage<Done>>> graph = postsStream.toMat(consoleSink, Keep.both()); Pair<NotUsed, CompletionStage<Done>> notUsedWithCompletionHook = graph.run(mat); //Never completes.. notUsedWithCompletionHook.second().whenComplete((x,t) -> { System.out.println("Encountered Exception that stops stream, we are terminating everything " + t + " " + t.getCause()); try { CompletionStage<BoxedUnit> shutdownAllConnectionPools = http.shutdownAllConnectionPools(); shutdownAllConnectionPools.toCompletableFuture().get(1000, TimeUnit.MILLISECONDS); } catch (Exception e) { e.printStackTrace(); } system.terminate(); }); } }