一,KafkaConsumer使用要点解释
1,基本介绍
该客户端用户透明的处理kafka Broker的失败,透明的适应topic在集群中的迁移。这种客户端也可以使用消费者组的概念与kafka cluster进行交互,来进行均衡消费负载。
消费者维护着到必要的Broker上的TCP链接,用以获取data。使用之后未关闭消费者的话会导致链接泄漏。该消费者不是线程安全的,具体详见下文的多线程版本。
2,跨版本的兼容性
该版本的适用于kafka0.10+版本。老版本或者过新的版本会导致一些特征失效。比如,0.10.0版本不支持offsetsForTimes,因为该版本是在0.10.1加入的。假如运行的kafka Broker版本与你客户端使用的版本不一致,你使用了Broker不支持的API,就会抛出异常UnsupportedVersionException。
3,offsets 和 Consumer position
Kafka每个分区内都会为每个Record维护一个数字的offset记录。在那个分区内,该偏移是Record的唯一的标识符,也可以指出消费者在一个分区内的消费位置。举例说明, 比如一个消费者的Consumer position是5,已经消费了offset为0-4的Record,那么下次消费的起始位置就是5.
消费者的位置给出了下一个记录的偏移量。消费者在一个分区中位置比已消费的Record的偏移大一。每次调用poll的时候都会自动增加。
Committed positon是上一个已经安全提交的偏移。进程失败或者重启,消费者恢复后可以使用该offset。消费者既可以每个一段时间自动提交偏移,也可以通过手动调用commitSync 和commitAsync来提交偏移。
4,消费者组和topic订阅
Kafka通过使用消费者组的概念,运行通过线程池来分摊消费和处理的工作。这些线程既可以运行在同一台机器上,也可以分布在多台机器上运行,以实现处理的容灾。使用相同group.id的消费者,同属一个消费者组。
消费者组中的每一个消费者都能动态设置它能够订阅的topic列表。Kafka会将改topic和parition中消息传递给订阅该topic和partition的消费者。这是通过在消费者组中平衡分区分配来实现的,这使得每个分区仅仅被分配给消费者组的一个消费者。比如,一个topic有四个分区,一个消费者组有两个消费者,那么每个消费者将会拥有两个分区。
消费者组和分区的关系是动态的。这时候有一个消费者组再平衡的策略。总共有下面四种情况会触发消费者组再平衡:
1),同一个组内有新的消费者加入
2),同一个组内有消费者实例退出
3),已订阅的topic有新的分区增加
4),正则的方式订阅的topic,有新增满足正则的。
消费者组检测新增分区是通过定时刷新元数据来实现的,然后会将它们分配给组员。
我们也可以通过手动指定消费topic和分区给消费者的方式来实现消费(下面会有详细的介绍)。但是这种手动指定消费分区的方式,就会是消费者丧失动态分区分配和消费者组协作的特性。
5,检测消费者失败
在订阅一些topic,消费者调用poll(long)时会自动加入消费者组。poll方法设计的目标是确保消费者存活。只要你持续调用poll,该消费者就会存活在消费者组内,从分配给他的分区内消费消息。深层意思是,消费者发送周期性心跳给服务端。如果消费者宕机或者不能在超时时间(session.timeout.ms)内发送心跳,那么消费者就会被认为死掉了,它的分区会被重新分配给其它分区。
也会发生存活锁这种情况,意思是能够持续发送心跳,但是没有进行处理。为了避免在这种情况下消费者一直占用该分区,kafka提供了一个存活检测机制,可以使用max.poll.interval.ms配置。假如你不能在最大间隔内调用poll,消费者会主动离开消费者组,所以它占用的分区就会分配给其它消费者。这种情况下你会发现消费者抛出提交失败异常。这是一种安全机制,表名只有活跃的消费者才能消费消息提交消费偏移。所以,消费者要想一直在消费者组中存在必须持续调用poll。
消费者提供两种配置,控制poll:
1),max.poll.interval.ms:通过增加两次poll调用的时间间隔,可以给你的消费者更多的处理从poll返回的Records的时间。增加该值的缺点是会导致group rebalance延迟,因为消费者只会在调用poll的时候参与再平衡。
2),max.poll.records:每次调用poll返回的Records最大数目。这使得预测每次poll调用间隔内消费者能处理的最大消息数。通过调优该值,我们可以减少poll的调用间隔,来减少对消费者组再平衡的影响。
对于消息的处理时间多变的情况,所有的这些选项都要配置充足。建议是这种方式下将消息处理交给另外一个线程,这使得消息处理期间,消费者也可以调用poll。但是,必须要确保提交的偏移不超过实际的消费者位置。典型的处理是,禁止自动偏移提交,手动在线程已经处理结束后提交偏移(取决于你需要的消息传输语义).在消息处理结束前,你需要暂停消费,使得没有新的消息被消费。
二,自动提交偏移的案例
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
通过配置bootstrap.servers指定一个Broker 列表,生产者从中随机选取一个server链接。也可以为了扩展填写几个不存在的server地址。
通过设置enable.auto.commit,可以实现后台按照一定周期自动提交偏移,自动提交的周期由 auto.commit.interval.ms指定。
上面的demo消费者订阅了两个topic,foo和bar,消费者组名有group.id指定。
三,手动提交偏移的案例
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "false");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
final int minBatchSize = 200;
List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
buffer.add(record);
}
if (buffer.size() >= minBatchSize) {
insertIntoDb(buffer);
consumer.commitSync();
buffer.clear();
}
}
除了消费者周期性的自动提交偏移,用户也可以自己控制提交偏移(等消费的消息处理结束后通过调用提交偏移的API来时先偏移的提交)。当消息的消费与某些处理逻辑耦合时,这是非常有用的,因此,在完成处理之前,不应该将消息视为消费掉的。
上面的例子中,我们会先消费一定的消息,缓存于内存中。当我们缓存了足够的消息后,我们会将它们插入数据库。如果像前面一样允许偏移自动提交的话,再次调用poll的时候,前面的消息记录就会被视为已经消费。在缓存数据之后,提交数据到数据库之前,我们的程序存在失败的可能,这就意味着会丢失数据。
为了避免这种情况,我们在相关消息记录已经被插入数据库后,手动提交偏移。这也会带来一个问题就是,在提交消费者消费偏移之前,数据插入数据库之后,我们的程序有可能失败,这时候会导致数据重复插入到数据库。这种使用方法实际上是kafka提供了至少消费一次的消息传递语义。正常情况下消息传递一次,失败的情况下消息可能消费多次。
上面的例子中使用 commitSync方式提交偏移。在某些情况下,在处理结束分区的消费记录之后,你可能希望我们提交自己指定的消费偏移。
try {
while(running) {
ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
for (ConsumerRecord<String, String> record : partitionRecords) {
System.out.println(record.offset() + ": " + record.value());
}
long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
}
}
} finally {
consumer.close();
}
值的注意的是,提交的偏移通畅是指,我们下次要消费的起始位置。所以,当调用commitSync(offsets)时,你应该在你上次消费的位置上加一。
四,手动进行分区分配
前面的例子主要是我们订阅感兴趣的kafka的topic,然后让kafka动态的将分区分配给同一个组内存活的Consumer。有些情况下你可能需要自己控制分区的分配。举例说明:
1,如果你的进程维护的有Partition相关的本地状态,那么这时候你就需要根据本地存储的状态获取相应的分区上的数据。
2,如果进程自身有高可用并且会在失败后自动重启(比如它使用了集群管理器yarn,mesos或者AWS,或者流式处理的一部分)。这种情况下,kafka去检测失败是不必要的,因为消费者进程会自动在其它机器上启动。
在这种模式下,可以使用assign(Collection)代替subscribe,来指定分区进行消费。
String topic = "foo";
TopicPartition partition0 = new TopicPartition(topic, 0);
TopicPartition partition1 = new TopicPartition(topic, 1);
consumer.assign(Arrays.asList(partition0, partition1));
一旦分配,就可以像前面的例子一样,通过循环调用poll来获取数据。消费者组依然是可用的,但是分配的分区不会改变了,除非再次调用assign。手动指定分区并没有使用组管理器,所以消费者失败不会引起分区的再分配。即使消费者组内的所有消费者共享groupid,但是每个消费者之间是相互独立的。为了避免重复提交offset,你必须确保消费同一个分区的消费者实例都有一个不同的groupid。
五,自定义偏移存储位置
消费者应用程序并不需要用kafka作为消费偏移的存储位置,他可以选择将偏移存储到自己需要的任何存储位置。首要的使用案例是允许应用程序将消费偏移和结果同时存储到同一个存储系统中,存储过程是在一个原子操作中完成。假如能实现,这就实现了仅消费一次的消息消费语义,比kafka默认的至少一次消费语义更强。
有以下使用案例:
1,如果消费处理的结果是存储在传统数据库里,假如同时保存offset到数据库的话。同时保存偏移和结果将可以在一次事务中实现。假如事务操作成功那么结果会被存储,同时offet也会更新,假如失败结果不会被存储,偏移也不会被更新。
2,如果结果存储在本地存储中,则也可以存储偏移量。例如一个搜索索引的构建可以通过订阅特定的分区和同时存储offset和索引数据。如果这个操作是原子的,即使出现了数据未同步宕机丢失,剩余的索引还是有它相应的offset。这就意味着可以重建。
每一条Record都是带有它的offset的,所以,为了自己管理偏移只需要实现一下几点:
1,配置enable.auto.commit=false
2,使用ConsumerRecord所带的偏移去保存你自己的偏移位置。
3,重启的时候只需要通过seek(TopicPartition, long)重新定位你的偏移。
六,控制消费者的位置
在很多使用案例中,消费者仅仅只是从头到尾的消费消息,周期性的提交消费位置信息。然而,kafka允许消费者控制自己消费的位置,随意的向前向后移动其消费位置。意味着消费者可以重新消费历史的Records,或者跳过一些最近的Records。
在很多场景下消费者自己控制偏移是很有用的。
一个使用案例是时间敏感的消息记录,处理历史记录会使消费者严重滞后,所以这种情况下不是让消费者通过处理完历史的所有消息来跟上消息最新偏移,而是跳过历史的数据。
另一个使用案例是针对一些会保存本地状态的系统。消费者希望在启动的时候从本地状态中恢复。但是如果本地状态被损坏了(磁盘丢失),状态可能需要通过重新消费所有的数据来重建,前提是kafka保留了足够的历史数据。
七,消费流量控制
如果一个消费者被分配了多个分区,那么他会尽力同时消费所有分区,也即给所有的分区相同的优先权。但是有些情况下,消费者或许想聚焦从这些分区的一些子集中全速获取数据,等到这些子集数据变少或者没有数据可消费时才开始从其它分区获取数据。
一个使用场景就是流式处理,处理器同时从两个topic消费数据,然后将两个消费者的数据进行join。当一个topic的数据严重滞后于另一个的时候,流处理器肯定是需要暂停数据超前的topic,等待数据滞后的topic数据跟上来。另一个场景是消费者启动的时候发现有很多历史数据需要消费,而应用程序在消费其它topic之前需要消费部分topic最新的消息。
Kafka支持动态的控制消费流,通过使用pause(Collection)可以暂停消费指定的分区,通过使用 resume(Collection)可以重新开始消费指定的分区。
八,读事务性消息
事务是在kafka0.11版本以后引入,也即应用程序可以原子的将消息写入多个topic和分区。为了实现这个,消费者必须配置为只允许读取已经事务提交成功的消息。可以在消费者的配置中通过设置isolation.level=read_cimmitted来实现。
在read_committed模式下,消费者仅仅会读取一次事务成功提交的消息。非事务提交的消息,像正常模式一样读取。在read_committed模式下,是没有客户端的缓存的。在这模式下,消费者消费的最后一个偏移实际上就是一次事务提交的消息块的第一个消息的偏移。这个偏移被叫做“Last Stable Offset”(LSO)。
一个read_committed消费者会读取消息到LSO位置,然后会过滤掉事务提交失败的消息。对于一个read_committed消费者来说,LSO也会影响seekToEnd(Collection)和endOffsets(Collection),细节可以去看每个函数的介绍文档。消费滞后指标也是跟LSO相关的。
含有事务提交消息的分区会包含事务提交成功或者失败的标记。这个标记不需要返回给应用程序,仅仅在log文件里存在一个偏移。就会导致,应用程序从带有事务消息的topic获取数据的时候会看到消费偏移存在不连续的情况。这些缺失的消息可能是事务的标记,会为消费者过滤掉。另外,应用程序用read_committed消费者消费者偏移不连续,也可能是失败的事务导致的,虽然消息不会给消费者,但是会占用有效的偏移。
九,多线程处理
KafkaConsumer并不是线程安全的。意味着用户想多线程使用kafkaConsumer必须实现同步访问。非同步访问会导致ConcurrentModificationException异常。
public class KafkaConsumerRunner implements Runnable {
private final AtomicBoolean closed = new AtomicBoolean(false);
private final KafkaConsumer consumer;
public void run() {
try {
consumer.subscribe(Arrays.asList("topic"));
while (!closed.get()) {
ConsumerRecords records = consumer.poll(10000);
// Handle new records
}
} catch (WakeupException e) {
// Ignore exception if closing
if (!closed.get()) throw e;
} finally {
consumer.close();
}
}
// Shutdown hook which can be called from a separate thread
public void shutdown() {
closed.set(true);
consumer.wakeup();
}
}
但一个单独线程中,消费者可以被关闭:
closed.set(true);
consumer.wakeup();
我们试图避免实现特定的处理线程模型,给我们留下了多种选择。
1,每个消费者一个线程
一个简单的实现就是每个线程都有它自己的消费者实例。下面列一下这种实现方式的优缺点:
1),优点:实现起来简单。
2),优点:没有内部线程协作,使得处理速度快。
3),优点:针对单个分区的顺序消费实现起来简单。
4),缺点:多消费者,意味着需要更多的到kafka cluster的TCP链接。通常情况下,kafka处理多链接是非常高效的<请阅读此文,查看原因>,所以这个开销小。
5),缺点:多个消费者意味着需要发送更多的Requests到Driver,批处理减少,次数增加,降低IO性能。
6),缺点:处理线程的总数会受限于分区的总数。
2,消费和处理分离
另一个选择是使用一个或者多个消费者消费消息,然后将消费的消息加入阻塞队列,在构建一个或者多个处理线程从阻塞队列里取消息,实现消息的真正处理。这种方式同样也有优点和缺点。
1),优点:这种方式允许用户可以灵活独立的扩展消费者和处理线程的数目。可以实现单线程消费,多线程处理。
2),缺点:无法保证分区内消息处理的顺序。
3),缺点:手动提交偏移变得困难,因为需要所有的线程协调工作,保证消费的每个分区的消息处理完毕,才能提交分区的偏移。
这种方式可以进行简单变化。比如,每个处理程序拥有自己的消息队列,消费者可以通过TopicPartition的hash值将消息加入不同的消息队列,然后简化偏移的提交。