我使用Fluentd将日志转发到Kafka,然后将它们存储在Cassandra中以备后用。为此,我使用kafka-cassandra接收器连接器。如何做到这一点?topics=test_AF1
connect.cassandra.kcql=INSERT INTO test_event1 SELECT now() as id, messageas msg FROM test_AF1 TIMESTAMP=sys_tim
specify the format of data in Kafka and how to translate it into Connect data.from or stored into Kafkavalue.converter==org.apache.kafka.connect.json.JsonConverter
internal.value
是否可以在不对getLatestSourceOffset()进行额外检查的情况下修复它,比如添加字段以确定它是否是第一次轮询?或者没有办法避免它,我们应该增加检查?at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:309)
at org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offsets(OffsetStorag