package com.jayxhj.util;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.function.Consumer;

import org.apache.commons.collections4.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Runs a batch of tasks through an {@link ExecutorCompletionService}, hands each result to a
 * consumer as soon as it is available, and retries the tasks that threw.
 *
 * @author jayxhj
 */
public class ExecutingCompleteServiceUtil {

    private static final Logger logger = LoggerFactory.getLogger(ExecutingCompleteServiceUtil.class);

    /**
     * Submits every task to the given executor and feeds results to {@code consumer} in
     * completion order, so a slow task does not delay the consumption of faster ones.
     *
     * <p>{@code consumer} is invoked on the calling thread; {@code null} results are skipped.
     * A task whose execution throws is collected and re-submitted, up to {@code maxRetryCount}
     * additional rounds. Tasks that still fail after the last round are logged and passed to
     * {@code failureConsumer}.
     *
     * <p>If the calling thread is interrupted while waiting for results, waiting stops, the
     * interrupt flag is restored, no retry is attempted, and every task whose result was not
     * delivered is passed to {@code failureConsumer}.
     *
     * @param e               executor the tasks are submitted to
     * @param solvers         tasks to run; a {@code null} or empty collection is a no-op
     * @param consumer        receives every non-null task result; must not be {@code null}
     *                        when any task yields a non-null result
     * @param failureConsumer receives the tasks whose results could not be delivered;
     *                        may be {@code null}
     * @param maxRetryCount   number of retry rounds; {@code 0} means no retry. A negative value
     *                        runs nothing and reports every task as failed.
     * @param <T>             result type of the tasks
     */
    public static <T> void solve(Executor e, Collection<Callable<T>> solvers, Consumer<T> consumer,
            Consumer<Collection<Callable<T>>> failureConsumer, int maxRetryCount) {
        if (CollectionUtils.isEmpty(solvers)) {
            return;
        }
        // Retry budget already spent: report instead of running.
        if (maxRetryCount < 0) {
            reportExhausted(solvers, failureConsumer);
            return;
        }

        // Submit everything, remembering which task produced which future so that a failed
        // future can be mapped back to the task that has to be retried.
        CompletionService<T> ecs = new ExecutorCompletionService<>(e);
        Map<Future<T>, Callable<T>> pending = new HashMap<>(solvers.size());
        for (Callable<T> s : solvers) {
            pending.put(ecs.submit(s), s);
        }

        // Tasks whose execution threw; candidates for the next retry round.
        List<Callable<T>> failedSolvers = new ArrayList<>();
        int n = pending.size();
        for (int i = 0; i < n; i++) {
            Future<T> future;
            try {
                future = ecs.take();
            } catch (InterruptedException ie) {
                // With the flag restored every further take() would throw immediately,
                // so stop waiting instead of spinning through the remaining iterations.
                Thread.currentThread().interrupt();
                reportInterrupted(failedSolvers, pending.values(), failureConsumer);
                return;
            }

            Callable<T> callable = pending.remove(future);
            try {
                T r = future.get();
                if (r != null) {
                    consumer.accept(r);
                }
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                if (callable != null) {
                    failedSolvers.add(callable);
                }
                reportInterrupted(failedSolvers, pending.values(), failureConsumer);
                return;
            } catch (ExecutionException ee) {
                if (callable != null) {
                    failedSolvers.add(callable);
                }
                logger.error(ee.getMessage(), ee);
            }
        }

        if (failedSolvers.isEmpty()) {
            return;
        }
        if (maxRetryCount == 0) {
            reportExhausted(failedSolvers, failureConsumer);
            return;
        }
        logger.info("有{}个任务异常,开始重试", failedSolvers.size());
        solve(e, failedSolvers, consumer, failureConsumer, maxRetryCount - 1);
    }

    /** Logs the tasks that are still failing once retries are used up and notifies the failure consumer. */
    private static <T> void reportExhausted(Collection<Callable<T>> tasks,
            Consumer<Collection<Callable<T>>> failureConsumer) {
        try {
            logger.error("有{}个任务经异常重试后仍然异常,task为{}", tasks.size(), SeriUtils.writeObject(tasks));
        } catch (Exception e1) {
            // Serializing the tasks for the log is best effort; it must not block the callback.
            logger.error(e1.getMessage(), e1);
        }
        if (failureConsumer != null) {
            failureConsumer.accept(tasks);
        }
    }

    /** Notifies the failure consumer about every task whose result was not delivered due to an interrupt. */
    private static <T> void reportInterrupted(Collection<Callable<T>> failed, Collection<Callable<T>> unfinished,
            Consumer<Collection<Callable<T>>> failureConsumer) {
        List<Callable<T>> undelivered = new ArrayList<>(failed);
        undelivered.addAll(unfinished);
        logger.warn("等待任务结果时线程被中断,有{}个任务未得到结果,不再重试", undelivered.size());
        if (failureConsumer != null && !undelivered.isEmpty()) {
            failureConsumer.accept(undelivered);
        }
    }
}