Coordinator
的角色,.Coordinator
除了可以来协调消费的group
作balance外, 还接受 OffsetCommit Request, 用来存储消费的offset到Kafka本身中.具体可参考Kafka的消息是如何被消费的?;/consumers/[consumer group]/offsets/[topic]/[partition]
Timestamp
, 具体可参考Add a time based log index, 这样就可以方便的根据时间来获取并回滚相应的消费啦,真是太方便了;auto.offset.reset
1.1 smallest : 自动重置到最小的offset, 这个最小的offset不一定是0, 因为msg可能会被过期删除掉;
1.2 largest : 自动重置到最大的offset;kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list [broker list] --topic [topic name] --time [-1:获取最新offset, -2:获取最旧offset]
1.2 也可以通过代码来获取, 可以使用librdkafka的rd_kafka_query_watermark_offsets
函数来获取;rd_kafka_list_groups
来获取当前消费 gropu的详情;
2.1 使用rd_kafka_topic_partition_list_set_offset
来设置需要重置的partiton的offset;
2.2 调用rd_kafka_subscribe
和rd_kafka_consumer_poll
来等待group完成balance;
2.3 调用rd_kafka_commit
来完成重置的offset的提交;seek
接口,也可以用来设置offset;