前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >kafka事务:入门篇

kafka事务:入门篇

原创
作者头像
皮皮熊
修改2022-09-07 14:32:20
1.1K2
修改2022-09-07 14:32:20
举报
文章被收录于专栏:大数据与实时计算

之前搜索网上kafka事务相关文章,要么不清不楚,要么过于深奥。最近难得一闲,啃了一下kip-98,终于如愿能系统地总结一下kafka事务的原理与实现。

kafka事务

kafka官方设计文档 kip-98。大家可以先阅读我这篇文章,再去啃kip原文。

Kafka 的事务可以看作Kafka 中最难的知识点之一!

一、基础概念

一般而言,消息中间件的消息传输保障有3个层级:

  • at most once:至多一次。消息可能会丢失,但绝对不会重复传输
  • at least once:最少一次。消息绝不会丢失,但可能会重复传输
  • exactly once:恰好一次。每条消息肯定会被传输一次且仅传输一次

Kafka 的消息传输保障机制非常直观。当生产者向 Kafka 发送消息时,一旦消息被成功提交到日志文件,由于多副本机制的存在,这条消息就不会丢失。

Kafka从0.11.0.0版本开始引入了幂等和事务这两个特性,以此来实现EOS(exactly oncesemantics,精确一次处理语义)。

二、幂等

所谓的幂等,简单地说就是对接口的多次调用所产生的结果和调用一次是一致的。生产者在进行重试的时候有可能会重复写入消息,而使用Kafka的幂等性功能之后就可以避免这种情况。

为了实现生产者的幂等性,Kafka为此引入了

  • producer id
  • 序列号(sequencenumber)这两个概念(在早期的kafka版本中,我们在业务层也实现了类似的header信息来实现业务层幂等。当然这部分kafka原生实现的源码也值得一看,对比看看有何优劣!)

客户端需要开启enable.idempotence为true。

为什么需要幂等?

因为事务开启后必然会有很多的失败重试

三、事务

幂等性并不能跨多个分区运作,而事务可以弥补这个缺陷。事务可以保证对多个分区写入操作的原子性。操作的原子性是指多个操作要么全部成功,要么全部失败,不存在部分成功、部分失败的可能。

1、解决什么问题?

  • 多分区原子写入
  • consume-transform-produce 原子操作
  • 有状态应用多个会话之间的连续性如何理解: 如应用重启后可以接着幂等生产和事务恢复

2、producer和consumer

2.1 producer
  • producer和transactionalId建议一一对应,因为一个TransactionalId只能有一个活跃的producer
  • 应用实例die的情况下,新启动的实例会接管事务,继续未完成的事务(aborted或committed)
2.2 consumer

consumer 事务语义相对弱一些

  • topic是compacted,老数据可能被新版本的数据覆盖
  • 事务消息跨多个日志segment,可能其中部分segment过期被删除
  • consumer消费可以指定offset消费。10条数据原子一个事务原子写入,但你指定从第9条数据开始消费,则1~8条数据不会被消费到。
  • consumer不一定会同时消费所有一个事务生产的topic和分区

3、具体实现

3.1 关键概念

为了支持事务,kafka新增加了如下模块:

  • 新协调器:Transaction Coordinator,分配PID、管理事务
  • 新内部topic:__transaction_state 持久化存储事务的日志
  • 新消息类型: ControlBatch兼容老的ProduceBatch,写到用户的topic,告诉用于之前fetch的数据是否已经commit。详见jira。 之前线上有个案例case,旧版本客户端消费该数据导致故障。
  • 新id:transactionalId,唯一识别producer。旧producer挂了的情况下,同id的新producer继续未完成的事务。
  • 新的epoch:producer epoch。通过版本号确定合法的事务producer(一个TransactionalId只能有一个活跃的producer)。
  • 新的request类型、新的request版本号
  • 新的消息格式
3.2 案例与DataFlow
image.png
image.png

注意4.3和4.4是Consume-Transform-Produce场景独有的步骤,涉及到消费和生产的联动,这里我不多介绍。

步骤:

  • 1、查找TransactionCoordinator
  • 2、获取并保存PID
  • 3、Producer开启事务
  • 4、Consume-Transform-Produce(以该场景举例) - 4.1 AddPartitionsToTxnRequest:producer告诉事务协调器哪些tp会被写入,事务协调器会将这些信息存储,并启动一个事务的定时器。 - 4.2 ProduceRequest。 与常规的Produce相比多了PID\epoch\sequence number字段 - 4.3 AddOffsetCommitsToTxnRequest:批量消费和生产时使用到。 sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) - 4.4 TxnOffsetCommitRequest 与4.3配合
  • 5、提交或取消一个事务 - 5.1 EndTxnRequest:endTransaction \ abortTransaction abortTransaction 后 下游消费者读取后会丢弃掉对应的数据 收到这个请求后, 1)协调器将往事务日志里面写入PREPARE_COMMITPREPARE_ABORT消息。 2)发送COMMITABORTmarker到用户的数据里面(即5.2)。 3)事务日志写入COMMITTEDABORTED消息。 - 5.2 WriteTxnMarkerRequest事务协调器发给各个tp的leader的broker,broker收到后会往对应的topic里面插入ControlBatch(即marker)。告知该PID之前发送的消息的状态是怎么样的。注意:消费者需要缓存该PID的消息直到收到ControlBatch信息,才直到应该如何处理这些消息(业务处理or丢弃)。
  • 5.3 发送最终提交或取消请求ControlBatch(即marker)写到tp里后,事务协调器会往事务日志里面写入COMMITTEDABORTED消息,事务完成。

四、事务关键参数

1、broker端

  • transactional.id.timeout.ms:604800000 (7 days) 事务id在没有更新情况下存活的时间。
  • max.transaction.timeout.ms:900000 (15 min) 最大事务的超时时间。

2、producer端

  • enable.idempotence: 幂等默认会要求: acks=all retries > 1 max.inflight.requests.per.connection=1.
  • transaction.timeout.ms:事务超时时间,60000
  • transactional.id:事务id

3、consumer端

  • isolation.level:默认read_uncommitted
    • read_uncommitted: 同时消费到committed和uncommitted的消息
    • read_committed:只能消费到非事务消息以及

五、性能测试

不难分析,kafka的事务是相对轻量的,对性能影响相对可控。

kafka-producer-perf-test.sh--transactional-id 用于测试并发事务的性能。

潜在性能下降点:单producer写入性能,特别是开启幂等后的性能下降。

image.png
image.png

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • kafka事务
    • 一、基础概念
      • 二、幂等
        • 三、事务
          • 1、解决什么问题?
          • 2、producer和consumer
          • 3、具体实现
        • 四、事务关键参数
          • 1、broker端
          • 2、producer端
          • 3、consumer端
        • 五、性能测试
        相关产品与服务
        大数据处理套件 TBDS
        腾讯大数据处理套件(Tencent Big Data Suite,TBDS)依托腾讯多年海量数据处理经验,基于云原生技术和泛 Hadoop 生态开源技术提供的可靠、安全、易用的大数据处理平台。 TBDS可在公有云、私有云、非云化环境,根据不同数据处理需求组合合适的存算分析组件,包括 Hive、Spark、HBase、Flink、Presto、Iceberg、Elasticsearch、StarRocks 等,以快速构建企业级数据湖仓。
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档