我正在尝试使用ReplyingKafkaTemplate
来实现sendAndReceive
消息/响应。
在执行该服务的服务中,我使用ProducerFactory
和ConcurrentKafkaListenerContainerFactory
的默认实例来创建ReplyingKafkaTemplate
的新实例,如下所示:
@Service
public class MyService {
private final ReplyingKafkaTemplate<String, MyCommand, MyResult> kafkaTemplate;
public MyService(ProducerFactory<String, MyCommand> pf,
ConcurrentKafkaListenerContainerFactory<String, MyResult> containerFactory) {
var container = containerFactory.createContainer("results topic name");
this.kafkaTemplate = new ReplyingKafkaTemplate<>(pf, container);
kafkaTemplate.start();
}
//... other methods omitted as irrelevant...
private Mono<MyResult> sendMessageAndWaitForResult(ProducerRecord<String, MyCommand> producerRecord) {
var requestReplyFuture = kafkaTemplate.sendAndReceive(producerRecord);
return Mono
.fromFuture(requestReplyFuture.completable())
.map(ConsumerRecord::value);
}
private ProducerRecord<String, MyCommand> createProducerRecord(Iterable<RecordHeader> recordHeaders, MyCommand myCommand) {
return new ProducerRecord(
"commands topic name",
null,
"some string id",
myCommand,
recordHeaders);
}
}
MyCommand
和MyResult
是在Lombok帮助下定义的POJO。
我所犯的错误是:
KafkaException: Send failed; nested exception is org.apache.kafka.common.errors.SerializationException: Can't convert value of class MyCommand to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer.
我知道我没有定义序列化程序,但通过Kafka
的示例,它们只使用ReplyingKafkaTemplate<String, String, String>
示例,当然不需要设置序列化程序。但我做了却找不到在哪里怎么做?
谢谢!
编辑1:由于Garry的提示,我采用了为反序列化器创建Bean的方法,但是在遇到一些错误之后,我设法让它以这样的方式工作:
public DefaultKafkaProducerFactory<String, MyCommand> producerFactory(
KafkaProperties properties,
AvroConverter avroConverter) {
Map<String, Object> props = properties
.buildProducerProperties();
DefaultKafkaProducerFactory pf =
new DefaultKafkaProducerFactory(
props,
new StringSerializer(),
avroConverter.valueSerde().serializer());
return pf;
}
上面应该是一个@Bean,但是当我这样做时,我得到了下面的注释中提到的错误,一些Bean的依赖关系形成了一个循环。
所以这个producerFactory
是我在创建ReplyingKafkaTemplate
时传递的。
令人费解的是,我不必为反序列化MyResponse
创建相同的东西
AvroConverter
是一个包装SpecificAvroSerde
创建的类。
发布于 2021-07-15 14:04:39
我假设您正在使用Spring;默认情况下,它配置一个StringSerializer
。
org.apache.kafka.common.errors.SerializationException::发送失败;嵌套的异常是KafkaException不能将类MyCommand的值转换为在value.serializer中指定的类MyCommand。
由于您发送的是POJO,而不是字符串,所以必须使用与StringSerializer
不同的StringSerializer
,例如JsonSerializer
。
使用Spring,可以将序列化程序指定为属性。
spring:
kafka:
producer:
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
也可以看到样品
https://github.com/spring-projects/spring-kafka/tree/main/samples
https://stackoverflow.com/questions/68392576
复制相似问题