我正在处理一个Kafka应用程序,我在弄清楚如何使聚合工作时遇到了一些困难。我有一个KStream bankTransactions,其中键是String类型的,值是JsonNode类型的,所以我用
// Definition of the different Serdes usedKTable<String, Long>中,其中键将是相同的,但是值将是从Json提取的</em
我试图使用Kafka流对CDC数据执行KTable-KTable外键连接。我将要读取的数据是Avro格式的,但是它是以与其他行业序列化程序/反序列化器(例如)不兼容的方式序列化的。因为架构标识符存储在标头中。当我设置我的KTables‘Serdes时,我的Kafka应用程序最初运行,但最终失败了,因为它在内部用byte[] serialize(String topic, T data);调用序列化器方法,<
定义了一个自定义存储,在自定义Transformer中使用(参考如下)。不确定,为什么内部主题"test_01-HOUSE-changelog“是用单个分区和单个复制创建的,而不是源分区"test”中的2个分区。这里遗漏了什么?(AbstractTask.java:81)
at org.apache.kafka.streams.processor.internals.StreamT