import java.util.concurrent.*;
/**
* 基于线程池的任务执行器实现
*/
public class ThreadPoolTaskExecutor implements TaskExecutor {
private final ExecutorService executorService;
private final ExceptionHandler exceptionHandler;
/**
* 构造函数,使用默认的线程池配置和异常处理器
*/
public ThreadPoolTaskExecutor() {
this(Runtime.getRuntime().availableProcessors() * 2,
new LinkedBlockingQueue<>(),
new DefaultExceptionHandler());
}
/**
* 构造函数,允许自定义线程池配置和异常处理器
* @param corePoolSize 核心线程数
* @param workQueue 任务队列
* @param exceptionHandler 异常处理器
*/
public ThreadPoolTaskExecutor(int corePoolSize, BlockingQueue<Runnable> workQueue,
ExceptionHandler exceptionHandler) {
this.executorService = new ThreadPoolExecutor(
corePoolSize,
corePoolSize * 2,
60L, TimeUnit.SECONDS,
workQueue,
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.CallerRunsPolicy()
);
this.exceptionHandler = exceptionHandler;
}
@Override
public <T> Future<T> submit(Task<T> task, ResultHandler<T> resultHandler) {
// 提交任务到线程池
Future<T> future = executorService.submit(() -> task.execute());
// 使用另一个线程监听任务完成情况
executorService.submit(() -> {
try {
// 等待任务完成并获取结果
T result = future.get();
// 处理任务结果
if (resultHandler != null) {
resultHandler.handleResult(task, result);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
exceptionHandler.handleException(task, e);
} catch (ExecutionException e) {
exceptionHandler.handleException(task, e.getCause());
}
});
return future;
}
@Override
public void shutdown() {
executorService.shutdown();
try {
if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
executorService.shutdownNow();
}
} catch (InterruptedException e) {
executorService.shutdownNow();
}
}
}
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
/**
* 任务工厂,负责创建、管理和跟踪异步任务
*/
public class TaskFactory {
private final TaskExecutor taskExecutor;
private final ConcurrentMap<String, Future<?>> taskFutures = new ConcurrentHashMap<>();
/**
* 构造函数
* @param taskExecutor 任务执行器
*/
public TaskFactory(TaskExecutor taskExecutor) {
this.taskExecutor = taskExecutor;
}
/**
* 创建并提交一个任务
* @param task 任务实例
* @param resultHandler 结果处理器
* @param <T> 任务结果类型
* @return 任务ID
*/
public <T> String createAndSubmitTask(Task<T> task, ResultHandler<T> resultHandler) {
String taskId = task.getTaskId();
Future<T> future = taskExecutor.submit(task, resultHandler);
taskFutures.put(taskId, future);
// 任务完成后移除跟踪
cleanupFutureOnCompletion(taskId, future);
return taskId;
}
/**
* 创建一个默认ID的任务并提交
* @param task 任务实例(忽略其getTaskId()方法)
* @param resultHandler 结果处理器
* @param <T> 任务结果类型
* @return 自动生成的任务ID
*/
public <T> String createAndSubmitTask(Task<T> task, ResultHandler<T> resultHandler, boolean useGeneratedId) {
if (!useGeneratedId) {
return createAndSubmitTask(task, resultHandler);
}
// 生成唯一任务ID
String taskId = generateTaskId();
// 创建包装任务,使用生成的ID
Task<T> wrappedTask = new Task<T>() {
@Override
public T execute() throws Exception {
return task.execute();
}
@Override
public String getTaskId() {
return taskId;
}
};
return createAndSubmitTask(wrappedTask, resultHandler);
}
/**
* 生成唯一任务ID
* @return 唯一任务ID
*/
private String generateTaskId() {
return "task-" + UUID.randomUUID().toString();
}
/**
* 检查任务是否完成
* @param taskId 任务ID
* @return 如果任务已完成则返回true,否则返回false
*/
public boolean isTaskCompleted(String taskId) {
Future<?> future = taskFutures.get(taskId);
return future == null || future.isDone();
}
/**
* 取消任务执行
* @param taskId 任务ID
* @param mayInterruptIfRunning 是否允许中断正在执行的任务
* @return 如果任务成功取消则返回true,否则返回false
*/
public boolean cancelTask(String taskId, boolean mayInterruptIfRunning) {
Future<?> future = taskFutures.get(taskId);
if (future != null && !future.isDone()) {
boolean canceled = future.cancel(mayInterruptIfRunning);
if (canceled) {
taskFutures.remove(taskId);
}
return canceled;
}
return false;
}
/**
* 任务完成后清理跟踪
*/
private <T> void cleanupFutureOnCompletion(String taskId, Future<T> future) {
// 使用执行器提交一个清理任务
((ThreadPoolTaskExecutor) taskExecutor).getExecutorService().submit(() -> {
try {
// 等待任务完成
future.get();
} catch (InterruptedException | ExecutionException e) {
// 忽略异常,任务已经被处理
} finally {
// 无论任务成功还是失败,都移除跟踪
taskFutures.remove(taskId);
}
});
}
/**
* 关闭任务工厂
*/
public void shutdown() {
taskExecutor.shutdown();
taskFutures.clear();
}
}
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。