Apache Kafka 消费者 API 详解 Apache Kafka 是一个高吞吐量、低延迟的分布式流处理平台,用于构建实时数据管道和流应用。...在 Kafka 中,消费者负责从 Kafka 集群中读取消息。本文将详细演示 Kafka 消费者 API 的使用,包括配置、消息消费、错误处理和性能优化等内容。 1....配置消费者 Kafka 消费者需要一系列配置参数才能正确运行。这些参数可以通过 Properties 对象进行设置。...总结 本文详细介绍了 Apache Kafka 消费者 API 的使用,包括配置、消息消费、偏移量管理、错误处理和性能优化。...通过理解和实践这些内容,可以帮助你更好地使用 Kafka 消费者进行高效、可靠的数据消费。 希望本文对你有所帮助,如有任何疑问或建议,欢迎留言讨论。
Consumer之自动提交 在上文中介绍了Producer API的使用,现在我们已经知道如何将消息通过API发送到Kafka中了,那么现在的生产者/消费者模型就还差一位扮演消费者的角色了。...因此,本文将介绍Consumer API的使用,使用API从Kafka中消费消息,让应用成为一个消费者角色。...0.0.1:9092"); // 指定group.id,Kafka中的消费者需要在消费者组里 props.setProperty(ConsumerConfig.GROUP_ID_CONFIG...若消费者处理数据失败时,只要不提交相应的offset,就可以在下一次重新进行消费。 和数据库的事务一样,Kafka消费者提交offset的方式也有两种,分别是自动提交和手动提交。...通过调用seek方法可以指定从哪个Partition的哪个offset位置进行消费,代码示例: /** * 手动控制offset的起始位置 */ public static void manualCommitOffsetWithPartition2
Kafka 消费者总共有 3 种 API,新版 API、旧版高级 API、旧版低级 API,新版 API 是在 kafka 0.9 版本后增加的,推荐使用新版 API,但由于旧版低级 API 可以对消息进行更加灵活的控制...,所有在实际开发中使用的也较多,本文讨论消费者旧版低级 API 的基本使用。...旧版低级 API 处理以下场景更为方便: 消息重复消费 添加事务管理机制,保证 Exactly Once 消费指定分区或者指定分区的某些片段 使用旧版低级 API的步骤: 获取你要读取的topic的partition...; import kafka.api.FetchRequestBuilder; import kafka.api.PartitionOffsetRequestInfo; import kafka.common.ErrorMapping...旧版消费者API——低级API * @Author YangYunhe * @Date 2018-06-26 13:16:29 */ public class SimpleConsumerTest
如果没有很多可用数据,但消费者的 CPU 使用率却很高,那么就需要把该属性的值设得比默认值大。如果消费者的数量比较多,把该属性的值设置得大一点可以降低 broker 的工作负载。...,就看哪个条件先得到满足。...自定义的策略 重要性:中等 说明:PartitionAssignor 根据给定的消费者和主题,决定哪些分区应该被分配给哪个消费者。...因为每个主题拥有奇数个分区,而分配是在主题内独立完成的,第一个消费者最后分配到比第二个消费者更多的分区。只要使用了 Range 策略,而且分区数量无法被消费者数量整除,就会出现这种情况。...如果使用 RoundRobin 策略来给消费者 C1 和消费者 C2 分配分区,那么消费者 C1 将分到主题 T1 的分区 0 和分区 2 以及主题 T2 的分区 1,消费者 C2 将分配到主题 T1
,但是一个消息只能被一个消费者消费。...Kafka 和 RabbitMQ 都能满足如上的特性,那么我们应该如何选择使用哪一个?这两个 MQ 有什么差异性?在什么样的场景下适合使用 Kafka,什么场景下适合使用 RabbitMQ ?...请选择 Kafka,它能够保证发送到相同主题分区的所有消息都能够按照顺序处理。...请选择 Kafka,它能够给每个主题配置超时时间,只要没有达到超时时间的消息都会保留下来,请放心 Kafka 的性能不依赖于存储大小,理论上它存储消息几乎不会影响性能。...不过对于 Kafka 而言,也可以通过其他方式实现。 可伸缩行 如果你的需求场景是对伸缩方面、吞吐量方面有极大的要求。 请选择 Kafka。 小结 本文纯属抛砖引玉,有问题,欢迎批评指正。
关于消费组的概念在《图解Kafka中的基本概念》中介绍过了,消费组使得消费者的消费能力可横向扩展,这次再介绍一个新的概念“再均衡”,其意思是将分区的所属权进行重新分配,发生于消费者中有新的消费者加入或者有消费者宕机的时候...而为了应对消费者宕机情况,偏移量被设计成不存储在消费者的内存中,而是被持久化到一个Kafka的内部主题__consumer_offsets中,在Kafka中,将偏移量存储的操作称作提交。...因此我们可以组合使用两种提交方式。在轮循中使用异步提交,而当关闭消费者时,再通过同步提交来保证提交成功。...在使用消费者的代理中,我们可以看到poll方法是其中最为核心的方法,能够拉取到我们需要消费的消息。...所以接下来,我们一起深入到消费者API的幕后,看看在poll方法中,都发生了什么,其实现如下: public ConsumerRecords poll(final Duration timeout
消费者组的特点 ? 这是 kafka 集群的典型部署模式。 消费组保证了: 一个分区只可以被消费组中的一个消费者所消费 一个消费组中的一个消费者可以消费多个分区,例如 C1 消费了 P0, P3。...假设一个主题有10个分区,如果没有消费者组,只有一个消费者对这10个分区消费,他的压力肯定大。 ? 如果有了消费者组,组内的成员就可以分担这10个分区的压力,提高消费性能。...2.2 消费模式灵活 假设有4个消费者订阅一个主题,不同的组合方式就可以形成不同的消费模式。 ? 使用4个消费者组,每组里放一个消费者,利用分区在消费者组间共享的特性,就实现了广播(发布订阅)模式。...只使用一个消费者组,把4个消费者都放在一起,利用分区在组内成员间互斥的特性,就实现了单播(队列)模式。 2.3 故障容灾 如果只有一个消费者,出现故障后就比较麻烦了,但有了消费者组之后就方便多了。...消费组会对其成员进行管理,在有消费者加入或者退出后,消费者成员列表发生变化,消费组就会执行再平衡的操作。 例如一个消费者宕机后,之前分配给他的分区会重新分配给其他的消费者,实现消费者的故障容错。 ?
自动提交 最简单的提交方式是让消费者自动提交偏移量。如果 enable.auto.commit 被设为 true,那么每过 5s,消费者会自动把从 poll() 方法接收到的最大偏移量提交上去。...消费者每次获取新数据时都会先把上一次poll()方法返回的最大偏移量提交上去。...可能造成的问题:数据重复读 假设我们仍然使用默认的 5s 提交时间间隔,在最近一次提交之后的 3s 发生了再均衡,再均衡之后,消费者从最后一次提交的偏移量位置开始读取消息。...这个时候可以使用异步提交,只管发送提交请求,无需等待 broker 的响应。...消费者 API 提供了再均衡监听器,以下程序可以做到 kafka 消费数据的 Exactly Once 语义: package com.bonc.rdpe.kafka110.consumer; import
有关详细信息,请参阅0.11.0中的Streams API更改。 升级0.10.1 Kafka Streams应用程序 将Streams应用程序从0.10.1升级到1.0不需要代理升级。...有几个API的变化,这是不向后兼容(参见流API中1.0.0的变化, 在0.11.0流API的变化, 在0.10.2流API的变化,和 流API的变化在0.10.1了解更多详情)。...有关详细信息,请参阅0.11.0中的Streams API更改。 升级0.10.1 Kafka Streams应用程序 将Streams应用程序从0.10.1升级到0.11.0不需要代理升级。...对于安全集群,事务API需要新的ACL,可以使用bin/kafka-acls.sh。工具。 Kafka中的EoS引入了新的请求API并修改了几个现有API。...用户应尽快将其代码迁移到kafka-clients JAR中包含的Java生产者。 新的消费者API已经标记为稳定。
优雅的退出消费者程序 package com.bonc.rdpe.kafka110.consumer; import java.util.Arrays; import java.util.Properties...,线程的数量受限于分区数,当消费者线程的数量大于分区数时,就有一部分消费线程一直处于空闲状态 多线程消费者的线程实现类代码如下: package com.bonc.rdpe.kafka110.thread...独立消费者 有时候你可能只需要一个消费者从一个主题的所有分区或者某个特定的分区读取数据。这个时候就不需要消费者群组和再均衡了,只需要把主题或者分区分配给消费者,然后开始读取消息并提交偏移量。...一个消费者可以订阅主题(并加入消费者群组),或者为自己分配分区,但不能同时做这两件事情。...以下是独立消费者的示例代码: package com.bonc.rdpe.kafka110.consumer; import java.util.ArrayList; import java.util.List
--kafka--> org.springframework.kafka spring-kafka xxx配置文件spring: kafka: producer: value-serializer: org.springframework.kafka.support.serializer.JsonSerializer...bootstrap-servers: 192.168.20.75:9907kafka: spark: task: topic: platform-model-spark-topic1...----------------"); producer.close(); }消费者==说明:① Topic主题用来区分不同类型的消息② GroupId用来解决同一个Topic主题下重复消费问题...,比如一条消费需要多个消费者接收到,就可以通过设置不同的GroupId实现,实际消息是存一份的,只是通过逻辑上设置标识来区分,系统会记录Topic主题下--》GroupId分组下--》partition
升级步骤.png 注意:由于引入了新的协议,要在升级客户端之前先升级kafka集群(即,0.10.1.x仅支持 0.10.1.x或更高版本的broker,但是0.10.1.x的broker向下支持旧版本的客户端...更改为0.10.1。...可以看到所有的消息收到正常; 接下来,把项目项目代码中的消费者更新到0.11.0.3,进行项目灰度发布,然后重新修改kafka配置文件中log.message.format.version=0.9.0.1...还可以使用DumpLogSegments工具,并替换您的目录位置/日志文件名称; 使用 ..../kafkalogs/risk_api_msg_test-2/00000000000000000000.log -print-data-log 查看topic下的数据属性: ?
Offset存储模型 由于一个partition只能固定的交给一个消费者组中的一个消费者消费,因此Kafka保存offset时并不直接为每个消费者保存,而是以 groupid-topic-partition...groupId.hashCode() % offsets.topic.num.partitions) 的方式来查询某个Consumer Group的offset信息保存在__consumers_offsets的哪个...另外几个与 Kafka Offset 管理相关的要点如下: Kafka 默认是定期帮你自动提交位移的(enable.auto.commit=true)。...kafka-0.10.1.X版本之前: auto.offset.reset 的值为smallest,和,largest....(offest保存在zk中); kafka-0.10.1.X版本之后: auto.offset.reset 的值更改为:earliest,latest,和none (offest保存在kafka的一个特殊的
auto.offset.reset 属性指定针对当前消费组,在分区没有提交偏移量或提交偏移量无效(可能是由于日志截断)的情况下,消费者应该从哪个偏移量开始读取。...'earliest' 同 'smallest' 'latest' 同 'largest' kafka-0.10.1.X 版本之前:auto.offset.reset 的值为smallest...和largest (offest保存在zk中) kafka-0.10.1.X版本之后:auto.offset.reset 的值更改为 earliest, latest (offest保存在kafka...asynchronous=True) finally: consumer.close() 本例在前面示例的基础上,将commit() 的asynchronous 参数改成True,消费者将使用异步提交发送请求并立即返回...API提供了一个callback,当提交成功或失败时会调用该callback。
使用sh脚本1)生产者./kafka-console-producer.sh --broker-list 192.168.20.91:9092 --topic test2)消费者..../kafka-console-consumer.sh --bootstrap-server 192.168.20.91:9092 --topic test --from-beginning
本文介绍了Apache Kafka,然后演示了如何使用MongoDB作为流数据的源(生产者)和目标(消费者)。...生产者选择一个主题来发送给定的事件,而消费者则选择他们从哪个主题中提取事件。例如,金融应用程序可以从一个主题中提取纽约证券交易所股票交易,并从另一个主题中提取公司财务公告,以寻找交易机会。...图1:Kafka生产者,消费者,主题和分区 MongoDB作为Kafka消费者的一个Java示例 为了将MongoDB作为Kafka消费者使用,接收到的事件必须先转换为BSON文档,然后再存储到数据库中...MongoDB的Kafka使用者 - MongoDBSimpleConsumer.java 请注意,此示例消费者是使用Kafka Simple Consumer API编写的 - 还有一个Kafka...高级消费者API,它隐藏了很多复杂性 - 包括管理偏移量。
以时间戳查询消息 (1) Kafka 新版消费者基于时间戳索引消费消息 kafka 在 0.10.1.1 版本增加了时间索引文件,因此我们可以根据时间戳来访问消息。...; import org.apache.kafka.clients.consumer.OffsetAndTimestamp; import org.apache.kafka.common.PartitionInfo...说明:基于时间戳查询消息,consumer 订阅 topic 的方式必须是 Assign (2) Spark基于kafka时间戳索引读取数据并加载到RDD中 以下为一个通用的,spark读取kafka...KafkaUtils.createRDD[String, String](sc, kafkaParams, offsetRanges.toArray, PreferConsistent).map(_.value) } } 使用方法...消费速度控制 在有些场景可以需要暂停某些分区消费,达到一定条件再恢复对这些分区的消费,可以使用pause()方法暂停消费,resume()方法恢复消费,示例代码如下: package com.bonc.rdpe.kafka110
Kafka Connect简介 Kafka是一个使用越来越广的消息系统,尤其是在大数据开发中(实时数据处理和分析)。...为何集成其他系统和解耦应用,经常使用Producer来发送消息到Broker,并使用Consumer来消费Broker中的消息。...使用Kafka自带的File连接器 图例 ?...-- https://mvnrepository.com/artifact/org.apache.kafka/connect-api --> ...org.apache.kafka connect-api <version
了解了什么是kafka( https://www.cnblogs.com/tree1123/p/11226880.html)以后 学习Kafka核心之消费者,kafka的消费者经过几次版本变化,特别容易混乱...,所以一定要搞清楚是哪个版本再研究。...1、low-level consumer low-level consumer底层实现是 SimpleConsumer 他可以自行管理消费者 Storm的Kafka插件 storm-kafka就是使用了...request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName....* KafkaConsumer 新版本的几个核心概念: consumer group 消费者使用一个消费者组名(group.id)来标记自己,topic的每条消息都只会发送到每个订阅他的消费者组的一个消费者实例上
如果未设置,则使用配置的partitioner key (可选) – 和消息对应的key,可用于决定消息发送到哪个分区。...参考API:https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html 注:生产者代码是线程安全的,支持多线程,而消费者则不然...消费者 #-*- encoding:utf-8 -*- __author__ = 'shouke' from kafka import KafkaConsumer from kafka import...参考API:https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html 客户端 #-*- encoding:utf-8...参考API: https://kafka-python.readthedocs.io/en/master/apidoc/KafkaClient.html brokers() 获取所有broker元数据