/** * an {@link ObservableOperator} that simulates an if-else in an RxJava Stream, it takes a {@link Map} * of {@link Predicate} as key and a {@link Function} as value ... the if any emitted item passes * the {@link Predicate}, this emitted item will be passed to the {@link Function} mapped to it, * and this item will be invoked and it's result will continue down the stream */ public class IfElse implements ObservableOperator { private final Map, Function> blocks = new LinkedHashMap<>(); /** * create a {@link IfElse} operator, if any {@link Function} returned {@code null}, the * whole operation will crash * * @param blocks the map that holds {@link Function} that are the if-else blocks */ public IfElse(@NonNull Map, Function> blocks) { if (blocks != null) { this.blocks.putAll(blocks); } } @Override public Observer apply(final Observer observer) throws Exception { return createResultObserver(observer); } private Observer createResultObserver(final Observer observer) { return new Observer() { @Override public void onSubscribe(@NonNull Disposable d) { } @Override public void onNext(@NonNull T emittedItem) { List> validBlocks = Observable.fromIterable(blocks.keySet()) .filter(key -> key.test(emittedItem)) .map(blocks::get) .toList() .flatMapMaybe(Maybe::just) .blockingGet(); if (validBlocks == null) { return; } try { for (Function block : validBlocks) { invokeOnNext(observer, block.apply(emittedItem)); } } catch (Throwable e) { onError(e); } } @Override public void onError(@NonNull Throwable e) { observer.onError(e); } @Override public void onComplete() { observer.onComplete(); } }; } private void invokeOnNext(Observer observer, R onNextValue) { observer.onNext(onNextValue); } }