Skip to content

Instantly share code, notes, and snippets.

@jayxhj
Created June 25, 2019 13:54
Show Gist options
  • Save jayxhj/5a94c5b51e5dfa50623cb35f32ef69b0 to your computer and use it in GitHub Desktop.
Save jayxhj/5a94c5b51e5dfa50623cb35f32ef69b0 to your computer and use it in GitHub Desktop.
使用 ExecutingCompleteService 进行多线程任务结果处理并增加重试机制
package com.jayxhj.util;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.function.Consumer;
import org.apache.commons.collections4.CollectionUtils;
import org.slf4j.Logger;
/**
* 使用 ExecutingCompleteService 进行多线程任务结果处理并增加重试机制
*
* @author jayxhj
*/
public class ExecutingCompleteServiceUtil {
private static Logger logger;
/**
* 使用 Executor 并行处理请求,并传入 Consumer 消费处理结果,可节省由于各任务处理不一致导致的等待时间
*
* @param e Executor
* @param solvers Callables
* @param consumer Consumer
* @param failureConsumer 任务处理失败后的处理
* @param maxRetryCount 重试次数,当 maxRetryCount 为 0 时代表不需要重试,
*/
public static <T> void solve(Executor e, Collection<Callable<T>> solvers, Consumer consumer, Consumer failureConsumer,
int maxRetryCount) {
if (CollectionUtils.isEmpty(solvers)) {
return;
}
//重试最终失败异常信息落日志
int n = solvers.size();
if (maxRetryCount < 0 && n > 0) {
try {
logger.error("有{}个任务经异常重试后仍然异常,task为{}", n, SeriUtils.writeObject(solvers));
} catch (Exception e1) {
logger.error(e1.getMessage(), e1);
}
if (failureConsumer != null) {
failureConsumer.accept(solvers);
}
return;
}
//任务处理
CompletionService<T> ecs = new ExecutorCompletionService<>(e);
Map<Future<T>, Callable<T>> callableFutureMap = new HashMap<>(solvers.size());
for (Callable<T> s : solvers) {
Future<T> stringFuture = ecs.submit(s);
callableFutureMap.put(stringFuture, s);
}
//超时任务记录,用于重试
List<Callable<T>> failedSolvers = new ArrayList<>();
for (int i = 0; i < n; i++) {
Future<T> future = null;
try {
future = ecs.take();
} catch (InterruptedException e1) {
Thread.currentThread().interrupt();
}
Callable<T> callable = null;
try {
if (future == null) {
continue;
}
T r = future.get();
if (r != null) {
consumer.accept(r);
}
} catch (InterruptedException e1) {
Thread.currentThread().interrupt();
} catch (ExecutionException e2) {
callable = callableFutureMap.get(future);
if (callable != null) {
failedSolvers.add(callable);
}
logger.error(e2.getMessage(), e2);
}
}
//有任务失败时进行重试
if (CollectionUtils.isNotEmpty(failedSolvers)) {
logger.info("有{}个任务异常,开始重试", failedSolvers.size());
solve(e, failedSolvers, consumer, failureConsumer, --maxRetryCount);
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment