|
package com.luhao.demo; |
|
|
|
import java.util.concurrent.CompletableFuture; |
|
import java.util.concurrent.ScheduledExecutorService; |
|
import java.util.concurrent.ScheduledThreadPoolExecutor; |
|
import java.util.concurrent.TimeUnit; |
|
import java.util.concurrent.atomic.AtomicInteger; |
|
import java.util.function.IntPredicate; |
|
|
|
/** |
|
* @author luhao |
|
*/ |
|
public class CompletableFutureEx<T> { |
|
protected static final ScheduledExecutorService SINGLE_SCHEDULE_EXECUTOR; |
|
private static final Runnable NO_OP; |
|
|
|
static { |
|
NO_OP = () -> { |
|
// do nothing |
|
}; |
|
// only for schedule, don't use it for other purpose |
|
SINGLE_SCHEDULE_EXECUTOR = new ScheduledThreadPoolExecutor(1); |
|
} |
|
|
|
private final CompletableFuture<T> completableFuture; |
|
|
|
private CompletableFutureEx(CompletableFuture<T> completableFuture) { |
|
this.completableFuture = completableFuture; |
|
} |
|
|
|
public static CompletableFutureEx<Void> runAsync(Runnable runnable) { |
|
CompletableFuture<Void> cf = CompletableFuture.runAsync(runnable); |
|
return new CompletableFutureEx<>(cf); |
|
} |
|
|
|
public static CompletableFutureEx<Void> waitMillsAsync(long waitMills) { |
|
return runAsync(NO_OP) |
|
.thenWaitMills(waitMills); |
|
} |
|
|
|
public CompletableFutureEx<Void> thenRunAsync(Runnable runnable) { |
|
CompletableFuture<Void> cf = this.completableFuture.thenRunAsync(runnable); |
|
return new CompletableFutureEx<>(cf); |
|
} |
|
|
|
public CompletableFutureEx<Void> thenWaitMills(long delayMills) { |
|
CompletableFuture<Void> nextCf = new CompletableFuture<>(); |
|
|
|
CompletableFuture<Void> cf = this.completableFuture |
|
.thenRunAsync(() -> SINGLE_SCHEDULE_EXECUTOR.schedule(() -> nextCf.complete(null), delayMills, |
|
TimeUnit.MILLISECONDS)) |
|
.thenCompose(it -> nextCf); |
|
|
|
return new CompletableFutureEx<>(cf); |
|
} |
|
|
|
public CompletableFutureEx<Void> thenRunWithRetry(int retryIntervalMills, IntPredicate predicate) { |
|
// 构建重试CompletableFuture |
|
CompletableFuture<Void> finishCf = new CompletableFuture<>(); |
|
RetryAction retryAction = new RetryAction(retryIntervalMills, predicate, finishCf); |
|
|
|
// 触发Retry & Compose |
|
CompletableFuture<Void> nextCf = this.completableFuture.thenRunAsync(retryAction); |
|
CompletableFuture<Void> composeCf = nextCf.thenCompose(it -> finishCf); |
|
|
|
return new CompletableFutureEx<>(composeCf); |
|
} |
|
|
|
public CompletableFutureEx<Void> thenWaitSignal(Signal signal) { |
|
CompletableFuture<Void> cf = new CompletableFuture<>(); |
|
signal.registerPostAction(() -> cf.complete(null)); |
|
CompletableFuture<Void> cf2 = this.completableFuture.thenCompose(t -> cf); |
|
return new CompletableFutureEx<>(cf2); |
|
} |
|
|
|
private static class RetryAction implements Runnable { |
|
private final int retryInterval; |
|
private final AtomicInteger retryCount = new AtomicInteger(0); |
|
private final IntPredicate predicate; |
|
private final CompletableFuture<Void> finishCf; |
|
|
|
private RetryAction(int retryInterval, IntPredicate predicate, CompletableFuture<Void> finishCf) { |
|
this.retryInterval = retryInterval; |
|
this.predicate = predicate; |
|
this.finishCf = finishCf; |
|
} |
|
|
|
@Override |
|
public void run() { |
|
try { |
|
boolean success = predicate.test(retryCount.getAndIncrement()); |
|
if (success) { |
|
finishCf.complete(null); |
|
} else { |
|
SINGLE_SCHEDULE_EXECUTOR.schedule(() -> CompletableFuture.runAsync(this), |
|
retryInterval, TimeUnit.MILLISECONDS); |
|
} |
|
} catch (Exception e) { |
|
finishCf.completeExceptionally(e); |
|
} |
|
} |
|
} |
|
} |