前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >为什么放弃Kafka,选择Pulsar?

为什么放弃Kafka,选择Pulsar?

作者头像
微观技术
发布2021-04-30 11:45:10
1.1K0
发布2021-04-30 11:45:10
举报
文章被收录于专栏:微观技术

Spring Boot 作为主流微服务框架,拥有成熟的社区生态。市场应用广泛,为了方便大家,整理了一个基于spring boot的常用中间件快速集成入门系列手册,涉及RPC、缓存、消息队列、分库分表、注册中心、分布式配置等常用开源组件,大概有几十篇文章,陆续会开放出来,感兴趣同学请提前关注&收藏

Pulsar 介绍

Pulsar 是 Yahoo 在 2013 年创建的,2016年贡献给了 Apache 基金会,目前已经是 Apache 的顶级项目。Yahoo、Verizon、Twitter 等很多公司都在使用 Pulsar 来处理海量消息。

Pulsar 声称比 Kafka 更快、运行成本更低、解决了很多 Kafka 的痛点。

Pulsar 非常灵活,可以像Kafka 一样作为分布式日志系统,也可以作为类似RabbitMQ 这类简单的消息系统。

Pulsar 有多种订阅类型、传递保障、保存策略。

特性

  • 内置多租户

不同的团队可以使用同一个集群,互相隔离。支持隔离、认证授权、配额。

  • 多层架构

Pulsar 使用特定的数据层来存储 topic 数据,使用了 Apache BookKeeper 作为数据账本。Broker 与存储分离。

使用分隔机制可以解决集群的扩展、再平衡、维护等问题。也提升了可用性,不会丢失数据。

因为使用了多层架构,对于 topic 数量没有限制,topic 与存储是分离的,也可以创建非持久化的 topic。

  • 多层存储

Kafka 中存储是很昂贵的,所以很少存储冷数据。Pulsar 使用了多层存储,可以自动把旧数据移动到专门的存储设备,例如 Amazon S3,但是对于客户端来讲是透明的,还可以正常使用。

  • Functions

Pulsar Function 是一种部署简单,轻量级计算、对开发人员友好的 API,无需像 Kafka 那样运行自己的流处理引擎。

  • 安全

内置了代理、多租户安全机制、可插入的身份验证等功能。

  • 快速再平衡

partition 被分为了小块儿,所以再平衡时非常快。

  • 多系统集成

例如 Kafka、RabbitMQ 等系统都可以轻松集成。

  • 支持多种开发语言

例如 Go、Java、Scala、Node、Python 等等

为什么选择 Pulsar

目前业界使用比较多的是 Kafka,主要场景是大数据日志处理,较少用于金融场景。RocketMQ 对 Topic 运营不太友好,特别是不支持按 Topic 删除失效消息,以及不具备宕机 Failover 能力。选 Pulsar 是因为其原生的高一致性,基于 BookKeeper 提供高可用存储服务,采用了存储和服务分离架构方便扩容,同时还支持多种消费模式和多域部署模式。Kafka、RocketMQ 和 Pulsar 的对比如下:

Show me the code

外部依赖:

在 pom.xml 中添加 Pulsar 依赖:

代码语言:javascript
复制
<dependency>
    <groupId>org.apache.pulsar</groupId>
    <artifactId>pulsar-client</artifactId>
    <version>2.4.0</version>
</dependency>

配置文件:

在配置文件 application.yaml中配置 Pulsar 的相关参数,具体内容如下:

代码语言:javascript
复制
pulsar:
  service:
    url: pulsar://127.0.0.1:6650

Producer 发送消息:

生产端提供了一个restful接口,模拟发送一条创建新用户消息。

代码语言:javascript
复制
Long id = Long.valueOf(new Random().nextInt(1000));
User user = User.builder().id(id).userName("TomGE").age(29).address("上海").build();
userPulsarMsgProducer.send(user);

内部通过 @PostConstruct 在应用启动时,初始化org.apache.pulsar.client.api.Producer实例,并交由spring 容器统一管理。

代码语言:javascript
复制
public void send(T msg) {
    String msgBody = JSON.toJSONString(msg);
    try {
        MessageId messageId = producer.send(msgBody.getBytes(StandardCharsets.UTF_8));
        log.info("pulsar msg send success, topic:{}, messageId:{}, msg:{}", getTopic(), messageId, msgBody);
    } catch (Throwable e) {
        log.error("pulsar msg send failed, topic:{}, msg:{}", getTopic(), msgBody);
    }
}

Producer 发送延迟消息:

适用于一些有延迟处理要求的业务场景,比如电商交易的自动确认收货,在卖家发出货品后,有15天的观察期,这期间如果买家没有发起逆向流程/申请退款,将会由系统自动触发超时确认收货

不同业务场景,设定不同的延迟时间值,可以让消费端在延迟指定时间后才能拉取到消息并进行消费。借助于该框架特性,有效节省开发成本和难度。

代码语言:javascript
复制
producer.newMessage().deliverAfter(delay, unit)
        .value(msgBody.getBytes(StandardCharsets.UTF_8))
        .send();

Consumer 消费消息:

系统启动时,自动创建consumer消费实例,并埋入org.apache.pulsar.client.api.MessageListener接口实现,用于具体的消息消费处理逻辑。

代码语言:javascript
复制
@PostConstruct
void init() throws PulsarClientException {
    consumer = client.createConsumer(getTopic(), getSubscriptionName(), new DefaultJsonMsgListener());
}

 class DefaultJsonMsgListener implements MessageListener<byte[]> {

        @Override
        public void received(Consumer<byte[]> consumer, Message<byte[]> message) {
            if (null != message && null != message.getData() && message.getData().length != 0) {
                String msgBody = new String(message.getValue(), StandardCharsets.UTF_8);

                log.warn("topic:{} receive message:{}", getTopic(), msgBody);
                try {
                    T msg = JSON.parseObject(msgBody, clazzT);
                    handleMsg(msg);
                } catch (Exception e) {
                    log.error("handle msg failed, topic:{}, message:{}", getTopic(), msgBody, e);
                    return;
                }
            }

            try {
                // 提交消费位移
                consumer.acknowledge(message);
            } catch (PulsarClientException e) {
                log.error("topic:{} ack failed", getTopic(), e);
            }
        }
    }

演示代码地址

代码语言:javascript
复制
https://github.com/aalansehaiyang/spring-boot-bulking  

模块:spring-boot-bulking-pulsar

你好,我是微观技术

计算机专业出身,研究僧,校招进入阿里,架构师

写过专利,竞赛拿过奖,CSDN博客专家

负责过电商交易、社区、营销、金融等业务

多年团队管理经验,爱思考

热衷把工作沉淀写成文章

感兴趣的同学请关注,也欢迎加微信好友交流

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2021-04-19,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 微观技术 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Pulsar 介绍
  • 特性
  • 为什么选择 Pulsar
  • Show me the code
  • 演示代码地址
相关产品与服务
对象存储
对象存储(Cloud Object Storage,COS)是由腾讯云推出的无目录层次结构、无数据格式限制,可容纳海量数据且支持 HTTP/HTTPS 协议访问的分布式存储服务。腾讯云 COS 的存储桶空间无容量上限,无需分区管理,适用于 CDN 数据分发、数据万象处理或大数据计算与分析的数据湖等多种场景。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档