Skip to content

Instantly share code, notes, and snippets.

@TonyTangAndroid
Created September 9, 2023 05:45
Show Gist options
  • Save TonyTangAndroid/0f7253f6cc80cdaa88cc1f97fdeb7359 to your computer and use it in GitHub Desktop.
Save TonyTangAndroid/0f7253f6cc80cdaa88cc1f97fdeb7359 to your computer and use it in GitHub Desktop.

Revisions

  1. TonyTangAndroid created this gist Sep 9, 2023.
    220 changes: 220 additions & 0 deletions gistfile1.txt
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,220 @@
    package aa.rx;

    import io.reactivex.Observable;
    import io.reactivex.ObservableTransformer;
    import io.reactivex.Scheduler;
    import java.util.List;
    import java.util.concurrent.TimeUnit;

    /**
    * <a href="https://stackoverflow.com/a/49866518/4068957">Inspired</a>
    */
    public class BufferDebounceUtil {

    public static <T> ObservableTransformer<T, List<T>> bufferDebounce(
    long time, TimeUnit unit, Scheduler scheduler) {
    return rawUpstream -> rawUpstream.publish(upstream -> bufferWithBoundary(upstream, time, unit, scheduler));
    }

    private static <T> Observable<List<T>> bufferWithBoundary(Observable<T> upstream, long time,
    TimeUnit unit,
    Scheduler scheduler) {
    return upstream.buffer(boundary(upstream, time, unit, scheduler));
    }

    private static <T> Observable<T> boundary(Observable<T> upstream, long time, TimeUnit unit,
    Scheduler scheduler) {
    return upstream.debounce(time, unit, scheduler).takeUntil(complete(upstream));
    }

    /**
    * The takeUntil is there to prevent the completion of o to trigger an empty buffer.
    */
    private static <T> Observable<Object> complete(Observable<T> upstream) {
    return upstream.ignoreElements().toObservable();
    }

    }


    package aa.rx;

    import com.google.common.collect.ImmutableList;
    import io.reactivex.ObservableTransformer;
    import io.reactivex.observers.TestObserver;
    import io.reactivex.schedulers.TestScheduler;
    import io.reactivex.subjects.PublishSubject;
    import java.util.List;
    import java.util.concurrent.TimeUnit;
    import org.junit.Test;

    public class BufferDebounceUtilTest {

    @Test
    public void advanceTimeTo() {
    PublishSubject<Integer> ps = PublishSubject.create();

    TestScheduler testScheduler = new TestScheduler();
    ObservableTransformer<Integer, List<Integer>> composer = BufferDebounceUtil.bufferDebounce(200,
    TimeUnit.MILLISECONDS, testScheduler);
    TestObserver<List<Integer>> testObserver = ps.compose(composer).test();
    testObserver.assertNoErrors();
    testObserver.assertNotComplete();
    testObserver.assertNoValues();

    ps.onNext(1);
    ps.onNext(2);
    testObserver.assertNoValues();
    testScheduler.advanceTimeTo(100, TimeUnit.MILLISECONDS);
    testObserver.assertNoValues();
    ps.onNext(3);
    testObserver.assertNoValues();
    testScheduler.advanceTimeTo(150, TimeUnit.MILLISECONDS);
    testObserver.assertValueCount(0);

    ps.onNext(4);
    testObserver.assertValueCount(0);
    testScheduler.advanceTimeTo(400, TimeUnit.MILLISECONDS);
    testObserver.assertValueCount(1);
    testObserver.assertValue(ImmutableList.of(1, 2, 3, 4));
    ps.onNext(5);
    testObserver.assertValueCount(1);
    testScheduler.advanceTimeTo(450, TimeUnit.MILLISECONDS);
    testObserver.assertValueCount(1);

    ps.onNext(6);
    testObserver.assertValueCount(1);
    testScheduler.advanceTimeTo(800, TimeUnit.MILLISECONDS);
    testObserver.assertValueCount(2);
    testObserver.assertValueAt(1, ImmutableList.of(5, 6));

    ps.onNext(7);
    ps.onComplete();
    testObserver.assertValueCount(3);
    testObserver.assertValueAt(2, ImmutableList.of(7));

    testScheduler.advanceTimeTo(850, TimeUnit.MILLISECONDS);
    testObserver.assertValueCount(3);

    }

    @Test
    public void advanceTimeBy() {
    PublishSubject<Integer> ps = PublishSubject.create();

    TestScheduler testScheduler = new TestScheduler();
    ObservableTransformer<Integer, List<Integer>> composer = BufferDebounceUtil.bufferDebounce(200,
    TimeUnit.MILLISECONDS, testScheduler);
    TestObserver<List<Integer>> testObserver = ps.compose(composer).test();
    testObserver.assertNoErrors();
    testObserver.assertNotComplete();
    testObserver.assertNoValues();

    ps.onNext(1);
    ps.onNext(2);
    testObserver.assertNoValues();
    testScheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);
    testObserver.assertNoValues();
    ps.onNext(3);
    testObserver.assertNoValues();
    testScheduler.advanceTimeBy(150, TimeUnit.MILLISECONDS);
    testObserver.assertNoValues();
    testScheduler.advanceTimeBy(50, TimeUnit.MILLISECONDS);
    testObserver.assertValue(ImmutableList.of(1, 2, 3));

    prolongTheTimeline(testScheduler);

    ps.onNext(4);
    testObserver.assertValueCount(1);
    testScheduler.advanceTimeBy(400, TimeUnit.MILLISECONDS);
    testObserver.assertValueCount(2);
    testObserver.assertValueAt(1, ImmutableList.of(4));

    prolongTheTimeline(testScheduler);

    ps.onNext(5);
    testObserver.assertValueCount(2);
    testScheduler.advanceTimeBy(450, TimeUnit.MILLISECONDS);
    testObserver.assertValueCount(3);
    testObserver.assertValueAt(2, ImmutableList.of(5));

    prolongTheTimeline(testScheduler);

    ps.onNext(6);
    testObserver.assertValueCount(3);
    testScheduler.advanceTimeBy(800, TimeUnit.MILLISECONDS);
    testObserver.assertValueCount(4);
    testObserver.assertValueAt(3, ImmutableList.of(6));

    prolongTheTimeline(testScheduler);

    ps.onNext(7);
    ps.onComplete();
    testObserver.assertValueCount(5);
    testObserver.assertValueAt(4, ImmutableList.of(7));

    testScheduler.advanceTimeBy(850, TimeUnit.MILLISECONDS);
    testObserver.assertValueCount(5);

    }

    private static void prolongTheTimeline(TestScheduler testScheduler) {
    testScheduler.advanceTimeBy(1, TimeUnit.HOURS);
    }

    @Test
    public void advanceTimeByRealUseCase() {
    PublishSubject<Integer> ps = PublishSubject.create();
    TestScheduler testScheduler = new TestScheduler();
    ObservableTransformer<Integer, List<Integer>> composer = BufferDebounceUtil.bufferDebounce(200,
    TimeUnit.MILLISECONDS, testScheduler);
    TestObserver<List<Integer>> testObserver = ps.compose(composer).test();
    testObserver.assertNoErrors();
    testObserver.assertNotComplete();
    testObserver.assertNoValues();

    testScheduler.advanceTimeBy(1000, TimeUnit.MILLISECONDS);
    testObserver.assertNoValues();

    //iteration_1:first image
    ps.onNext(1);
    testScheduler.advanceTimeBy(80, TimeUnit.MILLISECONDS);

    //iteration_1:second image
    ps.onNext(2);
    testScheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);

    testObserver.assertNoValues();
    //iteration_1:concluded
    testScheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);
    testObserver.assertValueCount(1);
    testObserver.assertValue(ImmutableList.of(1, 2));

    //iteration_2:started
    testScheduler.advanceTimeBy(10000, TimeUnit.MILLISECONDS);
    //iteration_2:first image and only images
    ps.onNext(3);
    testScheduler.advanceTimeBy(150, TimeUnit.MILLISECONDS);
    testObserver.assertValueCount(1);
    testScheduler.advanceTimeBy(50, TimeUnit.MILLISECONDS);
    testObserver.assertValueCount(2);
    testObserver.assertValueAt(1, ImmutableList.of(3));

    //iteration_3:started
    testScheduler.advanceTimeBy(10000, TimeUnit.MILLISECONDS);
    //iteration_3:first image
    ps.onNext(4);
    testScheduler.advanceTimeBy(199, TimeUnit.MILLISECONDS);
    ps.onNext(5);
    testScheduler.advanceTimeBy(199, TimeUnit.MILLISECONDS);
    ps.onNext(6);
    testScheduler.advanceTimeBy(199, TimeUnit.MILLISECONDS);
    testObserver.assertValueCount(2);
    testScheduler.advanceTimeBy(1, TimeUnit.MILLISECONDS);
    testObserver.assertValueCount(3);
    testObserver.assertValueAt(2, ImmutableList.of(4, 5, 6));


    }

    }