Created
June 25, 2019 13:54
-
-
Save jayxhj/5a94c5b51e5dfa50623cb35f32ef69b0 to your computer and use it in GitHub Desktop.
使用 ExecutorCompletionService 进行多线程任务结果处理并增加重试机制
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package com.jayxhj.util; | |
| import java.util.ArrayList; | |
| import java.util.Collection; | |
| import java.util.HashMap; | |
| import java.util.List; | |
| import java.util.Map; | |
| import java.util.concurrent.Callable; | |
| import java.util.concurrent.CompletionService; | |
| import java.util.concurrent.ExecutionException; | |
| import java.util.concurrent.Executor; | |
| import java.util.concurrent.ExecutorCompletionService; | |
| import java.util.concurrent.Future; | |
| import java.util.function.Consumer; | |
| import org.apache.commons.collections4.CollectionUtils; | |
| import org.slf4j.Logger; | |
| /** | |
| * 使用 ExecutingCompleteService 进行多线程任务结果处理并增加重试机制 | |
| * | |
| * @author jayxhj | |
| */ | |
| public class ExecutingCompleteServiceUtil { | |
| private static Logger logger; | |
| /** | |
| * 使用 Executor 并行处理请求,并传入 Consumer 消费处理结果,可节省由于各任务处理不一致导致的等待时间 | |
| * | |
| * @param e Executor | |
| * @param solvers Callables | |
| * @param consumer Consumer | |
| * @param failureConsumer 任务处理失败后的处理 | |
| * @param maxRetryCount 重试次数,当 maxRetryCount 为 0 时代表不需要重试, | |
| */ | |
| public static <T> void solve(Executor e, Collection<Callable<T>> solvers, Consumer consumer, Consumer failureConsumer, | |
| int maxRetryCount) { | |
| if (CollectionUtils.isEmpty(solvers)) { | |
| return; | |
| } | |
| //重试最终失败异常信息落日志 | |
| int n = solvers.size(); | |
| if (maxRetryCount < 0 && n > 0) { | |
| try { | |
| logger.error("有{}个任务经异常重试后仍然异常,task为{}", n, SeriUtils.writeObject(solvers)); | |
| } catch (Exception e1) { | |
| logger.error(e1.getMessage(), e1); | |
| } | |
| if (failureConsumer != null) { | |
| failureConsumer.accept(solvers); | |
| } | |
| return; | |
| } | |
| //任务处理 | |
| CompletionService<T> ecs = new ExecutorCompletionService<>(e); | |
| Map<Future<T>, Callable<T>> callableFutureMap = new HashMap<>(solvers.size()); | |
| for (Callable<T> s : solvers) { | |
| Future<T> stringFuture = ecs.submit(s); | |
| callableFutureMap.put(stringFuture, s); | |
| } | |
| //超时任务记录,用于重试 | |
| List<Callable<T>> failedSolvers = new ArrayList<>(); | |
| for (int i = 0; i < n; i++) { | |
| Future<T> future = null; | |
| try { | |
| future = ecs.take(); | |
| } catch (InterruptedException e1) { | |
| Thread.currentThread().interrupt(); | |
| } | |
| Callable<T> callable = null; | |
| try { | |
| if (future == null) { | |
| continue; | |
| } | |
| T r = future.get(); | |
| if (r != null) { | |
| consumer.accept(r); | |
| } | |
| } catch (InterruptedException e1) { | |
| Thread.currentThread().interrupt(); | |
| } catch (ExecutionException e2) { | |
| callable = callableFutureMap.get(future); | |
| if (callable != null) { | |
| failedSolvers.add(callable); | |
| } | |
| logger.error(e2.getMessage(), e2); | |
| } | |
| } | |
| //有任务失败时进行重试 | |
| if (CollectionUtils.isNotEmpty(failedSolvers)) { | |
| logger.info("有{}个任务异常,开始重试", failedSolvers.size()); | |
| solve(e, failedSolvers, consumer, failureConsumer, --maxRetryCount); | |
| } | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment