我有一个Kafka读数流,我会检查是否超过了某个阈值。我只想在第一次超过警报时传播警报。为此,我首先计算新状态,将新状态分组到KGroupedStream中。然后减少到一个KTable,我在这里检查状态是否发生了变化(保留一个布尔值),并更改为changelog流,并对状态发生变化的记录进行过滤。我的理论是,这应该是可行的,但是并不是每个状态更改都会传播到changelog流,而只是偶尔更新一次changelog流(无法真正看到模式)。有人知道为什么会这样吗,或者更好的是我如何解决这个问题?简化示例:
K