首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >pmq学习总结

pmq学习总结

作者头像
路行的亚洲
发布于 2021-03-04 07:47:50
发布于 2021-03-04 07:47:50
65100
代码可运行
举报
文章被收录于专栏:后端技术学习后端技术学习
运行总次数:0
代码可运行

pmq架构图

pmq中消息端到端流程

1.消息的核心是什么?

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
1) 消息发送publish =>MqClient.publish(topicName, "",new ProducerDataDto(String.valueOf(i)));=>ConsumerController#publish
2)消息存储 saveMessage =>msgNotifyService.notify(request);=>MqQueueExcutorService#notifyMsg
3)消息消费 doPullingData=>pullData =>ConsumerController#consumerService.pullData(request)    

2.消息发送的过程中需要考虑什么?

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
1)消息发送中丢失的情况,而此时必然需要保证至少交付1次(at least one)2)消息发送采用重试机制(retry),如果重试没有效果,会将失败的队列的相关信息记录到链路和日志中(cat and log),保证交付。
3)同时对重平衡的处理(考虑消费者上下线和队列上下线的情况,也即扩缩容的情况),同时消费的过程中,会对偏移量进行统计,同时会进行心跳上报,证明自己还活着没有下线。
4)而这个偏移量则是消息id。与rocketMQ中不同。
5)消费者在业务方需要做幂等处理,因为消息重启后可能会接收到少量重复的消息。
6)而高可用通过数据库主备来保证。
7)而如果出现生产者、消费者、broker挂掉的情况,在Metric中有详细的情况记录。
8)分区通过表隔离,一个分区对应一个表,不同主题/分区互不干扰。上线下线时,通过Broker向DB插入一条消息,表示需要触发重平衡。一旦broker有重平衡产生,立即通知消费者进行重平衡。消费端收到重平衡后,立即提交偏移并停止消费,连续三次(可配)被分配队列相同时,则开启新的消费。  

进行发送消息处理

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
//进行判空处理,处理完成返回true
if (request == null) {
    return true;
}
//进行事务处理、同时进行metric信息收集,执行请求处理,处理完,进行耗时统计,设置事务状态,添加信息到链路中,如果失败,执行失败处理,将失败信息放入链路中
Transaction transaction = Tracer.newTransaction("mq-client-publish", request.getTopicName());
Timer.Context timer1 = MetricSingleton.getMetricRegistry()
    .timer("mq.client.publish.time?topic=" + request.getTopicName()).time();
try {
    String url = MqConstanst.CONSUMERPRE + "/publish";
    long start = System.nanoTime();
    PublishMessageResponse response = post(request, url, retryTimes, PublishMessageResponse.class, true);
    long end = System.nanoTime();
    if (response.getTime() > 0) {
        long t = end - start - response.getTime();
        t = (t - t % 1000000) / 1000000;
        MetricSingleton.getMetricRegistry()
            .histogram("mq.client.publish.network.time?topic=" + request.getTopicName()).update(t);
    }
    transaction.setStatus(Transaction.SUCCESS);
    if (!response.isSuc()) {
        String json = JsonUtil.toJson(request);
        logger.error(response.getMsg());
        CatRequest request2 = new CatRequest();
        request2.setMethod("publish_fail");
        request2.setJson(json);
        request2.setMsg(response.getMsg());
        addCat(request2);
    }
    return response.isSuc();
} catch (Exception e) {
    MetricSingleton.getMetricRegistry().counter("mq.client.publish.fail.count?topic=" + request.getTopicName())
        .inc();
    logger.error("publish_error", e);
    String json = JsonUtil.toJson(request);
    transaction.setStatus(e);
    CatRequest request2 = new CatRequest();
    request2.setMethod("publish");
    request2.setJson(json);
    request2.setMsg(e.getMessage());
    addCat(request2);

    SendMailRequest mailRequest = new SendMailRequest();
    mailRequest.setSubject("消息发送失败,客户端:" + request.getClientIp() + ",Topic:" + request.getTopicName());
    mailRequest.setContent("消息发送异常," + ",消息体是:" + json + ",异常原因是:" + e.getMessage());
    mailRequest.setType(2);
    mailRequest.setTopicName(request.getTopicName());
    sendMail(mailRequest);
    return false;
} finally {
    transaction.complete();
    timer1.stop();
}

发送请求的过程中,如果失败还会进行降速处理

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
if (response != null) {
   if (isImportant) {
      failUrlG1.put(host, System.currentTimeMillis() - 10 * 1000);
   } else {
      failUrlG2.put(host, System.currentTimeMillis() - 10 * 1000);
   }
   if (response instanceof PublishMessageResponse) {
      PublishMessageResponse response2 = ((PublishMessageResponse) response);
      if (response2.getSleepTime() > 0) {
         response = null;
         logger.info(response2.getMsg());
         Util.sleep(response2.getSleepTime());
         // 这个不算重试,只是降速
         count--;
      }
   } else {
      BaseResponse baseResponse = (BaseResponse) response;
      if (!baseResponse.isSuc() && baseResponse.getCode() == MqConstanst.NO) {
         response = null;
         Util.sleep(1000);
      } else {
         if (!baseResponse.isSuc()) {
            logger.error(baseResponse.getMsg());
         }
      }
   }
} else {
   // response 等于null 说明接口调用失败了。此时需要将url 放入失败接口中。
   if (isImportant) {
      failUrlG1.put(host, System.currentTimeMillis());
   } else {
      failUrlG2.put(host, System.currentTimeMillis());
   }
   Util.sleep(500);
}

保存消息到数据库

存储的过程就是将消息进行保存的过程,在pmq中就是将数据保存到数据库

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
//publish操作
@Override
public PublishMessageResponse publish(PublishMessageRequest request) {
    PublishMessageResponse response = new PublishMessageResponse();
    checkVaild(request, response);
    if (!response.isSuc()) {
        return response;
    }
    try {
        if (!checkTopicRate(request, response)) {
            return response;
        }
        //获取所有分配好的topic写队列
        Map<String, List<QueueEntity>> queueMap = queueService.getAllLocatedTopicWriteQueue();
        //获取所以分配好的主题队列
        Map<String, List<QueueEntity>> topicQueueMap = queueService.getAllLocatedTopicQueue();
        if (queueMap.containsKey(request.getTopicName()) || topicQueueMap.containsKey(request.getTopicName())) {
            List<QueueEntity> queueEntities = queueMap.get(request.getTopicName());
            if (queueEntities == null || queueEntities.size() == 0) {
                response.setSuc(false);
                response.setMsg("topic_" + request.getTopicName() + "_and_has_no_queue!");
                //如果主题队列map中包含请求中拿到主题名称,或者配置中拿到发布模式,则执行获取队列信息,同时更新队列缓存
                if (topicQueueMap.containsKey(request.getTopicName()) && soaConfig.getPublishMode() == 1) {
                    queueEntities = topicQueueMap.get(request.getTopicName());
                    updateQueueCache(request.getTopicName());
                } else {
                    updateQueueCache(request.getTopicName());
                    return response;
                }
            }
            //如果队列实体列表>0,则执行保存消息操作
            if (queueEntities.size() > 0) {
                saveMsg(request, response, queueEntities);
            }
        } else {
            response.setSuc(false);
            response.setMsg("topic1_" + request.getTopicName() + "_and_has_no_queue!");
            return response;
        }
    } catch (Exception e) {
        log.error("publish_error,and request json is " + JsonUtil.toJsonNull(request), e);
        response.setSuc(false);
        response.setMsg(e.getMessage());
    } finally {
        //最终,将计数进行自减,同时获取
        if (soaConfig.getEnableTopicRate() == 1) {
            totalMax.decrementAndGet();
            //获取主题
            topicPerMax.get(request.getTopicName()).decrementAndGet();
        }
    }
    return response;
}

3.消息存储的过程中需要考虑高可用

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
使用数据库主备db,消息存储同时进行负载均衡,避免单个数据库出现消息队列请求频繁的情况,造成单个库压力过大,同时库可以水平扩展

进行消息的保存操作

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
//执行保存消息
protected void doSaveMsg(List<Message01Entity> message01Entities, PublishMessageRequest request,
                         PublishMessageResponse response, QueueEntity temp) {
    // Transaction transaction = Tracer.newTransaction("PubInner-" +
    // temp.getIp(), request.getTopicName());
    //消息服务设置数据库id
    message01Service.setDbId(temp.getDbNodeId());
    Transaction transaction = Tracer.newTransaction("Publish-Data", temp.getIp());
    try {
        transaction.addData("topic", request.getTopicName());
        //执行批量插入
        message01Service.insertBatchDy(request.getTopicName(), temp.getTbName(), message01Entities);
        // 如果订阅该queue的组,开启了实时消息,则给对应的客户端发送异步通知
        if (soaConfig.getMqPushFlag() == 1) {// apollo开关
            //通知客户端
            notifyClient(temp);
        }
        dbFailMap.put(getFailDbUp(temp), System.currentTimeMillis() - soaConfig.getDbFailWaitTime() * 2000L);
        response.setSuc(true);
        transaction.setStatus(Transaction.SUCCESS);
        return;
    } catch (Exception e) {
        // sendPublishFailMail(request, e, 1);
        transaction.setStatus(e);
        if (e instanceof DataIntegrityViolationException
                || e.getCause() instanceof DataIntegrityViolationException) {
            response.setSuc(false);
            response.setMsg(e.getMessage());
            return;
        }
        dbFailMap.put(getFailDbUp(temp), System.currentTimeMillis());
        // transaction.setStatus(e);
        throw new RuntimeException(e);
    } finally {
        transaction.complete();
    }
}

4.消息的消费需要考虑

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
保证消费一次,采用重试机制进行间断性重试。如果重试后没有成功,采用cat进行链路追踪,同时在业务系统进行幂等处理。

进行消息拉取

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
//拉取数据,消费消息
@Override
public PullDataResponse pullData(PullDataRequest request) {
    PullDataResponse response = new PullDataResponse();
    response.setSuc(true);
    Map<Long, QueueEntity> data = queueService.getAllQueueMap();
    checkVaild(request, response, data);
    if (!response.isSuc()) {
        return response;
    }
    //获取消息,通过队列id获取
    QueueEntity temp = data.get(request.getQueueId());
    Map<Long, DbNodeEntity> dbNodeMap = dbNodeService.getCache();
    List<Message01Entity> entities = new ArrayList<>();
    Transaction transaction = null;
    if (checkFailTime(request.getTopicName(), temp, null) && checkStatus(temp, dbNodeMap)) {
        //设置数据库id
        message01Service.setDbId(temp.getDbNodeId());
        transaction = Tracer.newTransaction("Pull-Data", temp.getIp());
        try {
            //获取消息,批量
            entities = message01Service.getListDy(temp.getTopicName(), temp.getTbName(), request.getOffsetStart(),
                    request.getOffsetEnd());
            transaction.setStatus(Transaction.SUCCESS);
            dbFailMap.put(getFailDbUp(temp), System.currentTimeMillis() - soaConfig.getDbFailWaitTime() * 2000L);
        } catch (Exception e) {
            transaction.setStatus(e);
            dbFailMap.put(getFailDbUp(temp), System.currentTimeMillis());
            // TODO: handle exception
        }

    } else {
        transaction = Tracer.newTransaction("PullData", "PullData-wait");
        transaction.setStatus(Transaction.SUCCESS);

    }
    transaction.complete();
    List<MessageDto> messageDtos = converMessageDto(entities);
    response.setMsgs(messageDtos);
    return response;
}

5.重平衡:为什么进行重平衡?

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
进行重平衡的条件:出现消费者consumer的加入和退出、主题topic的增加和退出,通常是进行扩缩容的时候会用到

进行重平衡会做的一个操作:通知重平衡,而过程中会更新重平衡的版本,同时设置通知的消息,同时将通知的消息插入到通知消息服务中,也即会插入到通知消息表中。而在生产者将消息发送存储成功后,也会执行发送通知消费者的动作,也是会做一个通知的操作。例如:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@Override
public void notifyRb(long id) {
   updateRbVersion(Arrays.asList(id));
   List<NotifyMessageEntity> notifyMessageEntities = new ArrayList<>();
   NotifyMessageEntity notifyMessageEntity = new NotifyMessageEntity();
   notifyMessageEntity.setConsumerGroupId(id);
   notifyMessageEntity.setMessageType(MessageType.Rb);
   notifyMessageEntities.add(notifyMessageEntity);

   notifyMessageEntity = new NotifyMessageEntity();
   notifyMessageEntity.setConsumerGroupId(id);
   notifyMessageEntity.setMessageType(MessageType.Meta);
   notifyMessageEntities.add(notifyMessageEntity);
   notifyMessageService.insertBatch(notifyMessageEntities);
}

6.排查问题的保证

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
审计日志AuditLog和Cat监控=>这个可以从消息发送开始就可以看到直至消息消费

7.优化部分

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
定时清理冗余数据、定时清理历史数据、定时不再存活的消费者通知邮件告警、定时不再进行消息订阅服务通知邮件、定时截断、定时检查、堆积告警(进行邮件发送),对于失败消息进行存储,同时进行告警、心跳检查(采用时间对比的方式,如果超过心跳上报监测时间,则进行数据移除处理,同时offset设置为0)

进行消息清理

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@Override
public void doStart() {
   long minId = notifyMessageService.getMinId();
   if (minId > 0 && isMaster()&&soaConfig.isEnbaleNotifyMessageClean()) {
      log.info("clear_old_data_minId_is_{}_and_maxId_is_{}", minId, minId + 500);
      int count = notifyMessageService.clearOld(soaConfig.getCleanInterval(), minId + 500);
      while (count > 0 && isMaster()) {
         minId = notifyMessageService.getMinId();
         log.info("clear_old_data_minId_is_{}_and_maxId_is_{}", minId, minId + 500);
         count = notifyMessageService.clearOld(soaConfig.getCleanInterval(), minId + 500);
         Util.sleep(3000);
      }
   }
}

8.提交偏移量

偏移量提交在启动broker定时任务的时候,就执行了提交偏移量服务的启动。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
/**
     * 提交偏移量
     * @param request
     * @return
     */
    @Override
    public CommitOffsetResponse commitOffset(CommitOffsetRequest request) {
        // Transaction catTransaction = Tracer.newTransaction("Timer-service",
        // "commitOffset");
        CommitOffsetResponse response = new CommitOffsetResponse();
        response.setSuc(true);
        Map<Long, ConsumerQueueVersionDto> map = mapAppPolling.get();
        try {
            //如果请求不为空,同时请求的队列偏移量不为空,则执行提交操作
            if (request != null && !CollectionUtils.isEmpty(request.getQueueOffsets())) {
                request.getQueueOffsets().forEach(t1 -> {
                    ConsumerQueueVersionDto temp = map.get(t1.getQueueOffsetId());
                    boolean flag1 = true;
                    if (temp == null) {
                        synchronized (lockObj1) {
                            temp = map.get(t1.getQueueOffsetId());
                            if (temp == null) {
                                map.put(t1.getQueueOffsetId(), t1);
                                flag1 = false;
                            }
                        }
                    }
                    //如果flag1为true,偏移量版本小,则清理掉老数据,同时将新数据放入,如果当前偏移量版本相等,但偏移量小,则清掉老数据。

                    if (flag1) {
                        if (temp.getOffsetVersion() < t1.getOffsetVersion()) {
                            clearOldData();
                            map.put(t1.getQueueOffsetId(), t1);
                        } else if (temp.getOffsetVersion() == t1.getOffsetVersion()
                                && temp.getOffset() < t1.getOffset()) {
                            clearOldData();
                            map.put(t1.getQueueOffsetId(), t1);
                        }
                    }
                });
                //如果是请求1,则获取版本执行提交偏移量操作
                if (request.getFlag() == 1) {
                    Map<Long, OffsetVersionEntity> offsetVersionMap = queueOffsetService.getOffsetVersion();
                    request.getQueueOffsets().forEach(t1 -> {
                        //执行提交偏移量,此时会进行查询,获取标识flag,查询提交偏移量如果>0,则执行后续操作,此时必须满足有偏移的消息和偏移信息
                        doCommitOffset(t1, 1, offsetVersionMap, 0);
                    });
                }
            }
        } catch (Exception e) {
        }
        // catTransaction.setStatus(Transaction.SUCCESS);
        // catTransaction.complete();
        return response;
    }

9.对于消息进行重平衡的过程中

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
比如会涉及到偏移量的问题,考虑解决偏移量offset的处理

执行重平衡中执行偏移量处理

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
//执行rb重平衡操作
    @Override
    // @Transactional(rollbackFor = Exception.class)
    public void rb(List<QueueOffsetEntity> queueOffsetEntities) {
        Map<Long, String> idsMap = new HashMap<>(30);
        List<NotifyMessageEntity> notifyMessageEntities = new ArrayList<>(30);
        //将传入的查询偏移量的消费组id放入,并执行更新操作
        queueOffsetEntities.forEach(t1 -> {
            idsMap.put(t1.getConsumerGroupId(), "");
            NotifyMessageEntity notifyMessageEntity = new NotifyMessageEntity();
            notifyMessageEntity.setConsumerGroupId(t1.getConsumerGroupId());
            notifyMessageEntity.setMessageType(MessageType.Meta);
            notifyMessageEntities.add(notifyMessageEntity);
            // 更新consumerid 和consumername
            queueOffsetService.updateConsumerId(t1);
        });
        // 更新重平衡版本,注意这个代码非常的重要,这个可以保证客户端能够拿到最新的重平衡版本号
        updateRbVersion(new ArrayList<>(idsMap.keySet()));
        // 批量插入消息事件
        notifyMessageService.insertBatch(notifyMessageEntities);

    }

总结

一个消息中间件少不了的是消息的发送、存储、消费,而在pmq发送过程中需要考虑消息的发送失败情况,此时需要使用重试+链路,如果还是不行,将失败消息放入数据库中,此时将发送失败的消息存入到数据中,以便重新发送。消息发送成功,进行消息存储。而存储的过程中,需要考虑数据库的挂了的情况,因此需要采用数据库主备。而采用通知消费者消息的方式,通知消费者可以拉取消息进行消费了。此时消费者进行消息的消费,此时需要考虑消息偏移量,而这个偏移量则是消息id,与rocketmq中不同,同时在消费的过程中,不排除重平衡的情况产生,也即消费者的上下线、队列上下线的情况,因此此时进行重平衡。而消费的过程中,也会产生消息丢失的情况,因此需要采用重试+链路监控的方式。同时对于消费者,需要进行消息至少一次,因此如果需要一次,在业务消费方中需要进行幂等处理。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2021-03-01,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 后端技术学习 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
pmq学习四-生产消息到存储到消费的过程
需要承认的是前面学习二中,pmq中的发送消息和消费消息是两个动作,同时操作的过程publish和pullData两个操作。认知有限,我误导大家了。接上上面的话题,发送消息publish的操作,前面已经说到基于HttpClient会执行post请求,里面有一个重要的url,这个是我们需要关注的。每一个url请求都是操作的开始。
路行的亚洲
2021/01/18
6020
pmq学习四-生产消息到存储到消费的过程
pmq学习六-broker启动
如果提交线程数大小不等于在soa配置中获取的提交线程大小,则修改成配置中的提交线程数大小,
路行的亚洲
2021/02/03
6550
pmq学习六-broker启动
pmq学习二-生产者和消费者流程
学习一个框架,通常从example开始。同时一个消息中间件是从生产者开始,消费者消费消息。这里mq-client-test-001里面的两个类开始。
路行的亚洲
2021/01/05
6960
pmq学习二-生产者和消费者流程
pmq学习三-mq客户端启动的流程
我们知道在RocketMQ中,服务端代表的是broker,而客户端才是我们的生产者和消费者。而pmq中,也是如此,服务端是broker,而客户端是生产者和消费者。客户端与spring集成,是从这里开始的,可以看到mq启动处理器实现了BeanFactoryPostProcessor,重写了postProcessBeanFactory后置处理器bean工厂。这里基本上涉及到IMqFactory上的接口。
路行的亚洲
2021/01/05
9260
pmq学习三-mq客户端启动的流程
CAT链路追踪从入门到实战(看这一篇就够了)
整体设计 简单即是最好原则设计, 主要分为三个模块cat-client,cat-consumer,cat-home。
小熊学Java
2023/07/16
2K0
CAT链路追踪从入门到实战(看这一篇就够了)
学习 Kafka 入门知识看这一篇就够了!(万字长文)
Kafka 是由 Linkedin 公司开发的,它是一个分布式的,支持多分区、多副本,基于 Zookeeper 的分布式消息流平台,它同时也是一款开源的基于发布订阅模式的消息引擎系统。
纯洁的微笑
2019/12/03
58.3K3
都说Kafka牛3万字带你全面掌握kafka
系统间的耦合高怎么办,我们如何不让一个服务过于庞大,一个好的方式就是依据具体的功能模块拆分服务,降低服务的耦合度,服务间的交互可以通过消息传递数据来实现,除此之外Kafka非常适合在线日志收集等高吞吐场景,kafka有更好的吞吐量,内置分区,副本和故障转移,这有利于处理大规模的消息,所以kafka被各大公司广泛运用于消息队列的构建:
大数据老哥
2021/03/05
1.5K0
都说Kafka牛3万字带你全面掌握kafka
pmq再学习二
前面我们知道pmq的生产者和消费者需要进行生产和消费,需要有基础数据的支撑,也即元数据的支撑。这个过程首先需要有主题,然后创建消费组,在消费组中,我们根据我们的需要进行消息的订阅,订阅中也即绑定了消费组和主题的关系。
路行的亚洲
2021/07/23
6920
pmq再学习二
看完这篇Kafka,你也许就会了Kafka[通俗易懂]
Kafka是一种消息队列,主要用来处理大量数据状态下的消息队列,一般用来做日志的处理。既然是消息队列,那么Kafka也就拥有消息队列的相应的特性了。
全栈程序员站长
2022/08/24
18K4
看完这篇Kafka,你也许就会了Kafka[通俗易懂]
Kafka 基础面试题
答:Apache Kafka是一个发布 - 订阅开源消息代理应用程序。这个消息传递应用程序是用“scala”编码的。基本上,这个项目是由Apache软件启动的。Kafka的设计模式主要基于事务日志设计。
Tim在路上
2020/08/05
7640
pmq学习七-重平衡
可以看到在pmq-ui中启动时会执行启动ComsumerGroupRb服务,同时还会启动MessageLagNotify服务以及Queue队列服务。消费组重平衡服务、db节点服务、消息清理服务、消息告警通知服务、清理无用服务、队列服务等
路行的亚洲
2021/02/03
4730
pmq学习七-重平衡
apollo客户端通知原理
Apollo是携程开源的一个分布式配置中心,提供了丰富的能力,其中就包括配置发布动态通知。
叔牙
2022/03/28
3.4K0
apollo客户端通知原理
SpringBoot集成CAT调用链实例
一年一度的双十一购物狂欢节就要到了,又到剁手党们开始表演的时刻了。当我们把种草很久的商品放入购物车以后,点击“结算”按钮时,就来到了买买买必不可少的结算页面了。让我们虚拟一个买买买结算系统,为结算页面提供商品、促销、库存等结算信息,就此系统展开如何在SpringBoot项目中集成CAT调用链。买买买结算系统包含以下4个项目:
万猫学社
2022/04/22
7220
SpringBoot集成CAT调用链实例
Apollo 源码解析 —— Admin Service 发送 ReleaseMessage
摘要: 原创出处 http://www.iocoder.cn/Apollo/admin-server-send-release-message/ 「芋道源码」欢迎转载,保留摘要,谢谢!
芋道源码
2020/06/01
8820
熬夜之作:一文带你了解Cat分布式监控
CAT(Central Application Tracking)是基于 Java 开发的实时应用监控平台,包括实时应用监控,业务监控。
猿天地
2020/06/10
1.8K0
熬夜之作:一文带你了解Cat分布式监控
pmq再学习一
首先基础数据的来源和怎么产生联系的?创建主题,有了主题,创建消费组,然后基于消费组这个大前提,执行订阅操作,订阅需要进行消费的主题信息,然后在订阅的基础上,进行队列的分配,而分配过程中,首先会去找到可分配的数据节点,然后根据条件进行匹配,然后进行返回。在这个过程中会执行元数据的变更和重平衡操作。特别的,这里需要重点关注队列是怎样产生和分配的。
路行的亚洲
2021/06/24
7370
调用链监控 CAT 之 入门
CAT 是一个实时和接近全量的监控系统,它侧重于对Java应用的监控,基本接入了美团上海所有核心应用。目前在中间件(MVC、RPC、数据库、缓存等)框架中得到广泛应用,为美团各业务线提供系统的性能指标、健康状况、监控告警等。
程序员果果
2019/05/23
1.3K0
Apollo 源码解析 —— Client 轮询配置
本文分享 Client 如何通过轮询的方式,从 Config Service 读取配置。Client 的轮询包括两部分:
芋道源码
2020/05/25
2.7K0
Apollo 源码解析 —— Client 轮询配置
【Kafka】Kafka 基础知识总结
Kafka其实是一款基于发布与订阅模式的消息系统,如果按常理来设计,大家是不是把消息发送者的消息直接发送给消息消费者?但Kafka并不是这么设计的,Kafka消息的生产者会对消息进行分类,再发送给中间的消息服务系统,而消息消费者通过订阅某分类的消息去接受特定类型的消息。
JavaSouth南哥
2024/12/04
2450
【Kafka】Kafka 基础知识总结
一段解决kafka消息处理异常的经典对话
有一天,卡尔维护的购买系统发生了一个奇怪的异常,从日志里看到,购买后的任务处理竟然先于购买任务执行了。“不可能啊,按照代码的顺序,一定是先执行购买流程,再发送消息到kafka,最后消费端接收到消息后执行购买后的一些善后任务。从A到B到C,顺序清清楚楚。” 于是,他请教了马克,马克眯着眼睛细看了一会,道:"问题是不是出在这段@Transaction注解上?"
java达人
2018/07/31
1.4K0
一段解决kafka消息处理异常的经典对话
相关推荐
pmq学习四-生产消息到存储到消费的过程
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档