首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >Java 异步任务工厂核心架构设计与基础实现

Java 异步任务工厂核心架构设计与基础实现

原创
作者头像
小焱
发布2025-09-21 11:35:51
发布2025-09-21 11:35:51
70
举报
文章被收录于专栏:Java开发Java开发
代码语言:java
复制
import java.util.concurrent.*;

/**
 * 基于线程池的任务执行器实现
 */
public class ThreadPoolTaskExecutor implements TaskExecutor {
    private final ExecutorService executorService;
    private final ExceptionHandler exceptionHandler;
    
    /**
     * 构造函数,使用默认的线程池配置和异常处理器
     */
    public ThreadPoolTaskExecutor() {
        this(Runtime.getRuntime().availableProcessors() * 2, 
             new LinkedBlockingQueue<>(),
             new DefaultExceptionHandler());
    }
    
    /**
     * 构造函数,允许自定义线程池配置和异常处理器
     * @param corePoolSize 核心线程数
     * @param workQueue 任务队列
     * @param exceptionHandler 异常处理器
     */
    public ThreadPoolTaskExecutor(int corePoolSize, BlockingQueue<Runnable> workQueue, 
                                 ExceptionHandler exceptionHandler) {
        this.executorService = new ThreadPoolExecutor(
            corePoolSize,
            corePoolSize * 2,
            60L, TimeUnit.SECONDS,
            workQueue,
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.CallerRunsPolicy()
        );
        this.exceptionHandler = exceptionHandler;
    }
    
    @Override
    public <T> Future<T> submit(Task<T> task, ResultHandler<T> resultHandler) {
        // 提交任务到线程池
        Future<T> future = executorService.submit(() -> task.execute());
        
        // 使用另一个线程监听任务完成情况
        executorService.submit(() -> {
            try {
                // 等待任务完成并获取结果
                T result = future.get();
                // 处理任务结果
                if (resultHandler != null) {
                    resultHandler.handleResult(task, result);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                exceptionHandler.handleException(task, e);
            } catch (ExecutionException e) {
                exceptionHandler.handleException(task, e.getCause());
            }
        });
        
        return future;
    }
    
    @Override
    public void shutdown() {
        executorService.shutdown();
        try {
            if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
                executorService.shutdownNow();
            }
        } catch (InterruptedException e) {
            executorService.shutdownNow();
        }
    }
}
代码语言:java
复制
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;

/**
 * 任务工厂,负责创建、管理和跟踪异步任务
 */
public class TaskFactory {
    private final TaskExecutor taskExecutor;
    private final ConcurrentMap<String, Future<?>> taskFutures = new ConcurrentHashMap<>();
    
    /**
     * 构造函数
     * @param taskExecutor 任务执行器
     */
    public TaskFactory(TaskExecutor taskExecutor) {
        this.taskExecutor = taskExecutor;
    }
    
    /**
     * 创建并提交一个任务
     * @param task 任务实例
     * @param resultHandler 结果处理器
     * @param <T> 任务结果类型
     * @return 任务ID
     */
    public <T> String createAndSubmitTask(Task<T> task, ResultHandler<T> resultHandler) {
        String taskId = task.getTaskId();
        Future<T> future = taskExecutor.submit(task, resultHandler);
        taskFutures.put(taskId, future);
        
        // 任务完成后移除跟踪
        cleanupFutureOnCompletion(taskId, future);
        
        return taskId;
    }
    
    /**
     * 创建一个默认ID的任务并提交
     * @param task 任务实例(忽略其getTaskId()方法)
     * @param resultHandler 结果处理器
     * @param <T> 任务结果类型
     * @return 自动生成的任务ID
     */
    public <T> String createAndSubmitTask(Task<T> task, ResultHandler<T> resultHandler, boolean useGeneratedId) {
        if (!useGeneratedId) {
            return createAndSubmitTask(task, resultHandler);
        }
        
        // 生成唯一任务ID
        String taskId = generateTaskId();
        
        // 创建包装任务,使用生成的ID
        Task<T> wrappedTask = new Task<T>() {
            @Override
            public T execute() throws Exception {
                return task.execute();
            }
            
            @Override
            public String getTaskId() {
                return taskId;
            }
        };
        
        return createAndSubmitTask(wrappedTask, resultHandler);
    }
    
    /**
     * 生成唯一任务ID
     * @return 唯一任务ID
     */
    private String generateTaskId() {
        return "task-" + UUID.randomUUID().toString();
    }
    
    /**
     * 检查任务是否完成
     * @param taskId 任务ID
     * @return 如果任务已完成则返回true,否则返回false
     */
    public boolean isTaskCompleted(String taskId) {
        Future<?> future = taskFutures.get(taskId);
        return future == null || future.isDone();
    }
    
    /**
     * 取消任务执行
     * @param taskId 任务ID
     * @param mayInterruptIfRunning 是否允许中断正在执行的任务
     * @return 如果任务成功取消则返回true,否则返回false
     */
    public boolean cancelTask(String taskId, boolean mayInterruptIfRunning) {
        Future<?> future = taskFutures.get(taskId);
        if (future != null && !future.isDone()) {
            boolean canceled = future.cancel(mayInterruptIfRunning);
            if (canceled) {
                taskFutures.remove(taskId);
            }
            return canceled;
        }
        return false;
    }
    
    /**
     * 任务完成后清理跟踪
     */
    private <T> void cleanupFutureOnCompletion(String taskId, Future<T> future) {
        // 使用执行器提交一个清理任务
        ((ThreadPoolTaskExecutor) taskExecutor).getExecutorService().submit(() -> {
            try {
                // 等待任务完成
                future.get();
            } catch (InterruptedException | ExecutionException e) {
                // 忽略异常,任务已经被处理
            } finally {
                // 无论任务成功还是失败,都移除跟踪
                taskFutures.remove(taskId);
            }
        });
    }
    
    /**
     * 关闭任务工厂
     */
    public void shutdown() {
        taskExecutor.shutdown();
        taskFutures.clear();
    }
}

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档