RocketMQ为开发者提供了两种消息的消费模式,分别是Pull和Push,对应的实现是DefaultMQPullConsumer和DefaultMQPushConsumer; 接下来我将带大家通过以下几个方面了解这两种模式:
Tip
:我本人在多年的开发经验中常用的MQ中间件如Kafka、RocketMQ都实战使用的,简单的使用示例可以参考我Git。
// Git代码
https://gitee.com/yeeevip/yeee-memo/tree/master/middle-ware/mq
https://github.com/yeeevip/yeee-memo/tree/master/middle-ware/mq
这种模式很容易理解,就是消费者主动请求Broker去拉取一批消息,然后消费; 这种模式的好处是可以根据客户端消费能力来主动获取消息量;但是弊端也比较明显,就是获取消息的时机不太好把握 ,获取时间间隔小容易造成CPU浪费,时间间隔太大又会造成消费不及时。
使用提供的DefaultMQPullConsumer这个实现,调用fetchMessageQueuesInBalance拿到该Topic下的Queue,然后调用pull()方法从Queue中指定offset获取消息
public class PullConsumer {
public static void main(String[] args) throws MQClientException {
// 创建Pull模式消费实例
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("test_consumer_group");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.start();
// 获取该Topic下的所有Queue
Set<MessageQueue> messageQueues = consumer.fetchMessageQueuesInBalance("TopicTest");
PullResult pullResult = null;
// 从Queue中获取消息
for (MessageQueue messageQueue : messageQueues) {
long offset = this.consumeFromOffset(messageQueue);
pullResult = consumer.pull(messageQueue, "*", offset, 32);
switch (pullResult.getPullStatus()) {
case FOUND:
List<MessageExt> msgs = pullResult.getMsgFoundList();
// 执行自定义的消费逻辑
this.doSomething(msgs);
//update offset to broker
consumer.updateConsumeOffset(messageQueue, pullResult.getNextBeginOffset());
break;
case OFFSET_ILLEGAL:
consumer.updateConsumeOffset(messageQueue, pullResult.getNextBeginOffset());
break;
case NO_NEW_MSG:
Thread.sleep(1);
consumer.updateConsumeOffset(messageQueue, pullResult.getNextBeginOffset());
break;
case NO_MATCHED_MSG:
consumer.updateConsumeOffset(messageQueue, pullResult.getNextBeginOffset());
break;
default:
}
}
}
}
/*
封装请求报文RemotingCommand.createRequestCommand(RequestCode.**PULL_MESSAGE**)
*/
public class MQClientAPIImpl implements NameServerUpdateCallback {
public PullResult pullMessage(final String addr, final PullMessageRequestHeader requestHeader, ..., final PullCallback pullCallback) {
...
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader);
...
RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
...
}
}
/*
RemotingClient调用channel.writeAndFlush(request)发出拉取请求
*/
public abstract class NettyRemotingAbstract {
public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis) {
...
try {
...
channel.writeAndFlush(request).addListener((ChannelFutureListener) f -> {
...
});
...
}
...
}
}
这个模式解决了Pull模式请求时间间隔的痛点,从直观上看来就是Broker主动推送消息,这样消息消费也比较及时。
用api提供的DefaultMQPushConsumer这个实现,首先订阅Topic及注册监听方法,然后调用start方法就可以接收消息了。
public class Consumer {
public static void main(String[] args) throws MQClientException {
// 创建Push模式消费实例
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
consumer.subscribe(TOPIC, "*");
// 注册监听方法处理消息逻辑
consumer.registerMessageListener((MessageListenerConcurrently) (msg, context) -> {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msg);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
// 开启
consumer.start();
}
}
/*
开启拉取消息任务
*/
public class MQClientInstance {
public void start() throws MQClientException {
...
// Start pull service
this.pullMessageService.start();
...
}
}
/*
轮询执行拉取消息请求
*/
public class PullMessageService extends ServiceThread {
@Override
public void run() {
while (!this.isStopped()) {
try {
MessageRequest messageRequest = this.messageRequestQueue.take();
// 执行DefaultMQPushConsumerImpl.pullMessage拉取
this.pullMessage((PullRequest) messageRequest);
} catch (Exception e) {
logger.error("Pull Message Service Run Method exception", e);
}
}
}
}
/*
同Pull模式QClientAPIImpl().pullMessage基础上进一步封装了Pull逻辑;
在命中某些条件下执行executePullRequestLater方法延迟请求拉取,避免短时间内大量无效请求
*/
public class DefaultMQPushConsumerImpl implements MQConsumerInner {
public void pullMessage(final PullRequest pullRequest) {
// 通过判断各种条件下是否执行延迟处理,避免短时间内大量无效请求
if (...) {
this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
}
...
// 拉取回调逻辑,执行之前注册的registerMessageListener监听
PullCallback pullCallback = new PullCallback() {
@Override
public void onSuccess(PullResult pullResult) {
...
}
@Override
public void onException(Throwable e) {
...
}
};
...
try {
this.pullAPIWrapper.pullKernelImpl(
pullRequest.getMessageQueue(),
subExpression,
...
...
pullCallback
);
} catch (Exception e) {
// 延迟请求
this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
}
}
}
/*
没有消息则挂起此次请求
*/
public class DefaultPullMessageResultHandler implements PullMessageResultHandler {
@Override
public RemotingCommand handle(final GetMessageResult getMessageResult,
final RemotingCommand request,
...
final Channel channel,
...
RemotingCommand response) {
switch (response.getCode()) {
switch (response.getCode()) {
case ResponseCode.SUCCESS:
...
// 有消息则写回Channel
case ResponseCode.PULL_NOT_FOUND: // 没有消息
...
// 挂起请求
this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
return null;
...
}
}
}
}
/*
CommitLog有新消息主动通知Consumer来拉取消息
*/
public class NotifyMessageArrivingListener implements MessageArrivingListener {
@Override
public void arriving(String topic, int queueId, long logicOffset, long tagsCode,
long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
// 通知挂起的客户端来拉取消息
this.pullRequestHoldService.notifyMessageArriving(topic, queueId, logicOffset, tagsCode,
msgStoreTime, filterBitMap, properties);
...
}
}
目前为止通过源码跟踪我们可以发现,RocketMq的Push模式的实现和我们通常了解的实现上有一定的差异,它是由由Consumer主要来发起拉取请求去Broker拉取, 但是Rocketmq通过对拉取逻辑的一系列封装,以及采用长轮询机制让Consumer请求挂起避免短轮询无效请求,同时Broker在消息产生时也会及时通知挂起的Consumer来拉取消息,最终达到了Push的效果。
Tip
:我本人在多年的开发经验中常用的MQ中间件如Kafka、RocketMQ都实战使用的,简单的使用示例可以参考我Git。
// Git代码
https://gitee.com/yeeevip/yeee-memo/tree/master/middle-ware/mq
https://github.com/yeeevip/yeee-memo/tree/master/middle-ware/mq
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。