1) 消息发送publish =>MqClient.publish(topicName, "",new ProducerDataDto(String.valueOf(i)));=>ConsumerController#publish
2)消息存储 saveMessage =>msgNotifyService.notify(request);=>MqQueueExcutorService#notifyMsg
3)消息消费 doPullingData=>pullData =>ConsumerController#consumerService.pullData(request)
1)消息发送中丢失的情况,而此时必然需要保证至少交付1次(at least one)。
2)消息发送采用重试机制(retry),如果重试没有效果,会将失败的队列的相关信息记录到链路和日志中(cat and log),保证交付。
3)同时对重平衡的处理(考虑消费者上下线和队列上下线的情况,也即扩缩容的情况),同时消费的过程中,会对偏移量进行统计,同时会进行心跳上报,证明自己还活着没有下线。
4)而这个偏移量则是消息id。与rocketMQ中不同。
5)消费者在业务方需要做幂等处理,因为消息重启后可能会接收到少量重复的消息。
6)而高可用通过数据库主备来保证。
7)而如果出现生产者、消费者、broker挂掉的情况,在Metric中有详细的情况记录。
8)分区通过表隔离,一个分区对应一个表,不同主题/分区互不干扰。上线下线时,通过Broker向DB插入一条消息,表示需要触发重平衡。一旦broker有重平衡产生,立即通知消费者进行重平衡。消费端收到重平衡后,立即提交偏移并停止消费,连续三次(可配)被分配队列相同时,则开启新的消费。
//进行判空处理,处理完成返回true
if (request == null) {
return true;
}
//进行事务处理、同时进行metric信息收集,执行请求处理,处理完,进行耗时统计,设置事务状态,添加信息到链路中,如果失败,执行失败处理,将失败信息放入链路中
Transaction transaction = Tracer.newTransaction("mq-client-publish", request.getTopicName());
Timer.Context timer1 = MetricSingleton.getMetricRegistry()
.timer("mq.client.publish.time?topic=" + request.getTopicName()).time();
try {
String url = MqConstanst.CONSUMERPRE + "/publish";
long start = System.nanoTime();
PublishMessageResponse response = post(request, url, retryTimes, PublishMessageResponse.class, true);
long end = System.nanoTime();
if (response.getTime() > 0) {
long t = end - start - response.getTime();
t = (t - t % 1000000) / 1000000;
MetricSingleton.getMetricRegistry()
.histogram("mq.client.publish.network.time?topic=" + request.getTopicName()).update(t);
}
transaction.setStatus(Transaction.SUCCESS);
if (!response.isSuc()) {
String json = JsonUtil.toJson(request);
logger.error(response.getMsg());
CatRequest request2 = new CatRequest();
request2.setMethod("publish_fail");
request2.setJson(json);
request2.setMsg(response.getMsg());
addCat(request2);
}
return response.isSuc();
} catch (Exception e) {
MetricSingleton.getMetricRegistry().counter("mq.client.publish.fail.count?topic=" + request.getTopicName())
.inc();
logger.error("publish_error", e);
String json = JsonUtil.toJson(request);
transaction.setStatus(e);
CatRequest request2 = new CatRequest();
request2.setMethod("publish");
request2.setJson(json);
request2.setMsg(e.getMessage());
addCat(request2);
SendMailRequest mailRequest = new SendMailRequest();
mailRequest.setSubject("消息发送失败,客户端:" + request.getClientIp() + ",Topic:" + request.getTopicName());
mailRequest.setContent("消息发送异常," + ",消息体是:" + json + ",异常原因是:" + e.getMessage());
mailRequest.setType(2);
mailRequest.setTopicName(request.getTopicName());
sendMail(mailRequest);
return false;
} finally {
transaction.complete();
timer1.stop();
}
if (response != null) {
if (isImportant) {
failUrlG1.put(host, System.currentTimeMillis() - 10 * 1000);
} else {
failUrlG2.put(host, System.currentTimeMillis() - 10 * 1000);
}
if (response instanceof PublishMessageResponse) {
PublishMessageResponse response2 = ((PublishMessageResponse) response);
if (response2.getSleepTime() > 0) {
response = null;
logger.info(response2.getMsg());
Util.sleep(response2.getSleepTime());
// 这个不算重试,只是降速
count--;
}
} else {
BaseResponse baseResponse = (BaseResponse) response;
if (!baseResponse.isSuc() && baseResponse.getCode() == MqConstanst.NO) {
response = null;
Util.sleep(1000);
} else {
if (!baseResponse.isSuc()) {
logger.error(baseResponse.getMsg());
}
}
}
} else {
// response 等于null 说明接口调用失败了。此时需要将url 放入失败接口中。
if (isImportant) {
failUrlG1.put(host, System.currentTimeMillis());
} else {
failUrlG2.put(host, System.currentTimeMillis());
}
Util.sleep(500);
}
存储的过程就是将消息进行保存的过程,在pmq中就是将数据保存到数据库中
//publish操作
@Override
public PublishMessageResponse publish(PublishMessageRequest request) {
PublishMessageResponse response = new PublishMessageResponse();
checkVaild(request, response);
if (!response.isSuc()) {
return response;
}
try {
if (!checkTopicRate(request, response)) {
return response;
}
//获取所有分配好的topic写队列
Map<String, List<QueueEntity>> queueMap = queueService.getAllLocatedTopicWriteQueue();
//获取所以分配好的主题队列
Map<String, List<QueueEntity>> topicQueueMap = queueService.getAllLocatedTopicQueue();
if (queueMap.containsKey(request.getTopicName()) || topicQueueMap.containsKey(request.getTopicName())) {
List<QueueEntity> queueEntities = queueMap.get(request.getTopicName());
if (queueEntities == null || queueEntities.size() == 0) {
response.setSuc(false);
response.setMsg("topic_" + request.getTopicName() + "_and_has_no_queue!");
//如果主题队列map中包含请求中拿到主题名称,或者配置中拿到发布模式,则执行获取队列信息,同时更新队列缓存
if (topicQueueMap.containsKey(request.getTopicName()) && soaConfig.getPublishMode() == 1) {
queueEntities = topicQueueMap.get(request.getTopicName());
updateQueueCache(request.getTopicName());
} else {
updateQueueCache(request.getTopicName());
return response;
}
}
//如果队列实体列表>0,则执行保存消息操作
if (queueEntities.size() > 0) {
saveMsg(request, response, queueEntities);
}
} else {
response.setSuc(false);
response.setMsg("topic1_" + request.getTopicName() + "_and_has_no_queue!");
return response;
}
} catch (Exception e) {
log.error("publish_error,and request json is " + JsonUtil.toJsonNull(request), e);
response.setSuc(false);
response.setMsg(e.getMessage());
} finally {
//最终,将计数进行自减,同时获取
if (soaConfig.getEnableTopicRate() == 1) {
totalMax.decrementAndGet();
//获取主题
topicPerMax.get(request.getTopicName()).decrementAndGet();
}
}
return response;
}
使用数据库主备db,消息存储同时进行负载均衡,避免单个数据库出现消息队列请求频繁的情况,造成单个库压力过大,同时库可以水平扩展
//执行保存消息
protected void doSaveMsg(List<Message01Entity> message01Entities, PublishMessageRequest request,
PublishMessageResponse response, QueueEntity temp) {
// Transaction transaction = Tracer.newTransaction("PubInner-" +
// temp.getIp(), request.getTopicName());
//消息服务设置数据库id
message01Service.setDbId(temp.getDbNodeId());
Transaction transaction = Tracer.newTransaction("Publish-Data", temp.getIp());
try {
transaction.addData("topic", request.getTopicName());
//执行批量插入
message01Service.insertBatchDy(request.getTopicName(), temp.getTbName(), message01Entities);
// 如果订阅该queue的组,开启了实时消息,则给对应的客户端发送异步通知
if (soaConfig.getMqPushFlag() == 1) {// apollo开关
//通知客户端
notifyClient(temp);
}
dbFailMap.put(getFailDbUp(temp), System.currentTimeMillis() - soaConfig.getDbFailWaitTime() * 2000L);
response.setSuc(true);
transaction.setStatus(Transaction.SUCCESS);
return;
} catch (Exception e) {
// sendPublishFailMail(request, e, 1);
transaction.setStatus(e);
if (e instanceof DataIntegrityViolationException
|| e.getCause() instanceof DataIntegrityViolationException) {
response.setSuc(false);
response.setMsg(e.getMessage());
return;
}
dbFailMap.put(getFailDbUp(temp), System.currentTimeMillis());
// transaction.setStatus(e);
throw new RuntimeException(e);
} finally {
transaction.complete();
}
}
保证消费一次,采用重试机制进行间断性重试。如果重试后没有成功,采用cat进行链路追踪,同时在业务系统进行幂等处理。
//拉取数据,消费消息
@Override
public PullDataResponse pullData(PullDataRequest request) {
PullDataResponse response = new PullDataResponse();
response.setSuc(true);
Map<Long, QueueEntity> data = queueService.getAllQueueMap();
checkVaild(request, response, data);
if (!response.isSuc()) {
return response;
}
//获取消息,通过队列id获取
QueueEntity temp = data.get(request.getQueueId());
Map<Long, DbNodeEntity> dbNodeMap = dbNodeService.getCache();
List<Message01Entity> entities = new ArrayList<>();
Transaction transaction = null;
if (checkFailTime(request.getTopicName(), temp, null) && checkStatus(temp, dbNodeMap)) {
//设置数据库id
message01Service.setDbId(temp.getDbNodeId());
transaction = Tracer.newTransaction("Pull-Data", temp.getIp());
try {
//获取消息,批量
entities = message01Service.getListDy(temp.getTopicName(), temp.getTbName(), request.getOffsetStart(),
request.getOffsetEnd());
transaction.setStatus(Transaction.SUCCESS);
dbFailMap.put(getFailDbUp(temp), System.currentTimeMillis() - soaConfig.getDbFailWaitTime() * 2000L);
} catch (Exception e) {
transaction.setStatus(e);
dbFailMap.put(getFailDbUp(temp), System.currentTimeMillis());
// TODO: handle exception
}
} else {
transaction = Tracer.newTransaction("PullData", "PullData-wait");
transaction.setStatus(Transaction.SUCCESS);
}
transaction.complete();
List<MessageDto> messageDtos = converMessageDto(entities);
response.setMsgs(messageDtos);
return response;
}
进行重平衡的条件:出现消费者consumer的加入和退出、主题topic的增加和退出,通常是进行扩缩容的时候会用到
进行重平衡会做的一个操作:通知重平衡,而过程中会更新重平衡的版本,同时设置通知的消息,同时将通知的消息插入到通知消息服务中,也即会插入到通知消息表中。而在生产者将消息发送存储成功后,也会执行发送通知消费者的动作,也是会做一个通知的操作。例如:
@Override
public void notifyRb(long id) {
updateRbVersion(Arrays.asList(id));
List<NotifyMessageEntity> notifyMessageEntities = new ArrayList<>();
NotifyMessageEntity notifyMessageEntity = new NotifyMessageEntity();
notifyMessageEntity.setConsumerGroupId(id);
notifyMessageEntity.setMessageType(MessageType.Rb);
notifyMessageEntities.add(notifyMessageEntity);
notifyMessageEntity = new NotifyMessageEntity();
notifyMessageEntity.setConsumerGroupId(id);
notifyMessageEntity.setMessageType(MessageType.Meta);
notifyMessageEntities.add(notifyMessageEntity);
notifyMessageService.insertBatch(notifyMessageEntities);
}
审计日志AuditLog和Cat监控=>这个可以从消息发送开始就可以看到直至消息消费
定时清理冗余数据、定时清理历史数据、定时不再存活的消费者通知邮件告警、定时不再进行消息订阅服务通知邮件、定时截断、定时检查、堆积告警(进行邮件发送),对于失败消息进行存储,同时进行告警、心跳检查(采用时间对比的方式,如果超过心跳上报监测时间,则进行数据移除处理,同时offset设置为0)
@Override
public void doStart() {
long minId = notifyMessageService.getMinId();
if (minId > 0 && isMaster()&&soaConfig.isEnbaleNotifyMessageClean()) {
log.info("clear_old_data_minId_is_{}_and_maxId_is_{}", minId, minId + 500);
int count = notifyMessageService.clearOld(soaConfig.getCleanInterval(), minId + 500);
while (count > 0 && isMaster()) {
minId = notifyMessageService.getMinId();
log.info("clear_old_data_minId_is_{}_and_maxId_is_{}", minId, minId + 500);
count = notifyMessageService.clearOld(soaConfig.getCleanInterval(), minId + 500);
Util.sleep(3000);
}
}
}
偏移量提交在启动broker定时任务的时候,就执行了提交偏移量服务的启动。
/**
* 提交偏移量
* @param request
* @return
*/
@Override
public CommitOffsetResponse commitOffset(CommitOffsetRequest request) {
// Transaction catTransaction = Tracer.newTransaction("Timer-service",
// "commitOffset");
CommitOffsetResponse response = new CommitOffsetResponse();
response.setSuc(true);
Map<Long, ConsumerQueueVersionDto> map = mapAppPolling.get();
try {
//如果请求不为空,同时请求的队列偏移量不为空,则执行提交操作
if (request != null && !CollectionUtils.isEmpty(request.getQueueOffsets())) {
request.getQueueOffsets().forEach(t1 -> {
ConsumerQueueVersionDto temp = map.get(t1.getQueueOffsetId());
boolean flag1 = true;
if (temp == null) {
synchronized (lockObj1) {
temp = map.get(t1.getQueueOffsetId());
if (temp == null) {
map.put(t1.getQueueOffsetId(), t1);
flag1 = false;
}
}
}
//如果flag1为true,偏移量版本小,则清理掉老数据,同时将新数据放入,如果当前偏移量版本相等,但偏移量小,则清掉老数据。
if (flag1) {
if (temp.getOffsetVersion() < t1.getOffsetVersion()) {
clearOldData();
map.put(t1.getQueueOffsetId(), t1);
} else if (temp.getOffsetVersion() == t1.getOffsetVersion()
&& temp.getOffset() < t1.getOffset()) {
clearOldData();
map.put(t1.getQueueOffsetId(), t1);
}
}
});
//如果是请求1,则获取版本执行提交偏移量操作
if (request.getFlag() == 1) {
Map<Long, OffsetVersionEntity> offsetVersionMap = queueOffsetService.getOffsetVersion();
request.getQueueOffsets().forEach(t1 -> {
//执行提交偏移量,此时会进行查询,获取标识flag,查询提交偏移量如果>0,则执行后续操作,此时必须满足有偏移的消息和偏移信息
doCommitOffset(t1, 1, offsetVersionMap, 0);
});
}
}
} catch (Exception e) {
}
// catTransaction.setStatus(Transaction.SUCCESS);
// catTransaction.complete();
return response;
}
比如会涉及到偏移量的问题,考虑解决偏移量offset的处理
//执行rb重平衡操作
@Override
// @Transactional(rollbackFor = Exception.class)
public void rb(List<QueueOffsetEntity> queueOffsetEntities) {
Map<Long, String> idsMap = new HashMap<>(30);
List<NotifyMessageEntity> notifyMessageEntities = new ArrayList<>(30);
//将传入的查询偏移量的消费组id放入,并执行更新操作
queueOffsetEntities.forEach(t1 -> {
idsMap.put(t1.getConsumerGroupId(), "");
NotifyMessageEntity notifyMessageEntity = new NotifyMessageEntity();
notifyMessageEntity.setConsumerGroupId(t1.getConsumerGroupId());
notifyMessageEntity.setMessageType(MessageType.Meta);
notifyMessageEntities.add(notifyMessageEntity);
// 更新consumerid 和consumername
queueOffsetService.updateConsumerId(t1);
});
// 更新重平衡版本,注意这个代码非常的重要,这个可以保证客户端能够拿到最新的重平衡版本号
updateRbVersion(new ArrayList<>(idsMap.keySet()));
// 批量插入消息事件
notifyMessageService.insertBatch(notifyMessageEntities);
}
一个消息中间件少不了的是消息的发送、存储、消费,而在pmq发送过程中需要考虑消息的发送失败情况,此时需要使用重试+链路,如果还是不行,将失败消息放入数据库中,此时将发送失败的消息存入到数据中,以便重新发送。消息发送成功,进行消息存储。而存储的过程中,需要考虑数据库的挂了的情况,因此需要采用数据库主备。而采用通知消费者消息的方式,通知消费者可以拉取消息进行消费了。此时消费者进行消息的消费,此时需要考虑消息偏移量,而这个偏移量则是消息id,与rocketmq中不同,同时在消费的过程中,不排除重平衡的情况产生,也即消费者的上下线、队列上下线的情况,因此此时进行重平衡。而消费的过程中,也会产生消息丢失的情况,因此需要采用重试+链路监控的方式。同时对于消费者,需要进行消息至少一次,因此如果需要一次,在业务消费方中需要进行幂等处理。