在这篇文章中,我们将探讨Apache Kafka中关于消息顺序的挑战和解决方案。在分布式系统中,按正确顺序处理消息对于维护数据的完整性和一致性至关重要。虽然Kafka提供了维护消息顺序的机制,但在分布式环境中实现这一点有其自身的复杂性。
Kafka通过为每条消息分配一个唯一的偏移量来在单个分区内保持顺序。这保证了在该分区内消息的顺序追加。然而,当我们扩展并使用多个分区时,保持全局顺序就变得复杂了。不同的分区以不同的速率接收消息,这使得跨分区的严格排序变得复杂。
让我们谈谈Kafka如何处理消息的顺序。生产者发送消息的顺序和消费者接收它们的顺序之间有一些差异。通过坚持使用一个分区,我们可以按它们到达代理的顺序处理消息。然而,这种顺序可能与我们最初发送它们的顺序不匹配。这种混乱可能发生的原因包括网络延迟或如果我们正在重发消息。为了保持一致性,我们可以实施具有确认和重试的生产者。这样,我们确保消息不仅到达Kafka,而且以正确的顺序到达。
这种跨分区的分布,虽然对可扩展性和容错性有益,但引入了实现全局消息顺序的复杂性。例如,我们按顺序发送两条消息,M1和M2。Kafka就像我们发送的那样接收它们,但是将它们放在不同的分区中。这里的问题是,仅仅因为M1首先发送,并不意味着它将在M2之前被处理。这在处理顺序至关重要的情况下可能具有挑战性,例如金融交易。
我们创建了名为'single_partition_topic'的主题,它有一个分区,以及名为'multi_partition_topic'的主题,它有5个分区。下面是一个具有单个分区的主题的示例,生产者正在向该主题发送消息:
UserEvent 是一个实现了 Comparable 接口的 POJO 类,有助于按 globalSequenceNumber(外部序列号)对消息类进行排序。由于生产者正在发送 POJO 消息对象,我们实现了自定义的 Jackson 序列化器和反序列化器。
分区 0 接收所有用户事件,事件 ID 以以下顺序出现:
在 Kafka 中,每个消费者组作为一个独立的实体操作。如果两个消费者属于不同的消费者组,它们都将接收主题上的所有消息。这是因为 Kafka将每个消费者组视为单独的订阅者。
如果两个消费者属于同一个消费者组并订阅了一个有多个分区的主题,Kafka将确保 每个消费者从一组唯一的分区中读取。这是为了允许消息的同时处理。
Kafka 确保在消费者组内,没有两个消费者读取相同的消息,因此每个消息在每个组中只被处理一次。
下面的代码是同一个消费者从同一个主题消费消息的示例:
在这种情况下,我们得到的输出显示消费者以相同的顺序消费消息,以下是输出中的顺序事件 ID:
对于具有多个分区的主题,消费者和生产者的配置是相同的。唯一的区别是消息去往的主题和分区,生产者向主题 'multi_partition_topic' 发送消息:
消费者从同一个主题消费消息:
生产者的输出列出了事件 ID 及其相应的分区,如下所示:
对于消费者,输出将显示消费者不是以相同的顺序消费消息。输出中的事件 ID 如下:
我们可以在 Kafka 中使用单个分区,正如我们之前用 'single_partition_topic' 的示例所示,这确保了消息的顺序。然而,这种方法有其权衡:
本质上,单个分区保证了顺序,但代价是减少了吞吐量。
在这种方法中,生产者为每条消息标记一个全局序列号。多个消费者实例并发地从不同分区消费消息,并使用这些序列号重新排序消息,以确保全局顺序。
在具有多个生产者的现实场景中,我们将通过所有生产者进程都可以访问的共享资源来管理全局序列,例如数据库序列或分布式计数器。这确保了序列号在所有消息中是唯一和有序的,无论哪个生产者发送它们:
在消费者端,我们将消息分组到时间窗口中,然后按顺序处理它们。我们在特定时间框架内到达的消息将其批量在一起,一旦窗口到期,我们处理该批次。这确保了在该时间框架内的有序处理,即使它们在窗口内的到达时间不同。消费者根据序列号缓冲消息并在处理前重新排序。我们需要确保消息按正确顺序处理,为此,消费者应该有一个缓冲期,在处理缓冲消息之前多次轮询消息,并且这个缓冲期足够长,以应对潜在的消息排序问题:
每个事件 ID 在输出中与其相应的分区一起显示,如下所示:
消费者输出带有全局序列号和事件 ID:
在这种方法中,每个消费者实例缓冲消息,并根据其序列号按顺序处理它们。然而,有一些考虑因素:
Kafka 的幂等生产者功能旨在精确地传递消息一次,从而防止任何重复。这在生产者可能因网络错误或其他瞬时故障而重试发送消息的情况下至关重要。幂等性的主要目标是防止消息重复,但它间接地影响了消息顺序。Kafka 使用两件事来实现幂等性:生产者 ID(PID)和作为幂等性键的序列号,该序列号在特定分区的上下文中是唯一的。
Kafka 通过按生产顺序将消息写入分区来保证消息顺序,感谢序列号,并通过 PID 和幂等性功能防止重复。 要启用幂等生产者,我们需要在生产者的配置中将 “enable.idempotence” 属性设置为 true:
有一些关键的 Kafka 生产者和消费者配置可以影响消息顺序和吞吐量。
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
props.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
props.put(ProducerConfig.LINGER_MS_CONFIG, "5");
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1");
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500");
在这篇文章中,我们深入探讨了 Kafka 中消息排序的复杂性。我们探讨了挑战并提出了解决策略。无论是通过单分区、外部排序与时间窗口缓冲,还是幂等生产者,Kafka 提供了定制化的解决方案来满足消息排序的需求。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。