Skip to content

Instantly share code, notes, and snippets.

@IgorBerman
Last active February 27, 2018 07:57
Show Gist options
  • Save IgorBerman/188614485e192d7d62df700f23dddf91 to your computer and use it in GitHub Desktop.
Save IgorBerman/188614485e192d7d62df700f23dddf91 to your computer and use it in GitHub Desktop.

Revisions

  1. IgorBerman revised this gist Jan 28, 2017. 1 changed file with 16 additions and 25 deletions.
    41 changes: 16 additions & 25 deletions StreamWithHttpPoolExample.java
    Original file line number Diff line number Diff line change
    @@ -1,4 +1,4 @@
    package com.dynamicyield.example;
    package com.example;

    import java.util.concurrent.CompletionStage;
    import java.util.concurrent.TimeUnit;
    @@ -8,28 +8,21 @@
    import akka.Done;
    import akka.NotUsed;
    import akka.actor.ActorSystem;
    import akka.event.Logging;
    import akka.event.LoggingAdapter;
    import akka.http.javadsl.ConnectHttp;
    import akka.http.javadsl.HostConnectionPool;
    import akka.http.javadsl.Http;
    import akka.http.javadsl.marshallers.jackson.Jackson;
    import akka.http.javadsl.model.HttpRequest;
    import akka.http.javadsl.model.HttpResponse;
    import akka.http.javadsl.settings.ConnectionPoolSettings;
    import akka.japi.Pair;
    import akka.stream.ActorMaterializer;
    import akka.stream.ActorMaterializerSettings;
    import akka.stream.Materializer;
    import akka.stream.javadsl.Flow;
    import akka.stream.javadsl.Keep;
    import akka.stream.javadsl.RunnableGraph;
    import akka.stream.javadsl.Sink;
    import akka.stream.javadsl.Source;

    import com.typesafe.config.Config;
    import com.typesafe.config.ConfigFactory;

    public class StreamWithHttpPoolExample {

    //see https://jsonplaceholder.typicode.com/posts/1
    @@ -71,62 +64,60 @@ public String toString() {


    public static void main(String[] args) throws Exception {
    Config config = ConfigFactory.parseString("akka { loglevel = \"DEBUG\" }").withFallback(ConfigFactory.load("QuickStart"));
    final ActorSystem system = ActorSystem.create("QuickStart", config);
    final ActorSystem system = ActorSystem.create();

    final Materializer mat = ActorMaterializer.create(ActorMaterializerSettings.create(system).withDebugLogging(true), system);
    final Materializer mat = ActorMaterializer.create(system);

    //take some source
    Source<Integer,NotUsed> source = Source.range(1, 10);
    Source<Integer,NotUsed> source = Source.range(1, 10).named("source");

    final String externalResource = "https://jsonplaceholder.typicode.com/";
    final String externalResource = "http://jsonplaceholder.typicode.com/";
    //map source to requests to external resource
    Source<Pair<HttpRequest, Integer>, NotUsed> requestsStream = source.map(y -> {
    System.out.println(Thread.currentThread() + " creating reqest for " + y);
    system.log().info("Creating reqest for " + y);
    HttpRequest req = HttpRequest.create(externalResource + "posts/"+y);
    return Pair.create(req, y);//we create Pair here due to API of cached host connection pool, see below!
    });
    }).named("requests");

    //create http pool as flow
    final Http http = Http.get(system);
    final ConnectHttp connectHttp = ConnectHttp.toHost(externalResource);
    final ConnectionPoolSettings settings = ConnectionPoolSettings.create(system).withMaxConnections(4);
    final LoggingAdapter log = Logging.getLogger(system, "akka");
    Flow<Pair<HttpRequest, Integer>, Pair<Try<HttpResponse>, Integer>, HostConnectionPool> cachedHostConnectionPool
    = http.cachedHostConnectionPool(connectHttp, settings, log, mat);
    = http.cachedHostConnectionPool(connectHttp, mat);

    //map requests to maybe responses
    Source<Pair<Try<HttpResponse>, Integer>, NotUsed> maybeResponsesStream = requestsStream.via(cachedHostConnectionPool);
    Source<Pair<Try<HttpResponse>, Integer>, NotUsed> maybeResponsesStream = requestsStream.via(cachedHostConnectionPool).named("mayberesps");

    //extract from response body of successfull requests body with json and map it to Post object
    Source<Post, NotUsed> postsStream = maybeResponsesStream.mapAsync(4, x -> {
    System.out.println(Thread.currentThread() + " here is " + x);
    system.log().info("MapAsync" + x);
    Try<HttpResponse> maybeResponse = x.first();
    if (maybeResponse.isFailure()) {
    throw new RuntimeException(x.first().failed().get());
    } else {
    HttpResponse rsp = x.first().get();
    return Jackson.unmarshaller(Post.class).unmarshal(rsp.entity(), system.dispatcher(), mat);
    }
    });
    }).named("posts");

    Sink<Post, CompletionStage<Done>> consoleSink = Sink.foreach(x -> System.out.println(Thread.currentThread() + " out:" + x));
    Sink<Post, CompletionStage<Done>> consoleSink = Sink.foreach(x -> system.log().info("Out:" + x));

    RunnableGraph<Pair<NotUsed,CompletionStage<Done>>> graph = postsStream.toMat(consoleSink, Keep.both());

    Pair<NotUsed, CompletionStage<Done>> notUsedWithCompletionHook = graph.run(mat);


    //Never completes..
    notUsedWithCompletionHook.second().whenComplete((x,t) -> {
    System.out.println("We are terminating everything " + t);
    try {
    system.log().warning("Stream completed" + (t != null ? " with exception " + t + ", " + t.getCause() : ""));
    try {
    CompletionStage<BoxedUnit> shutdownAllConnectionPools = http.shutdownAllConnectionPools();
    shutdownAllConnectionPools.toCompletableFuture().get(1000, TimeUnit.MILLISECONDS);
    } catch (Exception e) {
    e.printStackTrace();
    }
    }finally {
    system.terminate();
    }
    });
    }
    }
  2. IgorBerman revised this gist Jan 28, 2017. 1 changed file with 1 addition and 1 deletion.
    2 changes: 1 addition & 1 deletion StreamWithHttpPoolExample.java
    Original file line number Diff line number Diff line change
    @@ -119,7 +119,7 @@ public static void main(String[] args) throws Exception {

    //Never completes..
    notUsedWithCompletionHook.second().whenComplete((x,t) -> {
    System.out.println("Encountered Exception that stops stream, we are terminating everything " + t + " " + t.getCause());
    System.out.println("We are terminating everything " + t);
    try {
    CompletionStage<BoxedUnit> shutdownAllConnectionPools = http.shutdownAllConnectionPools();
    shutdownAllConnectionPools.toCompletableFuture().get(1000, TimeUnit.MILLISECONDS);
  3. IgorBerman created this gist Jan 27, 2017.
    132 changes: 132 additions & 0 deletions StreamWithHttpPoolExample.java
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,132 @@
    package com.dynamicyield.example;

    import java.util.concurrent.CompletionStage;
    import java.util.concurrent.TimeUnit;

    import scala.runtime.BoxedUnit;
    import scala.util.Try;
    import akka.Done;
    import akka.NotUsed;
    import akka.actor.ActorSystem;
    import akka.event.Logging;
    import akka.event.LoggingAdapter;
    import akka.http.javadsl.ConnectHttp;
    import akka.http.javadsl.HostConnectionPool;
    import akka.http.javadsl.Http;
    import akka.http.javadsl.marshallers.jackson.Jackson;
    import akka.http.javadsl.model.HttpRequest;
    import akka.http.javadsl.model.HttpResponse;
    import akka.http.javadsl.settings.ConnectionPoolSettings;
    import akka.japi.Pair;
    import akka.stream.ActorMaterializer;
    import akka.stream.ActorMaterializerSettings;
    import akka.stream.Materializer;
    import akka.stream.javadsl.Flow;
    import akka.stream.javadsl.Keep;
    import akka.stream.javadsl.RunnableGraph;
    import akka.stream.javadsl.Sink;
    import akka.stream.javadsl.Source;

    import com.typesafe.config.Config;
    import com.typesafe.config.ConfigFactory;

    public class StreamWithHttpPoolExample {

    //see https://jsonplaceholder.typicode.com/posts/1
    @SuppressWarnings("unused")
    private static class Post {
    private int userId;
    private int id;
    private String title;
    private String body;
    public int getUserId() {
    return userId;
    }
    public void setUserId(int userId) {
    this.userId = userId;
    }
    public int getId() {
    return id;
    }
    public void setId(int id) {
    this.id = id;
    }
    public String getTitle() {
    return title;
    }
    public void setTitle(String title) {
    this.title = title;
    }
    public String getBody() {
    return body;
    }
    public void setBody(String body) {
    this.body = body;
    }
    @Override
    public String toString() {
    return "Post [userId=" + userId + ", id=" + id + ", title=" + title + "]";
    }
    }


    public static void main(String[] args) throws Exception {
    Config config = ConfigFactory.parseString("akka { loglevel = \"DEBUG\" }").withFallback(ConfigFactory.load("QuickStart"));
    final ActorSystem system = ActorSystem.create("QuickStart", config);

    final Materializer mat = ActorMaterializer.create(ActorMaterializerSettings.create(system).withDebugLogging(true), system);

    //take some source
    Source<Integer,NotUsed> source = Source.range(1, 10);

    final String externalResource = "https://jsonplaceholder.typicode.com/";
    //map source to requests to external resource
    Source<Pair<HttpRequest, Integer>, NotUsed> requestsStream = source.map(y -> {
    System.out.println(Thread.currentThread() + " creating reqest for " + y);
    HttpRequest req = HttpRequest.create(externalResource + "posts/"+y);
    return Pair.create(req, y);//we create Pair here due to API of cached host connection pool, see below!
    });

    //create http pool as flow
    final Http http = Http.get(system);
    final ConnectHttp connectHttp = ConnectHttp.toHost(externalResource);
    final ConnectionPoolSettings settings = ConnectionPoolSettings.create(system).withMaxConnections(4);
    final LoggingAdapter log = Logging.getLogger(system, "akka");
    Flow<Pair<HttpRequest, Integer>, Pair<Try<HttpResponse>, Integer>, HostConnectionPool> cachedHostConnectionPool
    = http.cachedHostConnectionPool(connectHttp, settings, log, mat);

    //map requests to maybe responses
    Source<Pair<Try<HttpResponse>, Integer>, NotUsed> maybeResponsesStream = requestsStream.via(cachedHostConnectionPool);

    //extract from response body of successfull requests body with json and map it to Post object
    Source<Post, NotUsed> postsStream = maybeResponsesStream.mapAsync(4, x -> {
    System.out.println(Thread.currentThread() + " here is " + x);
    Try<HttpResponse> maybeResponse = x.first();
    if (maybeResponse.isFailure()) {
    throw new RuntimeException(x.first().failed().get());
    } else {
    HttpResponse rsp = x.first().get();
    return Jackson.unmarshaller(Post.class).unmarshal(rsp.entity(), system.dispatcher(), mat);
    }
    });

    Sink<Post, CompletionStage<Done>> consoleSink = Sink.foreach(x -> System.out.println(Thread.currentThread() + " out:" + x));

    RunnableGraph<Pair<NotUsed,CompletionStage<Done>>> graph = postsStream.toMat(consoleSink, Keep.both());

    Pair<NotUsed, CompletionStage<Done>> notUsedWithCompletionHook = graph.run(mat);


    //Never completes..
    notUsedWithCompletionHook.second().whenComplete((x,t) -> {
    System.out.println("Encountered Exception that stops stream, we are terminating everything " + t + " " + t.getCause());
    try {
    CompletionStage<BoxedUnit> shutdownAllConnectionPools = http.shutdownAllConnectionPools();
    shutdownAllConnectionPools.toCompletableFuture().get(1000, TimeUnit.MILLISECONDS);
    } catch (Exception e) {
    e.printStackTrace();
    }
    system.terminate();
    });
    }
    }