/** * an {@link ObservableOperator} that simulates an if-else in an RxJava Stream, it takes a {@link Map} * of {@link Predicate} as key and a {@link Function} as value ... the if any emitted item passes * the {@link Predicate}, this emitted item will be passed to the {@link Function} mapped to it, * and this item will be invoked and it's result will continue down the stream *

* sample code : *

* {@code List list = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);}
* {@code Map, Function> blocks = new LinkedHashMap<>(2)}
* {@code blocks.put(i -> i % 2 == 0, i -> "Even number : " + i);}
* {@code blocks.put(i -> i % 2 != 0, i -> "Odd number : " + i);}
*

* {@code Observable.fromIterable(list)}
* {@code .lift(new IfElse<>(blocks))}
* {@code .subscribe(System.out::println);}
*

* // result :
* Odd number : 1
* Even number : 2
* Odd number : 3
* Even number : 4
* Odd number : 5
* Even number : 6
* Odd number : 7
* Even number : 8 */ public class IfElse implements ObservableOperator { private final Map, Function> blocks = new LinkedHashMap<>(); /** * create a {@link IfElse} operator, if any {@link Function} returned {@code null}, the * whole operation will crash * * @param blocks the map that holds {@link Function} that are the if-else blocks */ public IfElse(@NonNull Map, Function> blocks) { if (blocks != null) { this.blocks.putAll(blocks); } } @Override public Observer apply(final Observer observer) throws Exception { return createResultObserver(observer); } private Observer createResultObserver(final Observer observer) { return new Observer() { @Override public void onSubscribe(@NonNull Disposable d) { } @Override public void onNext(@NonNull T emittedItem) { List> validBlocks = Observable.fromIterable(blocks.keySet()) .filter(key -> key.test(emittedItem)) .map(blocks::get) .toList() .flatMapMaybe(Maybe::just) .blockingGet(); if (validBlocks == null) { return; } try { for (Function block : validBlocks) { invokeOnNext(observer, block.apply(emittedItem)); } } catch (Throwable e) { onError(e); } } @Override public void onError(@NonNull Throwable e) { observer.onError(e); } @Override public void onComplete() { observer.onComplete(); } }; } private void invokeOnNext(Observer observer, R onNextValue) { observer.onNext(onNextValue); } }