前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >生产-消费模型组件实现 Producer Consumer Solution using BlockingQueue

生产-消费模型组件实现 Producer Consumer Solution using BlockingQueue

原创
作者头像
LiosWong
修改2021-06-11 14:28:01
4580
修改2021-06-11 14:28:01
举报
文章被收录于专栏:后端沉思录

原来各个应用某些业务节点的日志通过filebeat+elk收集,供各个业务方使用,线上发现filebeat消耗非常大的硬件资源,而且应用集群较大,每次新增日志类型时需要新增filebeat配置并启动新的应用进程,较为繁琐和耗服务器资源;

优化成通过kafka收集日志,consumer消费并写入es,考虑到频繁的写入es会造成应用资源、网络开销的浪费,设计上为批量写入es。该应用场景比较通用,考虑把该生产-消费模型

抽成组件放入公共模块。

组件设计细节
  • 使用方自定义业务属性;
  • 使用方自定义写入阈值
  • 业务方实现具体消费接口,建议使用线程池消费
  • 单线程轮询阻塞队列,线程池执行消费,速度很快
  • 程序退出最大保证任务不丢失(钩子函数)

本着易用、可扩展性原则,使用方不关心内部实现,只需要关心消费策略和业务实现即可。该组件实现具体功能可以概括为:任务不断的被放入阻塞队列中,线程池会不断轮训队列,当任务数达到阈值时,会批量实现具体业务需求,并在程序退出时,最大化保证任务不丢失。

代码实现

阻塞队列集合:

代码语言:txt
复制
public interface BlockQueue {

    /**
     * taskName : BlockingQueue
     */
    Map<String, LinkedBlockingQueue<QueueTask>> BLOCK_MSG_QUEUE_TASK_MAP = Maps.newConcurrentMap();

}

任务QueueTask:

代码语言:txt
复制
public interface QueueTask{
}

消费接口,使用方实现:

代码语言:txt
复制
public interface BlockQueueConsumer<R> {
    void accept(R queueTasks);
}

任务添加、处理类:

代码语言:txt
复制
@Slf4j
public class QueueTaskExecutor {

    private static final AtomicBoolean SHUTDOWN_HOOK_FLAG = new AtomicBoolean(true);

    private static final ExecutorService QUEUE_TASK_EXECUTOR = Executors.newSingleThreadExecutor();

    /**
     * max task count
     */
    private Integer maxElements;

    /**
     * consumer interface
     */
    private BlockQueueConsumer blockQueueConsumer;

    /**
     * task unique name
     */
    private String taskName;

    @PostConstruct
    private void init() {
        // create block queue
        BlockQueue.BLOCK_MSG_QUEUE_TASK_MAP.putIfAbsent(taskName, new LinkedBlockingQueue<QueueTask>());
        // shutdown hook
        addShutdownHook(QUEUE_TASK_EXECUTOR);
        QUEUE_TASK_EXECUTOR.execute(() -> {
            while (true) {
                try {
                    if (SHUTDOWN_HOOK_FLAG.get()) {
                        if (BlockQueue.BLOCK_MSG_QUEUE_TASK_MAP.get(taskName).size() < maxElements) {
                            Thread.sleep(100);
                            continue;
                        }
                    } else { // task exit
                        if (BlockQueue.BLOCK_MSG_QUEUE_TASK_MAP.get(taskName).size() <= 0) {
                            log.warn("QueueExecutor shutdown executing,but the BLOCK_MSG_QUEUE empty. The task exit now.");
                            Thread.sleep(100);
                            // double check
                            if (BlockQueue.BLOCK_MSG_QUEUE_TASK_MAP.get(taskName).size() <= 0) {
                                return;
                            }
                        }
                    }
                    List<QueueTask> queueTasks = Lists.newArrayList();
                    // get tasks in batch
                    BlockQueue.BLOCK_MSG_QUEUE_TASK_MAP.get(taskName).drainTo(queueTasks, maxElements);
                    if (CollectionUtils.isEmpty(queueTasks)) {
                        log.warn("QueueExecutor BLOCK_MSG_QUEUE empty.");
                        continue;
                    }
                    blockQueueConsumer.accept(queueTasks);
                } catch (Exception e) {
                    log.warn("QueueExecutor execute error.", e);
                }
            }
        });
    }

    /**
     * add task to Queue
     *
     * @param taskName unique task name
     * @param t        element
     * @param <T>
     */
    public static <T extends QueueTask> void add2Queue(String taskName, T t) {
        LinkedBlockingQueue<QueueTask> blockingQueue = BlockQueue.BLOCK_MSG_QUEUE_TASK_MAP.get(taskName);
        Assert.notNull(blockingQueue, "QueueTaskExecutor blockingQueue taskName not exist!");
        blockingQueue.add(t);
    }

    public static void addShutdownHook(ExecutorService executorService) {
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            try {
                SHUTDOWN_HOOK_FLAG.set(false);
                Shutdown.shutdown(executorService, 100);
            } catch (Exception e) {
                log.error("QueueTaskExecutor shutdown fail.", e);
            }
        }));
    }

    public QueueTaskExecutor setMaxElements(Integer maxElements) {
        this.maxElements = maxElements;
        return this;
    }

    public QueueTaskExecutor setBlockQueueConsumer(BlockQueueConsumer blockQueueConsumer) {
        this.blockQueueConsumer = blockQueueConsumer;
        return this;
    }

    public QueueTaskExecutor setTaskName(String taskName) {
        this.taskName = taskName;
        return this;
    }

}

具体代码实现也不复杂,使用无界阻塞队列LinkedBlockingQueue,保证高并发下放任务、批量获取任务的线程安全,具体不作多述。

使用
代码语言:txt
复制
@Configuration
public class QueueTaskConfig {

    @Value("${traceMsgBulkSize}")
    private Integer traceMsgBulkSize;

    @Bean
    public QueueTaskExecutor queueTaskExecutor() {
        return new QueueTaskExecutor()
                .setBlockQueueConsumer(blockQueueConsumer())
                .setMaxElements(traceMsgBulkSize)
                .setTaskName(XXXX_QUEUE_TASK_NAME);
    }

    @Bean(name = "kafkaMsgBlockQueueConsumer")
    public BlockQueueConsumer blockQueueConsumer() {
        return new KafkaMsgBlockQueueConsumer();
    }
}

BlockQueueConsumer实现类:

代码语言:txt
复制
public class XXXXBlockQueueConsumer implements BlockQueueConsumer<List<EsMsgDto>> {

    private Client client;

    public XXXXBlockQueueConsumer(Client client) {
        this.client = client;
    }


    @Override
    public void accept(List<EsMsgDto> task) {
        // 线程池处理消费
        ES_EXECUTOR.execute(() -> {
            writer2Es(task.getT());
        });
    }

    private void writer2Es(List<EsMsgDto> queueTasks) {
        // 构建BulkRequest
        BulkRequest bulkRequest = new BulkRequest();
        Option.of(queueTasks)
                .getOrElse(ArrayList::new)
                .stream()
                .map(EsCommand::buildIndexRequest)
                .collect(Collectors.toList())
                .forEach(bulkRequest::add);
        // 批量写入es
        EsCommand.addBulk(client, bulkRequest, new ActionListener<BulkResponse>() {
            @Override
            public void onResponse(BulkResponse bulkItemResponses) {

            }

            @Override
            public void onFailure(Exception e) {
                log.error("XXXXBlockQueueConsumer bulk onFailure.", e);
            }
        });
    }
}

业务上使用比较简单,也较为灵活,支持多业务的生产消费,使用方可指定具体的消费阈值,实现具体的业务需求。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 组件设计细节
  • 代码实现
  • 使用
相关产品与服务
Elasticsearch Service
腾讯云 Elasticsearch Service(ES)是云端全托管海量数据检索分析服务,拥有高性能自研内核,集成X-Pack。ES 支持通过自治索引、存算分离、集群巡检等特性轻松管理集群,也支持免运维、自动弹性、按需使用的 Serverless 模式。使用 ES 您可以高效构建信息检索、日志分析、运维监控等服务,它独特的向量检索还可助您构建基于语义、图像的AI深度应用。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档