/**
* an {@link ObservableOperator} that simulates an if-else in an RxJava Stream, it takes a {@link Map}
* of {@link Predicate} as key and a {@link Function} as value ... the if any emitted item passes
* the {@link Predicate}, this emitted item will be passed to the {@link Function} mapped to it,
* and this item will be invoked and it's result will continue down the stream
*
* sample code :
*
* {@code List list = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);}
* {@code Map, Function> blocks = new LinkedHashMap<>(2)}
* {@code blocks.put(i -> i % 2 == 0, i -> "Even number : " + i);}
* {@code blocks.put(i -> i % 2 != 0, i -> "Odd number : " + i);}
*
* {@code Observable.fromIterable(list)}
* {@code .lift(new IfElse<>(blocks))}
* {@code .subscribe(System.out::println);}
*
* // result :
* Odd number : 1
* Even number : 2
* Odd number : 3
* Even number : 4
* Odd number : 5
* Even number : 6
* Odd number : 7
* Even number : 8
*/
public class IfElse implements ObservableOperator {
private final Map, Function> blocks = new LinkedHashMap<>();
/**
* create a {@link IfElse} operator, if any {@link Function} returned {@code null}, the
* whole operation will crash
*
* @param blocks the map that holds {@link Function} that are the if-else blocks
*/
public IfElse(@NonNull Map, Function> blocks) {
if (blocks != null) {
this.blocks.putAll(blocks);
}
}
@Override
public Observer super T> apply(final Observer super R> observer) throws Exception {
return createResultObserver(observer);
}
private Observer createResultObserver(final Observer super R> observer) {
return new Observer() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull T emittedItem) {
List> validBlocks = Observable.fromIterable(blocks.keySet())
.filter(key -> key.test(emittedItem))
.map(blocks::get)
.toList()
.flatMapMaybe(Maybe::just)
.blockingGet();
if (validBlocks == null) {
return;
}
try {
for (Function block : validBlocks) {
invokeOnNext(observer, block.apply(emittedItem));
}
} catch (Throwable e) {
onError(e);
}
}
@Override
public void onError(@NonNull Throwable e) {
observer.onError(e);
}
@Override
public void onComplete() {
observer.onComplete();
}
};
}
private void invokeOnNext(Observer super R> observer, R onNextValue) {
observer.onNext(onNextValue);
}
}