package aa.rx;

import io.reactivex.Observable;
import io.reactivex.ObservableTransformer;
import io.reactivex.Scheduler;
import java.util.List;
import java.util.concurrent.TimeUnit;

/**
 * Provides a "buffer + debounce" transformer: upstream items are collected into a list, and the
 * list is handed downstream each time the debounced view of the same upstream produces an item.
 *
 * <p>NOTE(review): the original header read only "Inspired" — the attribution it pointed to was
 * lost. TODO restore the reference.
 */
public class BufferDebounceUtil {

    /**
     * Builds a transformer that groups upstream items into lists, using the debounced upstream
     * as the buffer boundary.
     *
     * @param time      debounce timeout, expressed in {@code unit}
     * @param unit      time unit of {@code time}
     * @param scheduler scheduler handed to {@code debounce}
     * @param <T>       upstream element type
     * @return a transformer from a stream of {@code T} to a stream of {@code List<T>}
     */
    public static <T> ObservableTransformer<T, List<T>> bufferDebounce(
            long time, TimeUnit unit, Scheduler scheduler) {
        // publish(selector) lets the selector use `upstream` several times (as buffer source,
        // as debounce source and as completion signal) on top of one shared upstream.
        return rawUpstream ->
                rawUpstream.publish(upstream -> bufferWithBoundary(upstream, time, unit, scheduler));
    }

    /** Buffers {@code upstream}, closing the current buffer whenever the boundary emits. */
    private static <T> Observable<List<T>> bufferWithBoundary(
            Observable<T> upstream, long time, TimeUnit unit, Scheduler scheduler) {
        return upstream.buffer(boundary(upstream, time, unit, scheduler));
    }

    /**
     * Boundary signal for the buffer: the debounced upstream.
     *
     * <p>The {@code takeUntil} is there to prevent the completion of {@code upstream} from
     * triggering an empty buffer.
     */
    private static <T> Observable<T> boundary(
            Observable<T> upstream, long time, TimeUnit unit, Scheduler scheduler) {
        return upstream.debounce(time, unit, scheduler).takeUntil(complete(upstream));
    }

    /** Drops every element of {@code upstream}, keeping only its terminal signal. */
    private static <T> Observable<T> complete(Observable<T> upstream) {
        return upstream.ignoreElements().toObservable();
    }
}

// NOTE(review): a second package declaration follows, so this is presumably the start of a
// separate compilation unit (BufferDebounceUtilTest.java) — confirm against the repository.
package aa.rx;

import com.google.common.collect.ImmutableList;
import io.reactivex.ObservableTransformer;
import io.reactivex.observers.TestObserver;
import io.reactivex.schedulers.TestScheduler;
import io.reactivex.subjects.PublishSubject;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.junit.Test;

/** Exercises {@link BufferDebounceUtil#bufferDebounce} on a {@link TestScheduler} with a 200 ms timeout. */
public class BufferDebounceUtilTest {

    /** Drives the virtual clock with absolute timestamps ({@code advanceTimeTo}). */
    @Test
    public void advanceTimeTo() {
        PublishSubject<Integer> ps = PublishSubject.create();
        TestScheduler testScheduler = new TestScheduler();
        ObservableTransformer<Integer, List<Integer>> composer =
                BufferDebounceUtil.bufferDebounce(200, TimeUnit.MILLISECONDS, testScheduler);
        TestObserver<List<Integer>> testObserver = ps.compose(composer).test();

        testObserver.assertNoErrors();
        testObserver.assertNotComplete();
        testObserver.assertNoValues();

        ps.onNext(1);
        ps.onNext(2);
        testObserver.assertNoValues();

        testScheduler.advanceTimeTo(100, TimeUnit.MILLISECONDS);
        testObserver.assertNoValues();
        ps.onNext(3);
        testObserver.assertNoValues();

        testScheduler.advanceTimeTo(150, TimeUnit.MILLISECONDS);
        testObserver.assertValueCount(0);
        ps.onNext(4);
        testObserver.assertValueCount(0);

        testScheduler.advanceTimeTo(400, TimeUnit.MILLISECONDS);
        testObserver.assertValueCount(1);
        testObserver.assertValue(ImmutableList.of(1, 2, 3, 4));
        ps.onNext(5);
        testObserver.assertValueCount(1);

        testScheduler.advanceTimeTo(450, TimeUnit.MILLISECONDS);
        testObserver.assertValueCount(1);
        ps.onNext(6);
        testObserver.assertValueCount(1);

        testScheduler.advanceTimeTo(800, TimeUnit.MILLISECONDS);
        testObserver.assertValueCount(2);
        testObserver.assertValueAt(1, ImmutableList.of(5, 6));

        // Completion flushes the pending item immediately, without waiting for the timeout.
        ps.onNext(7);
        ps.onComplete();
        testObserver.assertValueCount(3);
        testObserver.assertValueAt(2, ImmutableList.of(7));

        testScheduler.advanceTimeTo(850, TimeUnit.MILLISECONDS);
        testObserver.assertValueCount(3);
    }

    /** Drives the virtual clock with relative steps ({@code advanceTimeBy}). */
    @Test
    public void advanceTimeBy() {
        PublishSubject<Integer> ps = PublishSubject.create();
        TestScheduler testScheduler = new TestScheduler();
        ObservableTransformer<Integer, List<Integer>> composer =
                BufferDebounceUtil.bufferDebounce(200, TimeUnit.MILLISECONDS, testScheduler);
        TestObserver<List<Integer>> testObserver = ps.compose(composer).test();

        testObserver.assertNoErrors();
        testObserver.assertNotComplete();
        testObserver.assertNoValues();

        ps.onNext(1);
        ps.onNext(2);
        testObserver.assertNoValues();

        testScheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);
        testObserver.assertNoValues();
        ps.onNext(3);
        testObserver.assertNoValues();

        testScheduler.advanceTimeBy(150, TimeUnit.MILLISECONDS);
        testObserver.assertNoValues();
        testScheduler.advanceTimeBy(50, TimeUnit.MILLISECONDS);
        testObserver.assertValue(ImmutableList.of(1, 2, 3));

        prolongTheTimeline(testScheduler);
        ps.onNext(4);
        testObserver.assertValueCount(1);
        testScheduler.advanceTimeBy(400, TimeUnit.MILLISECONDS);
        testObserver.assertValueCount(2);
        testObserver.assertValueAt(1, ImmutableList.of(4));

        prolongTheTimeline(testScheduler);
        ps.onNext(5);
        testObserver.assertValueCount(2);
        testScheduler.advanceTimeBy(450, TimeUnit.MILLISECONDS);
        testObserver.assertValueCount(3);
        testObserver.assertValueAt(2, ImmutableList.of(5));

        prolongTheTimeline(testScheduler);
        ps.onNext(6);
        testObserver.assertValueCount(3);
        testScheduler.advanceTimeBy(800, TimeUnit.MILLISECONDS);
        testObserver.assertValueCount(4);
        testObserver.assertValueAt(3, ImmutableList.of(6));

        prolongTheTimeline(testScheduler);
        ps.onNext(7);
        ps.onComplete();
        testObserver.assertValueCount(5);
        testObserver.assertValueAt(4, ImmutableList.of(7));

        testScheduler.advanceTimeBy(850, TimeUnit.MILLISECONDS);
        testObserver.assertValueCount(5);
    }

    /** Moves the virtual clock forward by one hour. */
    private static void prolongTheTimeline(TestScheduler testScheduler) {
        testScheduler.advanceTimeBy(1, TimeUnit.HOURS);
    }

    /** Replays three bursts of items separated by long idle periods. */
    @Test
    public void advanceTimeByRealUseCase() {
        PublishSubject<Integer> ps = PublishSubject.create();
        TestScheduler testScheduler = new TestScheduler();
        ObservableTransformer<Integer, List<Integer>> composer =
                BufferDebounceUtil.bufferDebounce(200, TimeUnit.MILLISECONDS, testScheduler);
        TestObserver<List<Integer>> testObserver = ps.compose(composer).test();

        testObserver.assertNoErrors();
        testObserver.assertNotComplete();
        testObserver.assertNoValues();

        testScheduler.advanceTimeBy(1000, TimeUnit.MILLISECONDS);
        testObserver.assertNoValues();

        //iteration_1:first image
        ps.onNext(1);
        testScheduler.advanceTimeBy(80, TimeUnit.MILLISECONDS);
        //iteration_1:second image
        ps.onNext(2);
        testScheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);
        testObserver.assertNoValues();
        //iteration_1:concluded
        testScheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);
        testObserver.assertValueCount(1);
        testObserver.assertValue(ImmutableList.of(1, 2));

        //iteration_2:started
        testScheduler.advanceTimeBy(10000, TimeUnit.MILLISECONDS);
        //iteration_2:first image and only images
        ps.onNext(3);
        testScheduler.advanceTimeBy(150, TimeUnit.MILLISECONDS);
        testObserver.assertValueCount(1);
        testScheduler.advanceTimeBy(50, TimeUnit.MILLISECONDS);
        testObserver.assertValueCount(2);
        testObserver.assertValueAt(1, ImmutableList.of(3));

        //iteration_3:started
        testScheduler.advanceTimeBy(10000, TimeUnit.MILLISECONDS);
        //iteration_3:first image
        ps.onNext(4);
        testScheduler.advanceTimeBy(199, TimeUnit.MILLISECONDS);
        ps.onNext(5);
        testScheduler.advanceTimeBy(199, TimeUnit.MILLISECONDS);
        ps.onNext(6);
        testScheduler.advanceTimeBy(199, TimeUnit.MILLISECONDS);
        // 199 ms after the last item the timeout has not elapsed yet; one more ms closes the buffer.
        testObserver.assertValueCount(2);
        testScheduler.advanceTimeBy(1, TimeUnit.MILLISECONDS);
        testObserver.assertValueCount(3);
        testObserver.assertValueAt(2, ImmutableList.of(4, 5, 6));
    }
}