Skip to content

Instantly share code, notes, and snippets.

@haomega
Last active February 1, 2025 07:53
Show Gist options
  • Select an option

  • Save haomega/cfae14f47054e36d7c3ae4ac3e7cf90b to your computer and use it in GitHub Desktop.

Select an option

Save haomega/cfae14f47054e36d7c3ae4ac3e7cf90b to your computer and use it in GitHub Desktop.

Revisions

  1. haomega revised this gist Dec 16, 2024. 1 changed file with 26 additions and 5 deletions.
    31 changes: 26 additions & 5 deletions use-case.md
    Original file line number Diff line number Diff line change
    @@ -1,8 +1,14 @@
    ## 包装CompletableFuture & 提供拓展方法
    - ` public final CompletableFuture<T> completableFuture;` 包装并且是`public`
    - `thenWaitMills` 等待n毫秒后
    - `thenWaitSignal` 等待某个条件触发后
    - `thenRunWithRetry` 带重试的执行
    ## Feature

    - `public final CompletableFuture<T> completableFuture;` 包装CompletableFuture,使用`public`对外暴露
    - 提供拓展方法:
    - `thenWaitMills` 等待n毫秒后
    - `thenWaitSignal` 等待某个条件触发后
    - `thenRunWithRetry` 带重试的执行

    ## todo
    -[ ] 提供cancelAll
    -[ ] extend CompletableFuture

    ## 使用用例

    @@ -28,3 +34,18 @@ new ScheduledThreadPoolExecutor(1)
    TimeUnit.SECONDS.sleep(1000);
    ```
    out put
    ```
    2024-12-16T10:34:28.311 one
    2024-12-16T10:34:28.311 two
    2024-12-16T10:34:28.311 three: retry 0
    2024-12-16T10:34:28.817 three: retry 1
    2024-12-16T10:34:29.323 three: retry 2
    2024-12-16T10:34:29.829 three: retry 3
    2024-12-16T10:34:30.332 three: retry 4
    2024-12-16T10:34:30.833 three: retry 5
    2024-12-16T10:34:31.340 three: retry 6
    2024-12-16T10:34:32.345 four
    2024-12-16T10:34:33.304 signal callback
    2024-12-16T10:34:33.305 five
    ```
  2. haomega created this gist Dec 16, 2024.
    103 changes: 103 additions & 0 deletions CompletableFutureEx.java
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,103 @@
    package com.luhao.demo;

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.ScheduledThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.function.IntPredicate;

    /**
    * @author luhao
    */
    public class CompletableFutureEx<T> {
    protected static final ScheduledExecutorService SINGLE_SCHEDULE_EXECUTOR;
    private static final Runnable NO_OP;

    static {
    NO_OP = () -> {
    // do nothing
    };
    // only for schedule, don't use it for other purpose
    SINGLE_SCHEDULE_EXECUTOR = new ScheduledThreadPoolExecutor(1);
    }

    private final CompletableFuture<T> completableFuture;

    private CompletableFutureEx(CompletableFuture<T> completableFuture) {
    this.completableFuture = completableFuture;
    }

    public static CompletableFutureEx<Void> runAsync(Runnable runnable) {
    CompletableFuture<Void> cf = CompletableFuture.runAsync(runnable);
    return new CompletableFutureEx<>(cf);
    }

    public static CompletableFutureEx<Void> waitMillsAsync(long waitMills) {
    return runAsync(NO_OP)
    .thenWaitMills(waitMills);
    }

    public CompletableFutureEx<Void> thenRunAsync(Runnable runnable) {
    CompletableFuture<Void> cf = this.completableFuture.thenRunAsync(runnable);
    return new CompletableFutureEx<>(cf);
    }

    public CompletableFutureEx<Void> thenWaitMills(long delayMills) {
    CompletableFuture<Void> nextCf = new CompletableFuture<>();

    CompletableFuture<Void> cf = this.completableFuture
    .thenRunAsync(() -> SINGLE_SCHEDULE_EXECUTOR.schedule(() -> nextCf.complete(null), delayMills,
    TimeUnit.MILLISECONDS))
    .thenCompose(it -> nextCf);

    return new CompletableFutureEx<>(cf);
    }

    public CompletableFutureEx<Void> thenRunWithRetry(int retryIntervalMills, IntPredicate predicate) {
    // 构建重试CompletableFuture
    CompletableFuture<Void> finishCf = new CompletableFuture<>();
    RetryAction retryAction = new RetryAction(retryIntervalMills, predicate, finishCf);

    // 触发Retry & Compose
    CompletableFuture<Void> nextCf = this.completableFuture.thenRunAsync(retryAction);
    CompletableFuture<Void> composeCf = nextCf.thenCompose(it -> finishCf);

    return new CompletableFutureEx<>(composeCf);
    }

    public CompletableFutureEx<Void> thenWaitSignal(Signal signal) {
    CompletableFuture<Void> cf = new CompletableFuture<>();
    signal.registerPostAction(() -> cf.complete(null));
    CompletableFuture<Void> cf2 = this.completableFuture.thenCompose(t -> cf);
    return new CompletableFutureEx<>(cf2);
    }

    private static class RetryAction implements Runnable {
    private final int retryInterval;
    private final AtomicInteger retryCount = new AtomicInteger(0);
    private final IntPredicate predicate;
    private final CompletableFuture<Void> finishCf;

    private RetryAction(int retryInterval, IntPredicate predicate, CompletableFuture<Void> finishCf) {
    this.retryInterval = retryInterval;
    this.predicate = predicate;
    this.finishCf = finishCf;
    }

    @Override
    public void run() {
    try {
    boolean success = predicate.test(retryCount.getAndIncrement());
    if (success) {
    finishCf.complete(null);
    } else {
    SINGLE_SCHEDULE_EXECUTOR.schedule(() -> CompletableFuture.runAsync(this),
    retryInterval, TimeUnit.MILLISECONDS);
    }
    } catch (Exception e) {
    finishCf.completeExceptionally(e);
    }
    }
    }
    }
    30 changes: 30 additions & 0 deletions use-case.md
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,30 @@
    ## 包装CompletableFuture & 提供拓展方法
    - ` public final CompletableFuture<T> completableFuture;` 包装并且是`public`
    - `thenWaitMills` 等待n毫秒后
    - `thenWaitSignal` 等待某个条件触发后
    - `thenRunWithRetry` 带重试的执行

    ## 使用用例

    ```
    Signal signal = new Signal();
    CompletableFutureEx
    .runAsync(() -> print("one"))
    .thenRunAsync(() -> print("two"))
    .thenRunWithRetry(500, i -> {
    print("three: retry " + i);
    return i > 5;
    })
    .thenWaitMills(1000)
    .thenRunAsync(() -> print("four"))
    .thenWaitSignal(signal)
    .thenRunAsync(() -> print("five"));
    new ScheduledThreadPoolExecutor(1)
    .schedule(() -> {
    print("signal callback");
    signal.onFinish();
    }, 5, TimeUnit.SECONDS);
    TimeUnit.SECONDS.sleep(1000);
    ```