Last active
November 15, 2018 17:28
-
-
Save akarnokd/1c54e5a4f64f9b1e46bdcf62b4222f08 to your computer and use it in GitHub Desktop.
Revisions
-
akarnokd revised this gist
May 1, 2016 . 1 changed file with 4 additions and 2 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -69,8 +69,8 @@ public ValveSubscriber(Subscriber<? super T> actual, int prefetch, boolean defau this.error = new AtomicReference<>(); this.queue = new SpscAtomicArrayQueue<>(prefetch); this.nl = NotificationLite.instance(); this.valveOpen = defaultState; request(prefetch); } @Override @@ -175,6 +175,8 @@ void drain() { break; } actual.onNext(nl.getValue(o)); e++; if (e == limit) { r = BackpressureUtils.produced(requested, e); @@ -241,4 +243,4 @@ public void onCompleted() { valve.otherCompleted(); } } } -
akarnokd revised this gist
Apr 28, 2016 . 1 changed file with 5 additions and 0 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -30,6 +30,7 @@ public Subscriber<? super T> call(Subscriber<? super T> child) { parent.other = os; child.add(parent); child.add(os); child.setProducer(r -> parent.requestInner(r)); other.subscribe(os); @@ -153,6 +154,7 @@ void drain() { if (error.get() != null) { unsubscribe(); other.unsubscribe(); queue.clear(); Throwable ex = ExceptionsUtils.terminate(error); actual.onError(ex); @@ -164,6 +166,7 @@ void drain() { boolean empty = o == null; if (d && empty) { other.unsubscribe(); actual.onCompleted(); return; } @@ -187,13 +190,15 @@ void drain() { if (error.get() != null) { unsubscribe(); other.unsubscribe(); queue.clear(); Throwable ex = ExceptionsUtils.terminate(error); actual.onError(ex); return; } if (done && queue.isEmpty()) { other.unsubscribe(); actual.onCompleted(); return; } -
akarnokd created this gist
Apr 28, 2016 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,239 @@ import java.util.Queue; import java.util.concurrent.atomic.*; import rx.*; import rx.Observable.Operator; import rx.exceptions.MissingBackpressureException; import rx.internal.operators.*; import rx.internal.util.*; import rx.internal.util.atomic.SpscAtomicArrayQueue; public final class RxValve<T> implements Operator<T, T> { final Observable<Boolean> other; final int prefetch; final boolean defaultState; public RxValve(Observable<Boolean> other, int prefetch, boolean defaultState) { this.other = other; this.prefetch = prefetch; this.defaultState = defaultState; } @Override public Subscriber<? super T> call(Subscriber<? super T> child) { ValveSubscriber<T> parent = new ValveSubscriber<>(child, prefetch, defaultState); OtherSubscriber os = new OtherSubscriber(parent); parent.other = os; child.add(parent); child.setProducer(r -> parent.requestInner(r)); other.subscribe(os); return parent; } static final class ValveSubscriber<T> extends Subscriber<T> { final Subscriber<? super T> actual; final int limit; final AtomicLong requested; final AtomicInteger wip; final AtomicReference<Throwable> error; final Queue<Object> queue; final NotificationLite<T> nl; volatile boolean valveOpen; volatile boolean done; Subscription other; long emission; public ValveSubscriber(Subscriber<? super T> actual, int prefetch, boolean defaultState) { this.actual = actual; this.limit = prefetch - (prefetch >> 2); this.requested = new AtomicLong(); this.wip = new AtomicInteger(); this.error = new AtomicReference<>(); this.queue = new SpscAtomicArrayQueue<>(prefetch); this.nl = NotificationLite.instance(); request(prefetch); this.valveOpen = defaultState; } @Override public void onNext(T t) { if (!queue.offer(nl.next(t))) { onError(new MissingBackpressureException()); } else { drain(); } } @Override public void onError(Throwable e) { if (ExceptionsUtils.addThrowable(error, e)) { other.unsubscribe(); done = true; drain(); } else { RxJavaPluginUtils.handleException(e); } } @Override public void onCompleted() { done = true; drain(); } void otherSignal(boolean state) { valveOpen = state; if (state) { drain(); } } void otherError(Throwable e) { if (ExceptionsUtils.addThrowable(error, e)) { unsubscribe(); done = true; drain(); } else { RxJavaPluginUtils.handleException(e); } } void otherCompleted() { if (ExceptionsUtils.addThrowable(error, new IllegalStateException("Other completed unexpectedly"))) { unsubscribe(); done = true; drain(); } } void requestInner(long n) { if (n > 0) { BackpressureUtils.getAndAddRequest(requested, n); drain(); } else if (n < 0) { throw new IllegalArgumentException("n >= 0 required but it was " + n); } } void drain() { if (wip.getAndIncrement() != 0) { return; } int missed = 1; for (;;) { if (valveOpen) { long r = requested.get(); long e = emission; while (e != r && valveOpen) { if (actual.isUnsubscribed()) { return; } if (error.get() != null) { unsubscribe(); queue.clear(); Throwable ex = ExceptionsUtils.terminate(error); actual.onError(ex); return; } boolean d = done; Object o = queue.poll(); boolean empty = o == null; if (d && empty) { actual.onCompleted(); return; } if (empty) { break; } e++; if (e == limit) { r = BackpressureUtils.produced(requested, e); request(e); e = 0L; } } if (e == r) { if (actual.isUnsubscribed()) { return; } if (error.get() != null) { unsubscribe(); queue.clear(); Throwable ex = ExceptionsUtils.terminate(error); actual.onError(ex); return; } if (done && queue.isEmpty()) { actual.onCompleted(); return; } } emission = e; } else { if (actual.isUnsubscribed()) { return; } } missed = wip.addAndGet(-missed); if (missed == 0) { break; } } } } static final class OtherSubscriber extends Subscriber<Boolean> { final ValveSubscriber<?> valve; public OtherSubscriber(ValveSubscriber<?> valve) { this.valve = valve; } @Override public void onNext(Boolean t) { valve.otherSignal(t); } @Override public void onError(Throwable e) { valve.otherError(e); } @Override public void onCompleted() { valve.otherCompleted(); } } }