Skip to content

Instantly share code, notes, and snippets.

@jayxhj
Created June 25, 2019 13:54
Show Gist options
  • Select an option

  • Save jayxhj/5a94c5b51e5dfa50623cb35f32ef69b0 to your computer and use it in GitHub Desktop.

Select an option

Save jayxhj/5a94c5b51e5dfa50623cb35f32ef69b0 to your computer and use it in GitHub Desktop.

Revisions

  1. jayxhj created this gist Jun 25, 2019.
    101 changes: 101 additions & 0 deletions ExecutingCompleteServiceUtil.java
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,101 @@
    package com.jayxhj.util;

    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.Callable;
    import java.util.concurrent.CompletionService;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Executor;
    import java.util.concurrent.ExecutorCompletionService;
    import java.util.concurrent.Future;
    import java.util.function.Consumer;
    import org.apache.commons.collections4.CollectionUtils;
    import org.slf4j.Logger;

    /**
    * 使用 ExecutingCompleteService 进行多线程任务结果处理并增加重试机制
    *
    * @author jayxhj
    */
    public class ExecutingCompleteServiceUtil {

    private static Logger logger;

    /**
    * 使用 Executor 并行处理请求,并传入 Consumer 消费处理结果,可节省由于各任务处理不一致导致的等待时间
    *
    * @param e Executor
    * @param solvers Callables
    * @param consumer Consumer
    * @param failureConsumer 任务处理失败后的处理
    * @param maxRetryCount 重试次数,当 maxRetryCount 为 0 时代表不需要重试,
    */
    public static <T> void solve(Executor e, Collection<Callable<T>> solvers, Consumer consumer, Consumer failureConsumer,
    int maxRetryCount) {
    if (CollectionUtils.isEmpty(solvers)) {
    return;
    }

    //重试最终失败异常信息落日志
    int n = solvers.size();
    if (maxRetryCount < 0 && n > 0) {
    try {
    logger.error("有{}个任务经异常重试后仍然异常,task为{}", n, SeriUtils.writeObject(solvers));
    } catch (Exception e1) {
    logger.error(e1.getMessage(), e1);
    }
    if (failureConsumer != null) {
    failureConsumer.accept(solvers);
    }
    return;
    }

    //任务处理
    CompletionService<T> ecs = new ExecutorCompletionService<>(e);
    Map<Future<T>, Callable<T>> callableFutureMap = new HashMap<>(solvers.size());
    for (Callable<T> s : solvers) {
    Future<T> stringFuture = ecs.submit(s);
    callableFutureMap.put(stringFuture, s);
    }

    //超时任务记录,用于重试
    List<Callable<T>> failedSolvers = new ArrayList<>();
    for (int i = 0; i < n; i++) {
    Future<T> future = null;
    try {
    future = ecs.take();
    } catch (InterruptedException e1) {
    Thread.currentThread().interrupt();
    }
    Callable<T> callable = null;
    try {
    if (future == null) {
    continue;
    }
    T r = future.get();
    if (r != null) {
    consumer.accept(r);
    }
    } catch (InterruptedException e1) {
    Thread.currentThread().interrupt();
    } catch (ExecutionException e2) {
    callable = callableFutureMap.get(future);
    if (callable != null) {
    failedSolvers.add(callable);
    }
    logger.error(e2.getMessage(), e2);
    }
    }

    //有任务失败时进行重试
    if (CollectionUtils.isNotEmpty(failedSolvers)) {
    logger.info("有{}个任务异常,开始重试", failedSolvers.size());
    solve(e, failedSolvers, consumer, failureConsumer, --maxRetryCount);
    }
    }


    }