Apache Kafka是基于发布/订阅的容错消息系统,由Scala和Java编写,是一个分布式消息队列,具有高性能、持久化、多副本备份、横向扩展能力。
与其他消息传递系统相比,Kafka具有更好的吞吐量,内置分区,复制和固有的容错能力,这使得它非常适合大规模消息处理应用程序。
Kafka适合离线和在线消息消费。 Kafka消息保留在磁盘上,并在群集内复制以防止数据丢失。 Kafka构建在ZooKeeper同步服务之上。 它与Apache Storm和Spark非常好地集成,用于实时流式数据分析。
Message Queue消息传送系统提供传送服务。消息传送依赖于大量支持组件,这些组件负责处理连接服务、消息的路由和传送、持久性、安全性以及日志记录。消息服务器可以使用一个或多个代理实例。消息队列分为两种:点对点与发布/订阅(pub-sub)
消息生产者生产消息发送到queue中,然后消息消费者从queue中取出并且消费消息。
消息被消费以后,queue中不再有存储,所以消息消费者不可能消费到已经被消费的消息。Queue支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费。
该系统的典型示例是订单处理系统,其中每个订单将由一个订单处理器处理,但多个订单处理器也可以同时工作。
消息生产者(发布)将消息发布到topic中,同时有多个消息消费者(订阅)消费该消息。和点对点方式不同,发布到topic的消息会被所有订阅者消费。
现实生活的例子是电视,它发布不同的频道,如运动,电影,音乐等,任何人都可以订阅自己的频道集。
Topic物理上的分组,一个 topic可以分为多个 partition,每个 partition 是一个有序的队列。partition中的每条消息都会被分配一个有序的 id(offset)
- offset: 消息唯一标识, 对应类型long
- MessageSize: 对应类型int32
- data: message的具体内容。
kafka有比传统的消息系统更强的顺序保证。
传统的消息系统按顺序保存数据,如果多个消费者从队列消费,则服务器按存储的顺序发送消息,但是,尽管服务器按顺序发送,消息异步传递到消费者,因此消息可能乱序到达消费者。这意味着消息存在并行消费的情况,顺序就无法保证。消息系统常常通过仅设1个消费者来解决这个问题,但是这意味着没用到并行处理。
kafka做的更好。通过并行topic的parition —— kafka提供了顺序保证和负载均衡。每个partition仅由同一个消费者组中的一个消费者消费到。并确保消费者是该partition的唯一消费者,并按顺序消费数据。每个topic有多个分区,则需要对多个消费者做负载均衡,但请注意,相同的消费者组中不能有比分区更多的消费者,否则多出的消费者一直处于空等待,不会收到消息。
所有发布消息到消息队列和消费分离的系统,实际上都充当了一个存储系统(发布的消息先存储起来)。Kafka比别的系统的优势是它是一个非常高性能的存储系统。
写入到kafka的数据将写到磁盘并复制到集群中保证容错性。并允许生产者等待消息应答,直到消息完全写入。
kafka的磁盘结构 - 无论你服务器上有50KB或50TB,执行是相同的。
client来控制读取数据的位置。你还可以认为kafka是一种专用于高性能,低延迟,提交日志存储,复制,和传播特殊用途的分布式文件系统。
以下是详细说明:可以认为topic下有partition,partition下有segment,segment是实际的一个个文件,topic和partition都是抽象概念。
在目录/{topicName}-{partitionid}/下,存储着实际的log文件(即segment),还有对应的索引文件。
每个segment文件大小相等,文件名以这个segment中最小的offset命名,文件扩展名是.log;segment对应的索引的文件名字一样,扩展名是.index。有两个index文件,一个是offset index用于按offset去查message,一个是time index用于按照时间去查,其实这里可以优化合到一起,下面只说offset index。总体的组织是这样的:
为了减少索引文件的大小,降低空间使用,方便直接加载进内存中,这里的索引使用稀疏矩阵,不会每一个message都记录下具体位置,而是每隔一定的字节数,再建立一条索引。 索引包含两部分,分别是baseOffset,还有position。
查找offset对应的记录时,会先用二分法,找出对应的offset在哪个segment中,然后使用索引,在定位出offset在segment中的大概位置,再遍历查找message。
在kafka中,流处理持续获取输入topic
的数据,进行处理加工,然后写入输出topic
。
可以直接使用producer和consumer API进行简单的处理。对于复杂的转换,Kafka提供了更强大的Streams API。可构建聚合计算或连接流到一起的复杂应用程序。
助于解决此类应用面临的硬性问题:处理无序的数据,代码更改的再处理,执行状态计算等。
前面的博客Spark Structured Streaming + Kafka使用笔记有详细介绍Spark+Kafka的使用。
配置项 | 作用 |
---|---|
broker.id | broker的唯一标识 |
auto.create.topics.auto | 设置成true,就是遇到没有的topic自动创建topic。 |
log.dirs | log的目录数,目录里面放partition,当生成新的partition时,会挑目录里partition数最少的目录放。 |
配置项 | 作用 |
---|---|
num.partitions | 新建一个topic,会有几个partition。 |
log.retention.ms | 对应的还有minutes,hours的单位。日志保留时间,因为删除是文件维度而不是消息维度,看的是日志文件的mtime。 |
log.retention.bytes | partion最大的容量,超过就清理老的。注意这个是partion维度,就是说如果你的topic有8个partition,配置1G,那么平均分配下,topic理论最大值8G。 |
log.segment.bytes | 一个segment的大小。超过了就滚动。 |
log.segment.ms | 一个segment的打开时间,超过了就滚动。 |
message.max.bytes | message最大多大 |
Serializer
接口的实现类.Serializer
接口的实现类- `acks=0` 不需要等待服务器的确认. 这是`retries`设置无效. 响应里来自服务端的offset总是-1. producer只管发不管发送成功与否。延迟低,容易丢失数据。
- `acks=1` 表示leader写入成功(但是并没有刷新到磁盘)后即向producer响应。延迟中等,一旦leader副本挂了,就会丢失数据。
- `acks=all`等待数据完成副本的复制, 等同于`-1`. 假如需要保证消息不丢失, 需要使用该设置. 同时需要设置`unclean.leader.election.enable`为true, 保证当ISR列表为空时, 选择其他存活的副本作为新的leader.buffer.memory
producer可以使用的最大内存来缓存等待发送到server端的消息. 如果消息速度大于producer交付到server端的阻塞时间
none
, gzip
, snappy
和lz4
. 压缩整个batch的数据, 因此batch的效果对压缩率也有影响. 更多的批处理意味着更好的压缩retries
而没有将max.in.flight.request.per.connection
设置为1, 在两个batch发送到同一个partition时有可能打乱消息的发送顺序(第一个发送失败, 而第二个发送成功)batch.size
的同一partition的消息会立即发送, 不管linger.ms
的设置. 假如要发送的消息比较少, 会等待指定的时间以获取更多的消息.
默认设置为0 ms(没有延迟).KafkaProducer.send()
和KafkaProducer.partitionsFor()
的阻塞时间. 这些方法会因为buffer满了或者metadata不可用而阻塞. 用户设置在serializers或者partitioner中的阻塞不会计算在内.Partitioner
接口的实现类, 默认是org.apache.kafka.clients.producer.internals.DefaultPartitioner
. 需要处理数据倾斜等原因调整分区逻辑的时候使用.exactly-once
模式. 设置为’false’(默认值), producer会因为borker失败等原因重试发送, 可能会导致消息重复.
设置为’true’时需要结合max.in.flight.requests.per.connection
设为’1’和retires
不能为’0’, 同时acks
需要设置为’all’或者’’-1’.ProducerInterceptor
接口的实现类, 默认为null. 可以通过该接口的实现类去拦截(可能需要修改)producer要发送的消息在发送到服务端之前.retries
设置了的情况下会出现消息发送顺序错误.max.transaction.timeout.ms
会收到InvalidTransactionTimeout
错误.ConsumerRecords records
。
如果没有拿到足够多的数据,会阻塞1000ms,但不会超过1000ms就会返回。max. poll. interval
. ms需要设置稍微大于1分钟即可,但是session. timeout. ms
可以设置小一点(如10s),用于快速检测Consumer崩溃。max.poll.records
条数据需要在在session.timeout.ms
这个时间内处理完
默认值为500session.timeout.ms
这个时间fetch.min.bytes
时,等待消费端请求的最长等待时间扫码关注腾讯云开发者
领取腾讯云代金券
Copyright © 2013 - 2025 Tencent Cloud. All Rights Reserved. 腾讯云 版权所有
深圳市腾讯计算机系统有限公司 ICP备案/许可证号:粤B2-20090059 深公网安备号 44030502008569
腾讯云计算(北京)有限责任公司 京ICP证150476号 | 京ICP备11018762号 | 京公网安备号11010802020287
Copyright © 2013 - 2025 Tencent Cloud.
All Rights Reserved. 腾讯云 版权所有