前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >pmq学习总结

pmq学习总结

作者头像
路行的亚洲
发布2021-03-04 15:47:50
6130
发布2021-03-04 15:47:50
举报
文章被收录于专栏:后端技术学习

pmq架构图

pmq中消息端到端流程

1.消息的核心是什么?

代码语言:javascript
复制
1) 消息发送publish =>MqClient.publish(topicName, "",new ProducerDataDto(String.valueOf(i)));=>ConsumerController#publish
2)消息存储 saveMessage =>msgNotifyService.notify(request);=>MqQueueExcutorService#notifyMsg
3)消息消费 doPullingData=>pullData =>ConsumerController#consumerService.pullData(request)    

2.消息发送的过程中需要考虑什么?

代码语言:javascript
复制
1)消息发送中丢失的情况,而此时必然需要保证至少交付1次(at least one)。
2)消息发送采用重试机制(retry),如果重试没有效果,会将失败的队列的相关信息记录到链路和日志中(cat and log),保证交付。
3)同时对重平衡的处理(考虑消费者上下线和队列上下线的情况,也即扩缩容的情况),同时消费的过程中,会对偏移量进行统计,同时会进行心跳上报,证明自己还活着没有下线。
4)而这个偏移量则是消息id。与rocketMQ中不同。
5)消费者在业务方需要做幂等处理,因为消息重启后可能会接收到少量重复的消息。
6)而高可用通过数据库主备来保证。
7)而如果出现生产者、消费者、broker挂掉的情况,在Metric中有详细的情况记录。
8)分区通过表隔离,一个分区对应一个表,不同主题/分区互不干扰。上线下线时,通过Broker向DB插入一条消息,表示需要触发重平衡。一旦broker有重平衡产生,立即通知消费者进行重平衡。消费端收到重平衡后,立即提交偏移并停止消费,连续三次(可配)被分配队列相同时,则开启新的消费。  

进行发送消息处理

代码语言:javascript
复制
//进行判空处理,处理完成返回true
if (request == null) {
    return true;
}
//进行事务处理、同时进行metric信息收集,执行请求处理,处理完,进行耗时统计,设置事务状态,添加信息到链路中,如果失败,执行失败处理,将失败信息放入链路中
Transaction transaction = Tracer.newTransaction("mq-client-publish", request.getTopicName());
Timer.Context timer1 = MetricSingleton.getMetricRegistry()
    .timer("mq.client.publish.time?topic=" + request.getTopicName()).time();
try {
    String url = MqConstanst.CONSUMERPRE + "/publish";
    long start = System.nanoTime();
    PublishMessageResponse response = post(request, url, retryTimes, PublishMessageResponse.class, true);
    long end = System.nanoTime();
    if (response.getTime() > 0) {
        long t = end - start - response.getTime();
        t = (t - t % 1000000) / 1000000;
        MetricSingleton.getMetricRegistry()
            .histogram("mq.client.publish.network.time?topic=" + request.getTopicName()).update(t);
    }
    transaction.setStatus(Transaction.SUCCESS);
    if (!response.isSuc()) {
        String json = JsonUtil.toJson(request);
        logger.error(response.getMsg());
        CatRequest request2 = new CatRequest();
        request2.setMethod("publish_fail");
        request2.setJson(json);
        request2.setMsg(response.getMsg());
        addCat(request2);
    }
    return response.isSuc();
} catch (Exception e) {
    MetricSingleton.getMetricRegistry().counter("mq.client.publish.fail.count?topic=" + request.getTopicName())
        .inc();
    logger.error("publish_error", e);
    String json = JsonUtil.toJson(request);
    transaction.setStatus(e);
    CatRequest request2 = new CatRequest();
    request2.setMethod("publish");
    request2.setJson(json);
    request2.setMsg(e.getMessage());
    addCat(request2);

    SendMailRequest mailRequest = new SendMailRequest();
    mailRequest.setSubject("消息发送失败,客户端:" + request.getClientIp() + ",Topic:" + request.getTopicName());
    mailRequest.setContent("消息发送异常," + ",消息体是:" + json + ",异常原因是:" + e.getMessage());
    mailRequest.setType(2);
    mailRequest.setTopicName(request.getTopicName());
    sendMail(mailRequest);
    return false;
} finally {
    transaction.complete();
    timer1.stop();
}

发送请求的过程中,如果失败还会进行降速处理

代码语言:javascript
复制
if (response != null) {
   if (isImportant) {
      failUrlG1.put(host, System.currentTimeMillis() - 10 * 1000);
   } else {
      failUrlG2.put(host, System.currentTimeMillis() - 10 * 1000);
   }
   if (response instanceof PublishMessageResponse) {
      PublishMessageResponse response2 = ((PublishMessageResponse) response);
      if (response2.getSleepTime() > 0) {
         response = null;
         logger.info(response2.getMsg());
         Util.sleep(response2.getSleepTime());
         // 这个不算重试,只是降速
         count--;
      }
   } else {
      BaseResponse baseResponse = (BaseResponse) response;
      if (!baseResponse.isSuc() && baseResponse.getCode() == MqConstanst.NO) {
         response = null;
         Util.sleep(1000);
      } else {
         if (!baseResponse.isSuc()) {
            logger.error(baseResponse.getMsg());
         }
      }
   }
} else {
   // response 等于null 说明接口调用失败了。此时需要将url 放入失败接口中。
   if (isImportant) {
      failUrlG1.put(host, System.currentTimeMillis());
   } else {
      failUrlG2.put(host, System.currentTimeMillis());
   }
   Util.sleep(500);
}

保存消息到数据库

存储的过程就是将消息进行保存的过程,在pmq中就是将数据保存到数据库中

代码语言:javascript
复制
//publish操作
@Override
public PublishMessageResponse publish(PublishMessageRequest request) {
    PublishMessageResponse response = new PublishMessageResponse();
    checkVaild(request, response);
    if (!response.isSuc()) {
        return response;
    }
    try {
        if (!checkTopicRate(request, response)) {
            return response;
        }
        //获取所有分配好的topic写队列
        Map<String, List<QueueEntity>> queueMap = queueService.getAllLocatedTopicWriteQueue();
        //获取所以分配好的主题队列
        Map<String, List<QueueEntity>> topicQueueMap = queueService.getAllLocatedTopicQueue();
        if (queueMap.containsKey(request.getTopicName()) || topicQueueMap.containsKey(request.getTopicName())) {
            List<QueueEntity> queueEntities = queueMap.get(request.getTopicName());
            if (queueEntities == null || queueEntities.size() == 0) {
                response.setSuc(false);
                response.setMsg("topic_" + request.getTopicName() + "_and_has_no_queue!");
                //如果主题队列map中包含请求中拿到主题名称,或者配置中拿到发布模式,则执行获取队列信息,同时更新队列缓存
                if (topicQueueMap.containsKey(request.getTopicName()) && soaConfig.getPublishMode() == 1) {
                    queueEntities = topicQueueMap.get(request.getTopicName());
                    updateQueueCache(request.getTopicName());
                } else {
                    updateQueueCache(request.getTopicName());
                    return response;
                }
            }
            //如果队列实体列表>0,则执行保存消息操作
            if (queueEntities.size() > 0) {
                saveMsg(request, response, queueEntities);
            }
        } else {
            response.setSuc(false);
            response.setMsg("topic1_" + request.getTopicName() + "_and_has_no_queue!");
            return response;
        }
    } catch (Exception e) {
        log.error("publish_error,and request json is " + JsonUtil.toJsonNull(request), e);
        response.setSuc(false);
        response.setMsg(e.getMessage());
    } finally {
        //最终,将计数进行自减,同时获取
        if (soaConfig.getEnableTopicRate() == 1) {
            totalMax.decrementAndGet();
            //获取主题
            topicPerMax.get(request.getTopicName()).decrementAndGet();
        }
    }
    return response;
}

3.消息存储的过程中需要考虑高可用

代码语言:javascript
复制
使用数据库主备db,消息存储同时进行负载均衡,避免单个数据库出现消息队列请求频繁的情况,造成单个库压力过大,同时库可以水平扩展

进行消息的保存操作

代码语言:javascript
复制
//执行保存消息
protected void doSaveMsg(List<Message01Entity> message01Entities, PublishMessageRequest request,
                         PublishMessageResponse response, QueueEntity temp) {
    // Transaction transaction = Tracer.newTransaction("PubInner-" +
    // temp.getIp(), request.getTopicName());
    //消息服务设置数据库id
    message01Service.setDbId(temp.getDbNodeId());
    Transaction transaction = Tracer.newTransaction("Publish-Data", temp.getIp());
    try {
        transaction.addData("topic", request.getTopicName());
        //执行批量插入
        message01Service.insertBatchDy(request.getTopicName(), temp.getTbName(), message01Entities);
        // 如果订阅该queue的组,开启了实时消息,则给对应的客户端发送异步通知
        if (soaConfig.getMqPushFlag() == 1) {// apollo开关
            //通知客户端
            notifyClient(temp);
        }
        dbFailMap.put(getFailDbUp(temp), System.currentTimeMillis() - soaConfig.getDbFailWaitTime() * 2000L);
        response.setSuc(true);
        transaction.setStatus(Transaction.SUCCESS);
        return;
    } catch (Exception e) {
        // sendPublishFailMail(request, e, 1);
        transaction.setStatus(e);
        if (e instanceof DataIntegrityViolationException
                || e.getCause() instanceof DataIntegrityViolationException) {
            response.setSuc(false);
            response.setMsg(e.getMessage());
            return;
        }
        dbFailMap.put(getFailDbUp(temp), System.currentTimeMillis());
        // transaction.setStatus(e);
        throw new RuntimeException(e);
    } finally {
        transaction.complete();
    }
}

4.消息的消费需要考虑

代码语言:javascript
复制
保证消费一次,采用重试机制进行间断性重试。如果重试后没有成功,采用cat进行链路追踪,同时在业务系统进行幂等处理。

进行消息拉取

代码语言:javascript
复制
//拉取数据,消费消息
@Override
public PullDataResponse pullData(PullDataRequest request) {
    PullDataResponse response = new PullDataResponse();
    response.setSuc(true);
    Map<Long, QueueEntity> data = queueService.getAllQueueMap();
    checkVaild(request, response, data);
    if (!response.isSuc()) {
        return response;
    }
    //获取消息,通过队列id获取
    QueueEntity temp = data.get(request.getQueueId());
    Map<Long, DbNodeEntity> dbNodeMap = dbNodeService.getCache();
    List<Message01Entity> entities = new ArrayList<>();
    Transaction transaction = null;
    if (checkFailTime(request.getTopicName(), temp, null) && checkStatus(temp, dbNodeMap)) {
        //设置数据库id
        message01Service.setDbId(temp.getDbNodeId());
        transaction = Tracer.newTransaction("Pull-Data", temp.getIp());
        try {
            //获取消息,批量
            entities = message01Service.getListDy(temp.getTopicName(), temp.getTbName(), request.getOffsetStart(),
                    request.getOffsetEnd());
            transaction.setStatus(Transaction.SUCCESS);
            dbFailMap.put(getFailDbUp(temp), System.currentTimeMillis() - soaConfig.getDbFailWaitTime() * 2000L);
        } catch (Exception e) {
            transaction.setStatus(e);
            dbFailMap.put(getFailDbUp(temp), System.currentTimeMillis());
            // TODO: handle exception
        }

    } else {
        transaction = Tracer.newTransaction("PullData", "PullData-wait");
        transaction.setStatus(Transaction.SUCCESS);

    }
    transaction.complete();
    List<MessageDto> messageDtos = converMessageDto(entities);
    response.setMsgs(messageDtos);
    return response;
}

5.重平衡:为什么进行重平衡?

代码语言:javascript
复制
进行重平衡的条件:出现消费者consumer的加入和退出、主题topic的增加和退出,通常是进行扩缩容的时候会用到

进行重平衡会做的一个操作:通知重平衡,而过程中会更新重平衡的版本,同时设置通知的消息,同时将通知的消息插入到通知消息服务中,也即会插入到通知消息表中。而在生产者将消息发送存储成功后,也会执行发送通知消费者的动作,也是会做一个通知的操作。例如:

代码语言:javascript
复制
@Override
public void notifyRb(long id) {
   updateRbVersion(Arrays.asList(id));
   List<NotifyMessageEntity> notifyMessageEntities = new ArrayList<>();
   NotifyMessageEntity notifyMessageEntity = new NotifyMessageEntity();
   notifyMessageEntity.setConsumerGroupId(id);
   notifyMessageEntity.setMessageType(MessageType.Rb);
   notifyMessageEntities.add(notifyMessageEntity);

   notifyMessageEntity = new NotifyMessageEntity();
   notifyMessageEntity.setConsumerGroupId(id);
   notifyMessageEntity.setMessageType(MessageType.Meta);
   notifyMessageEntities.add(notifyMessageEntity);
   notifyMessageService.insertBatch(notifyMessageEntities);
}

6.排查问题的保证

代码语言:javascript
复制
审计日志AuditLog和Cat监控=>这个可以从消息发送开始就可以看到直至消息消费

7.优化部分

代码语言:javascript
复制
定时清理冗余数据、定时清理历史数据、定时不再存活的消费者通知邮件告警、定时不再进行消息订阅服务通知邮件、定时截断、定时检查、堆积告警(进行邮件发送),对于失败消息进行存储,同时进行告警、心跳检查(采用时间对比的方式,如果超过心跳上报监测时间,则进行数据移除处理,同时offset设置为0)

进行消息清理

代码语言:javascript
复制
@Override
public void doStart() {
   long minId = notifyMessageService.getMinId();
   if (minId > 0 && isMaster()&&soaConfig.isEnbaleNotifyMessageClean()) {
      log.info("clear_old_data_minId_is_{}_and_maxId_is_{}", minId, minId + 500);
      int count = notifyMessageService.clearOld(soaConfig.getCleanInterval(), minId + 500);
      while (count > 0 && isMaster()) {
         minId = notifyMessageService.getMinId();
         log.info("clear_old_data_minId_is_{}_and_maxId_is_{}", minId, minId + 500);
         count = notifyMessageService.clearOld(soaConfig.getCleanInterval(), minId + 500);
         Util.sleep(3000);
      }
   }
}

8.提交偏移量

偏移量提交在启动broker定时任务的时候,就执行了提交偏移量服务的启动。

代码语言:javascript
复制
/**
     * 提交偏移量
     * @param request
     * @return
     */
    @Override
    public CommitOffsetResponse commitOffset(CommitOffsetRequest request) {
        // Transaction catTransaction = Tracer.newTransaction("Timer-service",
        // "commitOffset");
        CommitOffsetResponse response = new CommitOffsetResponse();
        response.setSuc(true);
        Map<Long, ConsumerQueueVersionDto> map = mapAppPolling.get();
        try {
            //如果请求不为空,同时请求的队列偏移量不为空,则执行提交操作
            if (request != null && !CollectionUtils.isEmpty(request.getQueueOffsets())) {
                request.getQueueOffsets().forEach(t1 -> {
                    ConsumerQueueVersionDto temp = map.get(t1.getQueueOffsetId());
                    boolean flag1 = true;
                    if (temp == null) {
                        synchronized (lockObj1) {
                            temp = map.get(t1.getQueueOffsetId());
                            if (temp == null) {
                                map.put(t1.getQueueOffsetId(), t1);
                                flag1 = false;
                            }
                        }
                    }
                    //如果flag1为true,偏移量版本小,则清理掉老数据,同时将新数据放入,如果当前偏移量版本相等,但偏移量小,则清掉老数据。

                    if (flag1) {
                        if (temp.getOffsetVersion() < t1.getOffsetVersion()) {
                            clearOldData();
                            map.put(t1.getQueueOffsetId(), t1);
                        } else if (temp.getOffsetVersion() == t1.getOffsetVersion()
                                && temp.getOffset() < t1.getOffset()) {
                            clearOldData();
                            map.put(t1.getQueueOffsetId(), t1);
                        }
                    }
                });
                //如果是请求1,则获取版本执行提交偏移量操作
                if (request.getFlag() == 1) {
                    Map<Long, OffsetVersionEntity> offsetVersionMap = queueOffsetService.getOffsetVersion();
                    request.getQueueOffsets().forEach(t1 -> {
                        //执行提交偏移量,此时会进行查询,获取标识flag,查询提交偏移量如果>0,则执行后续操作,此时必须满足有偏移的消息和偏移信息
                        doCommitOffset(t1, 1, offsetVersionMap, 0);
                    });
                }
            }
        } catch (Exception e) {
        }
        // catTransaction.setStatus(Transaction.SUCCESS);
        // catTransaction.complete();
        return response;
    }

9.对于消息进行重平衡的过程中

代码语言:javascript
复制
比如会涉及到偏移量的问题,考虑解决偏移量offset的处理

执行重平衡中执行偏移量处理

代码语言:javascript
复制
//执行rb重平衡操作
    @Override
    // @Transactional(rollbackFor = Exception.class)
    public void rb(List<QueueOffsetEntity> queueOffsetEntities) {
        Map<Long, String> idsMap = new HashMap<>(30);
        List<NotifyMessageEntity> notifyMessageEntities = new ArrayList<>(30);
        //将传入的查询偏移量的消费组id放入,并执行更新操作
        queueOffsetEntities.forEach(t1 -> {
            idsMap.put(t1.getConsumerGroupId(), "");
            NotifyMessageEntity notifyMessageEntity = new NotifyMessageEntity();
            notifyMessageEntity.setConsumerGroupId(t1.getConsumerGroupId());
            notifyMessageEntity.setMessageType(MessageType.Meta);
            notifyMessageEntities.add(notifyMessageEntity);
            // 更新consumerid 和consumername
            queueOffsetService.updateConsumerId(t1);
        });
        // 更新重平衡版本,注意这个代码非常的重要,这个可以保证客户端能够拿到最新的重平衡版本号
        updateRbVersion(new ArrayList<>(idsMap.keySet()));
        // 批量插入消息事件
        notifyMessageService.insertBatch(notifyMessageEntities);

    }

总结

一个消息中间件少不了的是消息的发送、存储、消费,而在pmq发送过程中需要考虑消息的发送失败情况,此时需要使用重试+链路,如果还是不行,将失败消息放入数据库中,此时将发送失败的消息存入到数据中,以便重新发送。消息发送成功,进行消息存储。而存储的过程中,需要考虑数据库的挂了的情况,因此需要采用数据库主备。而采用通知消费者消息的方式,通知消费者可以拉取消息进行消费了。此时消费者进行消息的消费,此时需要考虑消息偏移量,而这个偏移量则是消息id,与rocketmq中不同,同时在消费的过程中,不排除重平衡的情况产生,也即消费者的上下线、队列上下线的情况,因此此时进行重平衡。而消费的过程中,也会产生消息丢失的情况,因此需要采用重试+链路监控的方式。同时对于消费者,需要进行消息至少一次,因此如果需要一次,在业务消费方中需要进行幂等处理。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2021-03-01,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 后端技术学习 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • pmq架构图
  • pmq中消息端到端流程
  • 1.消息的核心是什么?
  • 2.消息发送的过程中需要考虑什么?
    • 进行发送消息处理
      • 发送请求的过程中,如果失败还会进行降速处理
        • 保存消息到数据库
        • 3.消息存储的过程中需要考虑高可用
          • 进行消息的保存操作
          • 4.消息的消费需要考虑
            • 进行消息拉取
            • 5.重平衡:为什么进行重平衡?
            • 6.排查问题的保证
            • 7.优化部分
              • 进行消息清理
              • 8.提交偏移量
              • 9.对于消息进行重平衡的过程中
                • 执行重平衡中执行偏移量处理
                • 总结
                相关产品与服务
                对象存储
                对象存储(Cloud Object Storage,COS)是由腾讯云推出的无目录层次结构、无数据格式限制,可容纳海量数据且支持 HTTP/HTTPS 协议访问的分布式存储服务。腾讯云 COS 的存储桶空间无容量上限,无需分区管理,适用于 CDN 数据分发、数据万象处理或大数据计算与分析的数据湖等多种场景。
                领券
                问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档