
A Look at getFlatList in RocketMQCanalConnector

Original article
Author: code4it
Modified 2020-04-09 09:57:53
Included in the column: 码匠的流水账

This article walks through the getFlatList method of RocketMQCanalConnector.

getFlatList

canal-1.1.4/client/src/main/java/com/alibaba/otter/canal/client/rocketmq/RocketMQCanalConnector.java

public class RocketMQCanalConnector implements CanalMQConnector {

    private static final Logger                 logger               = LoggerFactory.getLogger(RocketMQCanalConnector.class);
    private static final String                 CLOUD_ACCESS_CHANNEL = "cloud";

    private String                              nameServer;
    private String                              topic;
    private String                              groupName;
    private volatile boolean                    connected           = false;
    private DefaultMQPushConsumer               rocketMQConsumer;
    private BlockingQueue<ConsumerBatchMessage> messageBlockingQueue;
    private int                                 batchSize           = -1;
    private long                                batchProcessTimeout = 60 * 1000;
    private boolean                             flatMessage;
    private volatile ConsumerBatchMessage       lastGetBatchMessage = null;
    private String                              accessKey;
    private String                              secretKey;
    private String                              customizedTraceTopic;
    private boolean                             enableMessageTrace = false;
    private String                              accessChannel;
    private String                              namespace;

    //......

    // Fetches one batch and, if it is non-empty, acknowledges it right away.
    public List<FlatMessage> getFlatList(Long timeout, TimeUnit unit) throws CanalClientException {
        List<FlatMessage> messages = getFlatListWithoutAck(timeout, unit);
        if (messages != null && !messages.isEmpty()) {
            ack();
        }
        return messages;
    }

    // Fetches one batch but leaves the acknowledgement to the caller.
    public List<FlatMessage> getFlatListWithoutAck(Long timeout, TimeUnit unit) throws CanalClientException {
        try {
            // The previous batch must be acked before the next one can be fetched.
            if (this.lastGetBatchMessage != null) {
                throw new CanalClientException("mq get/ack not support concurrent & async ack");
            }

            // Wait up to the timeout; poll() returns null if no batch arrives in time.
            ConsumerBatchMessage batchMessage = messageBlockingQueue.poll(timeout, unit);
            if (batchMessage != null) {
                this.lastGetBatchMessage = batchMessage;
                return batchMessage.getData();
            }
        } catch (InterruptedException ex) {
            logger.warn("Get message timeout", ex);
            throw new CanalClientException("Failed to fetch the data after: " + timeout);
        }
        // Nothing arrived within the timeout: return an empty list rather than null.
        return Lists.newArrayList();
    }

    // Marks the last fetched batch as done (or failed) and clears it so the next get can proceed.
    public void ack() throws CanalClientException {
        try {
            if (this.lastGetBatchMessage != null) {
                this.lastGetBatchMessage.ack();
            }
        } catch (Throwable e) {
            if (this.lastGetBatchMessage != null) {
                this.lastGetBatchMessage.fail();
            }
        } finally {
            this.lastGetBatchMessage = null;
        }
    }

    //......
}
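  • The getFlatList method of RocketMQCanalConnector obtains a list of FlatMessage through getFlatListWithoutAck and calls ack when the list is not empty. getFlatListWithoutAck polls messageBlockingQueue for a batchMessage; if the result is not null, it stores it in lastGetBatchMessage and returns batchMessage.getData(), otherwise it returns an empty list. ack calls lastGetBatchMessage.ack(), calls lastGetBatchMessage.fail() if that throws, and always resets lastGetBatchMessage to null.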
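
The get/ack bookkeeping described above can be modeled without any canal or RocketMQ dependency. What follows is a minimal sketch, not canal's code: `Batch` and `GetAckSketch` are hypothetical stand-ins for ConsumerBatchMessage and the connector, and `IllegalStateException` stands in for CanalClientException. It illustrates three behaviors: a second fetch before ack is rejected, a timed-out poll yields an empty list, and the auto-ack variant only acks non-empty results.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for ConsumerBatchMessage: carries data and remembers whether it was acked.
class Batch<T> {
    private final List<T> data;
    private volatile boolean acked = false;

    Batch(List<T> data) { this.data = data; }
    List<T> getData() { return data; }
    void ack() { acked = true; }
    boolean isAcked() { return acked; }
}

// Hypothetical miniature of the connector's get/ack state; names are illustrative only.
class GetAckSketch {
    final BlockingQueue<Batch<String>> queue = new LinkedBlockingQueue<>();
    private volatile Batch<String> lastGet = null;

    // Mirrors getFlatListWithoutAck: one outstanding batch at a time.
    List<String> getWithoutAck(long timeout, TimeUnit unit) {
        if (lastGet != null) {
            throw new IllegalStateException("previous batch has not been acked yet");
        }
        try {
            Batch<String> batch = queue.poll(timeout, unit); // null when the timeout elapses
            if (batch != null) {
                lastGet = batch;
                return batch.getData();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // sketch-only choice: restore the interrupt flag
            throw new IllegalStateException("interrupted while polling", e);
        }
        return new ArrayList<>();
    }

    // Mirrors ack: finish the outstanding batch, then clear it in all cases.
    void ack() {
        try {
            if (lastGet != null) {
                lastGet.ack();
            }
        } finally {
            lastGet = null;
        }
    }

    // Mirrors getFlatList: fetch, then ack only when something was returned.
    List<String> get(long timeout, TimeUnit unit) {
        List<String> messages = getWithoutAck(timeout, unit);
        if (!messages.isEmpty()) {
            ack();
        }
        return messages;
    }

    public static void main(String[] args) {
        GetAckSketch c = new GetAckSketch();
        c.queue.add(new Batch<>(Arrays.asList("row-1", "row-2")));
        System.out.println(c.get(100, TimeUnit.MILLISECONDS)); // [row-1, row-2]
        System.out.println(c.get(100, TimeUnit.MILLISECONDS)); // []
    }
}
```

In this model, calling `getWithoutAck` twice in a row without an intervening `ack()` throws, which is the same single-outstanding-batch rule the lastGetBatchMessage check enforces in the listing above.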

subscribe

canal-1.1.4/client/src/main/java/com/alibaba/otter/canal/client/rocketmq/RocketMQCanalConnector.java

public class RocketMQCanalConnector implements CanalMQConnector {

    //......

    public synchronized void subscribe(String filter) throws CanalClientException {
        // Already subscribed: nothing to do.
        if (connected) {
            return;
        }
        try {
            if (rocketMQConsumer == null) {
                this.connect();
            }
            // The filter argument is not used here; the consumer subscribes to every tag ("*").
            rocketMQConsumer.subscribe(this.topic, "*");
            rocketMQConsumer.registerMessageListener(new MessageListenerOrderly() {

                @Override
                public ConsumeOrderlyStatus consumeMessage(List<MessageExt> messageExts, ConsumeOrderlyContext context) {
                    context.setAutoCommit(true);
                    // Hand the batch to process() and map its result to a consume status.
                    boolean isSuccess = process(messageExts);
                    if (isSuccess) {
                        return ConsumeOrderlyStatus.SUCCESS;
                    } else {
                        return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                    }
                }
            });
            rocketMQConsumer.start();
        } catch (MQClientException ex) {
            connected = false;
            logger.error("Start RocketMQ consumer error", ex);
        }
        // Runs after the try/catch, so it also executes when start-up failed above.
        connected = true;
    }

    //......

}
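  • The subscribe method registers a MessageListenerOrderly on rocketMQConsumer. Its consumeMessage method calls process and returns ConsumeOrderlyStatus.SUCCESS when process returns true, or ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT otherwise. Note that, as written, connected is set to true after the try/catch block even when an MQClientException was caught.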
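
In the subscribe listing, the `connected = true` assignment sits after the try/catch, so it runs whether or not start-up succeeded. The sketch below shows one possible variant of that guard in which the flag is set only on success. It is an illustration under assumptions, not canal's implementation: `StartableConsumer` and `SubscribeGuardSketch` are hypothetical names, and a plain `Exception` stands in for MQClientException.

```java
// Hypothetical minimal consumer abstraction; this is not a RocketMQ type.
interface StartableConsumer {
    void start() throws Exception;
}

// Hypothetical guard showing a "set connected only on success" variant.
class SubscribeGuardSketch {
    private final StartableConsumer consumer;
    private volatile boolean connected = false;

    SubscribeGuardSketch(StartableConsumer consumer) {
        this.consumer = consumer;
    }

    synchronized void subscribe() {
        if (connected) {
            return; // idempotent: a second call does not start the consumer again
        }
        try {
            consumer.start();
            connected = true; // reached only when start() did not throw
        } catch (Exception ex) {
            connected = false;
            System.err.println("Start consumer error: " + ex.getMessage());
        }
    }

    boolean isConnected() {
        return connected;
    }

    public static void main(String[] args) {
        SubscribeGuardSketch failing = new SubscribeGuardSketch(() -> {
            throw new Exception("name server unreachable");
        });
        failing.subscribe();
        System.out.println(failing.isConnected()); // false

        SubscribeGuardSketch working = new SubscribeGuardSketch(() -> { });
        working.subscribe();
        System.out.println(working.isConnected()); // true
    }
}
```

With this arrangement a failed start leaves the flag false, so a later `subscribe()` call can retry instead of returning early.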

process

canal-1.1.4/client/src/main/java/com/alibaba/otter/canal/client/rocketmq/RocketMQCanalConnector.java

public class RocketMQCanalConnector implements CanalMQConnector {

    //......

    private boolean process(List<MessageExt> messageExts) {
        if (logger.isDebugEnabled()) {
            logger.debug("Get Message: {}", messageExts);
        }
        List messageList = Lists.newArrayList();
        for (MessageExt messageExt : messageExts) {
            byte[] data = messageExt.getBody();
            if (data != null) {
                try {
                    if (!flatMessage) {
                        // Non-flat mode: deserialize the body into a Message.
                        Message message = CanalMessageDeserializer.deserializer(data);
                        messageList.add(message);
                    } else {
                        // Flat mode: parse the JSON body into a FlatMessage.
                        FlatMessage flatMessage = JSON.parseObject(data, FlatMessage.class);
                        messageList.add(flatMessage);
                    }
                } catch (Exception ex) {
                    logger.error("Add message error", ex);
                    throw new CanalClientException(ex);
                }
            } else {
                logger.warn("Received message data is null");
            }
        }
        ConsumerBatchMessage batchMessage;
        if (!flatMessage) {
            batchMessage = new ConsumerBatchMessage<Message>(messageList);
        } else {
            batchMessage = new ConsumerBatchMessage<FlatMessage>(messageList);
        }
        try {
            // Hand the batch over to the thread that calls getFlatList/getFlatListWithoutAck.
            messageBlockingQueue.put(batchMessage);
        } catch (InterruptedException e) {
            logger.error("Put message to queue error", e);
            throw new RuntimeException(e);
        }
        boolean isCompleted;
        try {
            // Wait up to batchProcessTimeout for the batch to be finished.
            isCompleted = batchMessage.waitFinish(batchProcessTimeout);
        } catch (InterruptedException e) {
            logger.error("Interrupted when waiting messages to be finished.", e);
            throw new RuntimeException(e);
        }
        boolean isSuccess = batchMessage.isSuccess();
        return isCompleted && isSuccess;
    }

    //......

}
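  • The process method converts each MessageExt into a Message or a FlatMessage, wraps the results in a ConsumerBatchMessage, and puts it on messageBlockingQueue. It then waits up to batchProcessTimeout via waitFinish and returns true only if the batch both completed in time and succeeded.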
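
The handoff between the listener thread (process) and the client thread (get/ack) can be sketched with a latch. The article does not show the source of ConsumerBatchMessage, so `LatchBatch` below is an assumed, latch-based stand-in for it, and `HandoffSketch` and `runOnce` are hypothetical names used only for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Assumed latch-based stand-in for ConsumerBatchMessage (its real source is not shown here).
class LatchBatch<T> {
    private final List<T> data;
    private final CountDownLatch latch = new CountDownLatch(1);
    private volatile boolean failed = false;

    LatchBatch(List<T> data) { this.data = data; }
    List<T> getData() { return data; }
    void ack() { latch.countDown(); }
    void fail() { failed = true; latch.countDown(); }
    boolean isSuccess() { return !failed; }

    // Returns false if the timeout elapses before ack() or fail() is called.
    boolean waitFinish(long timeoutMillis) throws InterruptedException {
        return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
    }
}

class HandoffSketch {
    final BlockingQueue<LatchBatch<String>> queue = new LinkedBlockingQueue<>();

    // Listener side: publish the batch, then block until the client finishes it.
    boolean process(List<String> bodies, long timeoutMillis) {
        LatchBatch<String> batch = new LatchBatch<>(bodies);
        try {
            queue.put(batch);
            boolean completed = batch.waitFinish(timeoutMillis);
            return completed && batch.isSuccess();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    // Runs process() on the calling thread while a client thread takes the batch and acks or fails it.
    static boolean runOnce(boolean clientAcks, long timeoutMillis) {
        HandoffSketch sketch = new HandoffSketch();
        Thread client = new Thread(() -> {
            try {
                LatchBatch<String> batch = sketch.queue.take();
                if (clientAcks) {
                    batch.ack();
                } else {
                    batch.fail();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        client.setDaemon(true);
        client.start();
        return sketch.process(Arrays.asList("m1", "m2"), timeoutMillis);
    }

    public static void main(String[] args) {
        System.out.println(runOnce(true, 1000));  // true: acked in time
        System.out.println(runOnce(false, 1000)); // false: client reported failure
        // No client thread at all: waitFinish times out, so process returns false.
        System.out.println(new HandoffSketch().process(Arrays.asList("m"), 50)); // false
    }
}
```

The third case is the one that maps to SUSPEND_CURRENT_QUEUE_A_MOMENT in the subscribe listing: if nobody acks within the timeout, process reports failure to the listener.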

Summary

In short, getFlatList fetches a list of FlatMessage through getFlatListWithoutAck and calls ack when the list is not empty. getFlatListWithoutAck polls messageBlockingQueue for a batchMessage; if one arrives, it records it in lastGetBatchMessage and returns batchMessage.getData(). ack calls lastGetBatchMessage.ack() and falls back to lastGetBatchMessage.fail() if an exception is thrown.

doc

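Original-content statement: this article is published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.

In case of infringement, please contact cloudcommunity@tencent.com to have it removed.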
