我使用Kafka Rest代理来生成和使用Azure APIM从Kafka主题消费,因此使用Rest代理的原因。我可以使用auto.offset.reset =最早的线性方式使用消息,所以每次调用Kafka Rest都会给我最新的消息,这是很棒的,
在以下情况下:
A1 is data requestor
B2 is API that fulfills A1's request using REST proxy for Kafka
C3 is Kafka Rest proxy server
如果A1向B2请求数据,而B2使用卡夫卡主题C3提供的消息给A1,但由于某些互联网问题,该消息无法发送给A1。因此,现在A1将不得不提出另一个请求来获取丢失的消息,但是在Kafka代理中,偏移量已经增加了。因此,如果B2再次消耗数据,那么它将是新的数据,而不是丢失的数据。如何在Kafka Rest代理中使用偏移量来读取旧消息?
发布于 2022-10-18 04:26:59
只有当一个组不存在auto.offset.reset = earliest
时,才会应用。它不影响以下任何请求。这也是这个话题的开始,而不是“最新”。
您需要为一个组提交偏移量。例如。
POST /consumers/(string:group_name)/instances/(string:instance)/offsets
Content-Type: application/vnd.kafka.v2+json
{
"offsets": [
{
"topic": "test",
"partition": 0,
"offset": 20
},
{
"topic": "test",
"partition": 1,
"offset": 30
}
]
}
然后下一个消费者记录批次将开始从这些抵消中消费。
https://stackoverflow.com/questions/73061017
复制相似问题