package org.paulbetts.shroom.core;

import android.os.AsyncTask;

import com.squareup.okhttp.*;

import java.io.IOException;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CodingErrorAction;
import java.util.Arrays;

import rx.Observable;
import rx.Subscriber;
import rx.functions.*;
import rx.subscriptions.Subscriptions;

/**
 * Static helpers that expose OkHttp calls as RxJava observables: a single
 * {@link Response}, the response body as raw byte chunks, as decoded text
 * chunks, or as individual lines.
 *
 * <p>All returned observables are cold: the HTTP call is started on
 * subscription and cancelled on unsubscription.
 */
public class RxOkHttp {
    /** Size of the buffer used when draining a response body. */
    private static final int READ_BUFFER_SIZE = 65536;

    /**
     * Enqueues {@code request} on {@code client} and emits the response.
     *
     * @param client  the client used to create the call
     * @param request the request to execute
     * @return an observable that emits the single {@link Response} and then
     *         completes; it signals {@code onError} with the {@link IOException}
     *         reported by OkHttp, or with the exception produced by
     *         {@code getFailureExceptionOnBadStatus} for an error status code
     */
    public static Observable<Response> request(OkHttpClient client, Request request) {
        return Observable.create((Subscriber<? super Response> subj) -> {
            final Call call = client.newCall(request);

            // Unsubscribing cancels the in-flight call.
            subj.add(Subscriptions.create(call::cancel));

            call.enqueue(new Callback() {
                @Override
                public void onFailure(Request failedRequest, IOException e) {
                    subj.onError(e);
                }

                @Override
                public void onResponse(Response response) throws IOException {
                    Throwable error = getFailureExceptionOnBadStatus(response);
                    if (error != null) {
                        subj.onError(error);
                        return;
                    }

                    subj.onNext(response);
                    subj.onCompleted();
                }
            });
        });
    }

    /**
     * Executes {@code request} and streams the response body as byte chunks.
     *
     * @param client  the client used to create the call
     * @param request the request to execute
     * @return an observable emitting non-empty byte arrays (each at most
     *         {@value #READ_BUFFER_SIZE} bytes) in the order they were read,
     *         completing at end of stream
     */
    public static Observable<byte[]> streamBytes(OkHttpClient client, Request request) {
        return request(client, request).flatMap(RxOkHttp::readBody);
    }

    /**
     * Executes {@code request} and streams the response body as UTF-8 text.
     *
     * <p>Decoding is incremental, so a multi-byte character that straddles two
     * network reads is emitted intact instead of being turned into replacement
     * characters. Malformed input is replaced with U+FFFD. Empty strings are
     * never emitted.
     *
     * @param client  the client used to create the call
     * @param request the request to execute
     * @return an observable of decoded text chunks
     */
    public static Observable<String> streamStrings(OkHttpClient client, Request request) {
        // defer: every subscription needs its own decoder state.
        return Observable.defer(() -> {
            final Utf8ChunkDecoder decoder = new Utf8ChunkDecoder();

            Observable<String> decoded = streamBytes(client, request).map(decoder::decode);
            // Evaluated only after the byte stream completes; flushes any dangling bytes.
            Observable<String> tail = Observable.defer(() -> Observable.just(decoder.finish()));

            return decoded.concatWith(tail).filter(text -> text.length() > 0);
        });
    }

    /**
     * Executes {@code request} and streams the response body line by line.
     *
     * <p>Lines are split on {@code '\n'}; empty lines are dropped. A final line
     * without a trailing newline is still emitted when the stream completes.
     *
     * @param client  the client used to create the call
     * @param request the request to execute
     * @return an observable of non-empty lines in stream order
     */
    public static Observable<String> streamLines(OkHttpClient client, Request request) {
        // defer: the splitter carries a partial line between chunks, and that
        // state must not leak from one subscription (e.g. a failed attempt
        // before retry) into another.
        return Observable.defer(() -> {
            final LineSplitter splitter = new LineSplitter();

            return streamStrings(client, request)
                    // A synthetic newline forces out whatever partial line is left at the end.
                    .concatWith(Observable.just("\n"))
                    // concatMap (not flatMap) so line order is guaranteed.
                    .concatMap(splitter)
                    .filter(line -> line.length() > 0);
        });
    }

    /**
     * Drains the body of {@code response} on an {@link AsyncTask}, emitting each
     * read as its own byte array. The body stream is always closed, whether the
     * read finishes, fails, or the subscriber goes away.
     */
    private static Observable<byte[]> readBody(final Response response) {
        return Observable.create((Subscriber<? super byte[]> subj) -> {
            // NOTE(review): AsyncTask.execute() may run tasks serially on a shared
            // background thread depending on platform version, so one long-lived
            // stream could delay others -- confirm this is acceptable.
            final AsyncTask<Object, Void, Void> task = new AsyncTask<Object, Void, Void>() {
                @Override
                protected Void doInBackground(Object... params) {
                    byte[] buffer = new byte[READ_BUFFER_SIZE];
                    InputStream stream = null;

                    try {
                        stream = response.body().byteStream();

                        int bytesRead = 0;
                        while (bytesRead > -1 && !subj.isUnsubscribed()) {
                            bytesRead = stream.read(buffer, 0, buffer.length);
                            if (bytesRead < 1) {
                                continue;
                            }

                            // Copy: the buffer is reused for the next read.
                            subj.onNext(Arrays.copyOfRange(buffer, 0, bytesRead));
                        }

                        if (!subj.isUnsubscribed()) {
                            subj.onCompleted();
                        }
                    } catch (IOException ex) {
                        // Cancelling the call makes the read fail; nobody is
                        // listening any more in that case.
                        if (!subj.isUnsubscribed()) {
                            subj.onError(ex);
                        }
                    } finally {
                        closeQuietly(stream);
                    }

                    return null;
                }
            };

            subj.add(Subscriptions.create(() -> task.cancel(false)));
            task.execute();
        });
    }

    /** Closes {@code stream} if non-null, ignoring any failure to close. */
    private static void closeQuietly(InputStream stream) {
        if (stream == null) {
            return;
        }

        try {
            stream.close();
        } catch (IOException ignored) {
            // The terminal event has already been delivered (or the subscriber
            // is gone); a close failure has nowhere useful to be reported.
        }
    }

    /**
     * Maps a response to the exception it should be reported as.
     *
     * @return {@code null} when the status code is below 400, otherwise a
     *         {@code FailedResponseException} wrapping {@code resp}
     */
    private static Throwable getFailureExceptionOnBadStatus(Response resp) {
        if (resp.code() < 400) {
            return null;
        }

        return new FailedResponseException(resp);
    }

    /**
     * Stateful UTF-8 decoder that keeps the bytes of an incomplete trailing
     * character and prepends them to the next chunk. Not thread-safe; intended
     * for the sequential callbacks of a single subscription.
     */
    private static final class Utf8ChunkDecoder {
        private final CharsetDecoder decoder = Charset.forName("UTF-8").newDecoder()
                .onMalformedInput(CodingErrorAction.REPLACE)
                .onUnmappableCharacter(CodingErrorAction.REPLACE);

        /** Bytes of a character whose remaining bytes have not arrived yet. */
        private byte[] carry = new byte[0];

        /** Decodes as much of {@code carry + chunk} as forms complete characters. */
        String decode(byte[] chunk) {
            return run(chunk, false);
        }

        /** Signals end of input and returns the text for any left-over bytes. */
        String finish() {
            return run(new byte[0], true);
        }

        private String run(byte[] chunk, boolean endOfInput) {
            ByteBuffer in = ByteBuffer.allocate(carry.length + chunk.length);
            in.put(carry).put(chunk);
            in.flip();

            // UTF-8 never yields more chars than input bytes; small slack for safety.
            CharBuffer out = CharBuffer.allocate(in.remaining() + 2);
            decoder.decode(in, out, endOfInput);
            if (endOfInput) {
                decoder.flush(out);
            }

            // Whatever the decoder left unread is an incomplete sequence.
            carry = new byte[in.remaining()];
            in.get(carry);

            out.flip();
            return out.toString();
        }
    }

    /**
     * Splits incoming text chunks on {@code '\n'}, holding back a trailing
     * partial line until a later chunk completes it. Not thread-safe; intended
     * for the sequential callbacks of a single subscription.
     */
    private static final class LineSplitter implements Func1<String, Observable<String>> {
        private String remainder = "";

        @Override
        public Observable<String> call(String chunk) {
            if (chunk.length() == 0) {
                return Observable.empty();
            }

            String[] lines = (remainder + chunk).split("\n");

            if (chunk.charAt(chunk.length() - 1) != '\n') {
                // The last piece is incomplete: keep it for the next chunk.
                remainder = lines[lines.length - 1];
                return Observable.from(Arrays.copyOfRange(lines, 0, lines.length - 1));
            }

            remainder = "";
            return Observable.from(lines);
        }
    }
}