Last active
February 1, 2025 07:53
-
-
Save haomega/cfae14f47054e36d7c3ae4ac3e7cf90b to your computer and use it in GitHub Desktop.
Revisions
-
haomega revised this gist
Dec 16, 2024 . 1 changed file with 26 additions and 5 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -1,8 +1,14 @@ ## Feature - `public final CompletableFuture<T> completableFuture;` 包装CompletableFuture,使用`public`对外暴露 - 提供拓展方法: - `thenWaitMills` 等待n毫秒后 - `thenWaitSignal` 等待某个条件触发后 - `thenRunWithRetry` 带重试的执行 ## todo -[ ] 提供cancelAll -[ ] extend CompletableFuture ## 使用用例 @@ -28,3 +34,18 @@ new ScheduledThreadPoolExecutor(1) TimeUnit.SECONDS.sleep(1000); ``` out put ``` 2024-12-16T10:34:28.311 one 2024-12-16T10:34:28.311 two 2024-12-16T10:34:28.311 three: retry 0 2024-12-16T10:34:28.817 three: retry 1 2024-12-16T10:34:29.323 three: retry 2 2024-12-16T10:34:29.829 three: retry 3 2024-12-16T10:34:30.332 three: retry 4 2024-12-16T10:34:30.833 three: retry 5 2024-12-16T10:34:31.340 three: retry 6 2024-12-16T10:34:32.345 four 2024-12-16T10:34:33.304 signal callback 2024-12-16T10:34:33.305 five ``` -
haomega created this gist
Dec 16, 2024 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,103 @@ package com.luhao.demo; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.IntPredicate; /** * @author luhao */ public class CompletableFutureEx<T> { protected static final ScheduledExecutorService SINGLE_SCHEDULE_EXECUTOR; private static final Runnable NO_OP; static { NO_OP = () -> { // do nothing }; // only for schedule, don't use it for other purpose SINGLE_SCHEDULE_EXECUTOR = new ScheduledThreadPoolExecutor(1); } private final CompletableFuture<T> completableFuture; private CompletableFutureEx(CompletableFuture<T> completableFuture) { this.completableFuture = completableFuture; } public static CompletableFutureEx<Void> runAsync(Runnable runnable) { CompletableFuture<Void> cf = CompletableFuture.runAsync(runnable); return new CompletableFutureEx<>(cf); } public static CompletableFutureEx<Void> waitMillsAsync(long waitMills) { return runAsync(NO_OP) .thenWaitMills(waitMills); } public CompletableFutureEx<Void> thenRunAsync(Runnable runnable) { CompletableFuture<Void> cf = this.completableFuture.thenRunAsync(runnable); return new CompletableFutureEx<>(cf); } public CompletableFutureEx<Void> thenWaitMills(long delayMills) { CompletableFuture<Void> nextCf = new CompletableFuture<>(); CompletableFuture<Void> cf = this.completableFuture .thenRunAsync(() -> SINGLE_SCHEDULE_EXECUTOR.schedule(() -> nextCf.complete(null), delayMills, TimeUnit.MILLISECONDS)) .thenCompose(it -> nextCf); return new CompletableFutureEx<>(cf); } public CompletableFutureEx<Void> thenRunWithRetry(int retryIntervalMills, IntPredicate predicate) { // 构建重试CompletableFuture CompletableFuture<Void> finishCf = new CompletableFuture<>(); RetryAction retryAction = new RetryAction(retryIntervalMills, predicate, finishCf); // 触发Retry & Compose CompletableFuture<Void> nextCf = this.completableFuture.thenRunAsync(retryAction); CompletableFuture<Void> composeCf = nextCf.thenCompose(it -> finishCf); return new CompletableFutureEx<>(composeCf); } public CompletableFutureEx<Void> thenWaitSignal(Signal signal) { CompletableFuture<Void> cf = new CompletableFuture<>(); signal.registerPostAction(() -> cf.complete(null)); CompletableFuture<Void> cf2 = this.completableFuture.thenCompose(t -> cf); return new CompletableFutureEx<>(cf2); } private static class RetryAction implements Runnable { private final int retryInterval; private final AtomicInteger retryCount = new AtomicInteger(0); private final IntPredicate predicate; private final CompletableFuture<Void> finishCf; private RetryAction(int retryInterval, IntPredicate predicate, CompletableFuture<Void> finishCf) { this.retryInterval = retryInterval; this.predicate = predicate; this.finishCf = finishCf; } @Override public void run() { try { boolean success = predicate.test(retryCount.getAndIncrement()); if (success) { finishCf.complete(null); } else { SINGLE_SCHEDULE_EXECUTOR.schedule(() -> CompletableFuture.runAsync(this), retryInterval, TimeUnit.MILLISECONDS); } } catch (Exception e) { finishCf.completeExceptionally(e); } } } } This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,30 @@ ## 包装CompletableFuture & 提供拓展方法 - ` public final CompletableFuture<T> completableFuture;` 包装并且是`public` - `thenWaitMills` 等待n毫秒后 - `thenWaitSignal` 等待某个条件触发后 - `thenRunWithRetry` 带重试的执行 ## 使用用例 ``` Signal signal = new Signal(); CompletableFutureEx .runAsync(() -> print("one")) .thenRunAsync(() -> print("two")) .thenRunWithRetry(500, i -> { print("three: retry " + i); return i > 5; }) .thenWaitMills(1000) .thenRunAsync(() -> print("four")) .thenWaitSignal(signal) .thenRunAsync(() -> print("five")); new ScheduledThreadPoolExecutor(1) .schedule(() -> { print("signal callback"); signal.onFinish(); }, 5, TimeUnit.SECONDS); TimeUnit.SECONDS.sleep(1000); ```