package com.company; import retrofit.RestAdapter; import retrofit.http.GET; import retrofit.http.Path; import rx.Observable; import rx.Subscriber; import rx.functions.Action1; import rx.functions.Func1; import rx.functions.Func2; import rx.observables.GroupedObservable; import java.util.List; class Repo { public String full_name; } class Contributor { public String login; } interface GitHub { @GET("/repos/{owner}/{repo}/contributors") Observable> contributors(@Path("owner") String owner, @Path("repo") String repo); @GET("/users/{user}/starred") Observable> starred(@Path("user") String user); } public class Main { public static void main(String[] args) { RestAdapter restAdapter = new RestAdapter.Builder() .setEndpoint("https://api.github.com") .setLogLevel(RestAdapter.LogLevel.FULL) .setLog(new RestAdapter.Log() { @Override public void log(String message) { System.out.println(message); } }) .build(); final GitHub github = restAdapter.create(GitHub.class); github.contributors("square", "retrofit") .lift(Main.flattenList()) .flatMap(new Func1>>() { @Override public Observable> call(Contributor contributor) { return github.starred(contributor.login); } }) .lift(Main.flattenList()) .filter(new Func1() { @Override public Boolean call(Repo repo) { return !repo.full_name.startsWith("square/"); } }) .groupBy(new Func1() { @Override public String call(Repo repo) { return repo.full_name; } }) .flatMap(new Func1, Observable>() { @Override public Observable call(final GroupedObservable g) { return g.count().map(new Func1() { @Override public String call(Integer c) { return c + "\t" + g.getKey(); } }); } }) .toSortedList(new Func2() { @Override public Integer call(String a, String b) { return b.compareTo(a); } }) .lift(Main.flattenList()) .take(8) .forEach(new Action1() { @Override public void call(String line) { println(line); } }); } private static void println(String line) { System.out.println(line); } private static Observable.Operator> flattenList() { return new Observable.Operator>() { @Override public Subscriber> call(final Subscriber subscriber) { return new Subscriber>() { @Override public void onCompleted() { subscriber.onCompleted(); } @Override public void onError(Throwable e) { subscriber.onError(e); } @Override public void onNext(List contributors) { for (T c: contributors) subscriber.onNext(c); } }; } }; } }