Skip to content

Instantly share code, notes, and snippets.

@haomega
Last active February 1, 2025 07:53
Show Gist options
  • Save haomega/cfae14f47054e36d7c3ae4ac3e7cf90b to your computer and use it in GitHub Desktop.
Save haomega/cfae14f47054e36d7c3ae4ac3e7cf90b to your computer and use it in GitHub Desktop.
Java CompletableFuture Extend
package com.luhao.demo;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntPredicate;
/**
* @author luhao
*/
public class CompletableFutureEx<T> {
protected static final ScheduledExecutorService SINGLE_SCHEDULE_EXECUTOR;
private static final Runnable NO_OP;
static {
NO_OP = () -> {
// do nothing
};
// only for schedule, don't use it for other purpose
SINGLE_SCHEDULE_EXECUTOR = new ScheduledThreadPoolExecutor(1);
}
private final CompletableFuture<T> completableFuture;
private CompletableFutureEx(CompletableFuture<T> completableFuture) {
this.completableFuture = completableFuture;
}
public static CompletableFutureEx<Void> runAsync(Runnable runnable) {
CompletableFuture<Void> cf = CompletableFuture.runAsync(runnable);
return new CompletableFutureEx<>(cf);
}
public static CompletableFutureEx<Void> waitMillsAsync(long waitMills) {
return runAsync(NO_OP)
.thenWaitMills(waitMills);
}
public CompletableFutureEx<Void> thenRunAsync(Runnable runnable) {
CompletableFuture<Void> cf = this.completableFuture.thenRunAsync(runnable);
return new CompletableFutureEx<>(cf);
}
public CompletableFutureEx<Void> thenWaitMills(long delayMills) {
CompletableFuture<Void> nextCf = new CompletableFuture<>();
CompletableFuture<Void> cf = this.completableFuture
.thenRunAsync(() -> SINGLE_SCHEDULE_EXECUTOR.schedule(() -> nextCf.complete(null), delayMills,
TimeUnit.MILLISECONDS))
.thenCompose(it -> nextCf);
return new CompletableFutureEx<>(cf);
}
public CompletableFutureEx<Void> thenRunWithRetry(int retryIntervalMills, IntPredicate predicate) {
// 构建重试CompletableFuture
CompletableFuture<Void> finishCf = new CompletableFuture<>();
RetryAction retryAction = new RetryAction(retryIntervalMills, predicate, finishCf);
// 触发Retry & Compose
CompletableFuture<Void> nextCf = this.completableFuture.thenRunAsync(retryAction);
CompletableFuture<Void> composeCf = nextCf.thenCompose(it -> finishCf);
return new CompletableFutureEx<>(composeCf);
}
public CompletableFutureEx<Void> thenWaitSignal(Signal signal) {
CompletableFuture<Void> cf = new CompletableFuture<>();
signal.registerPostAction(() -> cf.complete(null));
CompletableFuture<Void> cf2 = this.completableFuture.thenCompose(t -> cf);
return new CompletableFutureEx<>(cf2);
}
private static class RetryAction implements Runnable {
private final int retryInterval;
private final AtomicInteger retryCount = new AtomicInteger(0);
private final IntPredicate predicate;
private final CompletableFuture<Void> finishCf;
private RetryAction(int retryInterval, IntPredicate predicate, CompletableFuture<Void> finishCf) {
this.retryInterval = retryInterval;
this.predicate = predicate;
this.finishCf = finishCf;
}
@Override
public void run() {
try {
boolean success = predicate.test(retryCount.getAndIncrement());
if (success) {
finishCf.complete(null);
} else {
SINGLE_SCHEDULE_EXECUTOR.schedule(() -> CompletableFuture.runAsync(this),
retryInterval, TimeUnit.MILLISECONDS);
}
} catch (Exception e) {
finishCf.completeExceptionally(e);
}
}
}
}

Feature

  • public final CompletableFuture<T> completableFuture; 包装CompletableFuture,使用public对外暴露
  • 提供拓展方法:
    • thenWaitMills 等待n毫秒后
    • thenWaitSignal 等待某个条件触发后
    • thenRunWithRetry 带重试的执行

todo

-[ ] 提供cancelAll -[ ] extend CompletableFuture

使用用例

Signal signal = new Signal();
CompletableFutureEx
        .runAsync(() -> print("one"))
        .thenRunAsync(() -> print("two"))
        .thenRunWithRetry(500, i -> {
            print("three: retry " + i);
            return i > 5;
        })
        .thenWaitMills(1000)
        .thenRunAsync(() -> print("four"))
        .thenWaitSignal(signal)
        .thenRunAsync(() -> print("five"));

new ScheduledThreadPoolExecutor(1)
        .schedule(() -> {
            print("signal callback");
            signal.onFinish();
        }, 5, TimeUnit.SECONDS);

TimeUnit.SECONDS.sleep(1000);

out put

2024-12-16T10:34:28.311 one
2024-12-16T10:34:28.311 two
2024-12-16T10:34:28.311 three: retry 0
2024-12-16T10:34:28.817 three: retry 1
2024-12-16T10:34:29.323 three: retry 2
2024-12-16T10:34:29.829 three: retry 3
2024-12-16T10:34:30.332 three: retry 4
2024-12-16T10:34:30.833 three: retry 5
2024-12-16T10:34:31.340 three: retry 6
2024-12-16T10:34:32.345 four
2024-12-16T10:34:33.304 signal callback
2024-12-16T10:34:33.305 five
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment