package com.example.repository.support;

import org.reactivestreams.Publisher;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.repository.NoRepositoryBean;
import org.springframework.data.repository.reactive.ReactiveCrudRepository;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import java.io.Serializable;
import java.time.Duration;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

/**
 * Adapts a blocking {@link JpaRepository} to the {@link ReactiveCrudRepository} contract.
 *
 * <p>Every operation is routed through one of the {@code applyCommonOperations} helpers, which
 * end in {@code publish().autoConnect(0)}. Because {@code autoConnect(0)} connects immediately,
 * the underlying repository call is triggered as soon as the method is invoked rather than when
 * the returned publisher is subscribed to.
 *
 * @param <T> the entity type handled by the decorated repository
 * @param <ID> the identifier type of the entity
 */
@NoRepositoryBean
public class SimpleReactiveJpaRepositoryDecorator<T, ID extends Serializable>
        implements ReactiveCrudRepository<T, ID> {

    /** Time window used to group elements of an incoming stream into batches. */
    private static final Duration BATCH_WINDOW = Duration.ofMillis(100);

    private final JpaRepository<T, ID> decoratedRepository;

    /**
     * Creates a reactive view over the given blocking repository.
     *
     * @param jpaRepository the repository every call is delegated to
     */
    public SimpleReactiveJpaRepositoryDecorator(JpaRepository<T, ID> jpaRepository) {
        decoratedRepository = jpaRepository;
    }

    /**
     * Saves a single entity.
     *
     * @param entity the entity to save
     * @return a {@link Mono} emitting the instance returned by the decorated repository
     */
    @Override
    public <S extends T> Mono<S> save(S entity) {
        return applyCommonOperations(Mono.just(entity), flux -> flux.map(decoratedRepository::save))
                .single();
    }

    /**
     * Saves all given entities in one call to the decorated repository.
     *
     * @param entities the entities to save
     * @return a {@link Flux} emitting the saved instances
     */
    @Override
    public <S extends T> Flux<S> save(Iterable<S> entities) {
        return applyCommonOperations(
                Mono.just(entities),
                flux -> flux.map(decoratedRepository::save).flatMap(Flux::fromIterable));
    }

    /**
     * Saves the entities of a stream; elements are grouped into time-based batches and each batch
     * is saved with one repository call.
     *
     * @param entityStream the entities to save
     * @return a {@link Flux} emitting the saved instances
     */
    @Override
    public <S extends T> Flux<S> save(Publisher<S> entityStream) {
        return applyCommonOperations(
                Flux.from(entityStream),
                flux -> flux.map(decoratedRepository::save).flatMap(Flux::fromIterable));
    }

    /**
     * Looks up an entity by id.
     *
     * @param id the identifier to look up
     * @return a {@link Mono} emitting the entity, or completing empty when none exists
     */
    @Override
    public Mono<T> findOne(ID id) {
        // singleOrEmpty (not single): an absent entity is a normal outcome, not an error.
        return applyCommonOperations(Mono.just(id), flux -> flux.flatMap(this::lookup))
                .singleOrEmpty();
    }

    /**
     * Looks up an entity by an id supplied asynchronously.
     *
     * @param id publisher of the identifier to look up
     * @return a {@link Mono} emitting the entity, or completing empty when none exists
     */
    @Override
    public Mono<T> findOne(Mono<ID> id) {
        return applyCommonOperations(id, flux -> flux.flatMap(this::lookup)).singleOrEmpty();
    }

    /**
     * Checks whether an entity with the given id exists.
     *
     * @param id the identifier to check
     * @return a {@link Mono} emitting the repository's answer, or {@code false} if the check fails
     */
    @Override
    public Mono<Boolean> exists(ID id) {
        return applyCommonOperations(Mono.just(id), this::existsOrFalse).single();
    }

    /**
     * Checks whether an entity with an asynchronously supplied id exists.
     *
     * @param id publisher of the identifier to check
     * @return a {@link Mono} emitting the repository's answer, or {@code false} if the check fails
     */
    @Override
    public Mono<Boolean> exists(Mono<ID> id) {
        return applyCommonOperations(id, this::existsOrFalse).single();
    }

    /**
     * Loads all entities.
     *
     * @return a {@link Flux} emitting every entity returned by the decorated repository
     */
    @Override
    public Flux<T> findAll() {
        return applyCommonOperations(
                Mono.<T>empty(),
                flux -> flux.concatWith(
                        Flux.defer(() -> Flux.fromIterable(decoratedRepository.findAll()))));
    }

    /**
     * Loads the entities with the given ids.
     *
     * @param ids the identifiers to load
     * @return a {@link Flux} emitting the entities returned by the decorated repository
     */
    @Override
    public Flux<T> findAll(Iterable<ID> ids) {
        return applyCommonOperations(
                Mono.<T>empty(),
                flux -> flux.concatWith(
                        Flux.defer(() -> Flux.fromIterable(decoratedRepository.findAll(ids)))));
    }

    /**
     * Loads the entities for a stream of ids; ids are grouped into time-based batches and each
     * batch is loaded with one repository call.
     *
     * @param idStream the identifiers to load
     * @return a {@link Flux} emitting the entities returned by the decorated repository
     */
    @Override
    public Flux<T> findAll(Publisher<ID> idStream) {
        return applyCommonOperations(
                Flux.from(idStream),
                flux -> flux.flatMap(ids -> Flux.fromIterable(decoratedRepository.findAll(ids))));
    }

    /**
     * Counts all entities.
     *
     * @return a {@link Mono} emitting the count reported by the decorated repository
     */
    @Override
    public Mono<Long> count() {
        return applyCommonOperations(
                Mono.<Long>empty(),
                flux -> flux.concatWith(Flux.defer(() -> Flux.just(decoratedRepository.count()))))
                .single();
    }

    /**
     * Deletes the entity with the given id.
     *
     * @param id the identifier of the entity to delete
     * @return a {@link Mono} that completes once the delete call has returned
     */
    @Override
    public Mono<Void> delete(ID id) {
        return runDeferred(() -> decoratedRepository.delete(id));
    }

    /**
     * Deletes the given entity.
     *
     * @param entity the entity to delete
     * @return a {@link Mono} that completes once the delete call has returned
     */
    @Override
    public Mono<Void> delete(T entity) {
        return runDeferred(() -> decoratedRepository.delete(entity));
    }

    /**
     * Deletes the given entities in one call to the decorated repository.
     *
     * @param entities the entities to delete
     * @return a {@link Mono} that completes once the delete call has returned
     */
    @Override
    public Mono<Void> delete(Iterable<? extends T> entities) {
        return runDeferred(() -> decoratedRepository.delete(entities));
    }

    /**
     * Deletes the entities of a stream; elements are grouped into time-based batches and each
     * batch is deleted with one repository call.
     *
     * @param entityStream the entities to delete
     * @return a {@link Mono} that completes once all batches have been processed
     */
    @Override
    public Mono<Void> delete(Publisher<? extends T> entityStream) {
        return applyCommonOperations(
                Flux.<T>from(entityStream),
                flux -> flux.doOnNext(decoratedRepository::delete))
                .then();
    }

    /**
     * Deletes all entities.
     *
     * @return a {@link Mono} that completes once the delete call has returned
     */
    @Override
    public Mono<Void> deleteAll() {
        return runDeferred(decoratedRepository::deleteAll);
    }

    /**
     * Wraps a single-valued input in the shared scheduling and eager-connect pipeline.
     *
     * <p>NOTE(review): {@code publishOn(Schedulers.parallel())} sits upstream of the transformer,
     * so the blocking repository calls inside it appear to run on the parallel scheduler rather
     * than the elastic one - confirm this is intended.
     *
     * @param input the source value (possibly empty)
     * @param transformer the stage that performs the repository call
     * @param <I> element type of the input
     * @param <O> element type of the result
     * @return a {@link Flux} that is connected to its source immediately
     */
    protected static <I, O> Flux<O> applyCommonOperations(
            Mono<I> input, Function<Flux<I>, ? extends Publisher<O>> transformer) {
        return input
                .flux()
                .publishOn(Schedulers.parallel())
                .transform(transformer)
                .subscribeOn(Schedulers.elastic())
                .publish()
                .autoConnect(0);
    }

    /**
     * Wraps a multi-valued input in the shared scheduling and eager-connect pipeline, grouping
     * its elements into lists collected over a fixed 100 ms window before the transformer runs.
     *
     * @param input the source elements
     * @param transformer the stage that performs the repository call on each batch
     * @param <I> element type of the input
     * @param <O> element type of the result
     * @return a {@link Flux} that is connected to its source immediately
     */
    protected static <I, O> Flux<O> applyCommonOperations(
            Flux<I> input, Function<Flux<List<I>>, ? extends Publisher<O>> transformer) {
        return input
                .buffer(BATCH_WINDOW)
                .publishOn(Schedulers.parallel())
                .transform(transformer)
                .subscribeOn(Schedulers.elastic())
                .publish()
                .autoConnect(0);
    }

    /** Resolves one id against the decorated repository, mapping an absent result to empty. */
    private Mono<T> lookup(ID id) {
        Optional<T> match = decoratedRepository.findOne(id);
        return Mono.justOrEmpty(match);
    }

    /** Maps each id to the repository's existence answer; any failure is reported as false. */
    private Flux<Boolean> existsOrFalse(Flux<ID> ids) {
        // Deliberate best-effort kept from the original: errors are converted to "does not exist".
        return ids.map(decoratedRepository::exists).onErrorResumeWith(t -> Mono.just(false));
    }

    /** Runs a value-less repository action through the shared pipeline and signals completion. */
    private static Mono<Void> runDeferred(Runnable action) {
        // then() instead of single(): the action emits no element, so single() would always
        // fail with NoSuchElementException even though the action itself succeeded.
        return applyCommonOperations(
                Mono.<Void>empty(),
                flux -> flux.concatWith(Flux.defer(() -> Mono.fromRunnable(action))))
                .then();
    }
}