首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >ReplyingKafkaTemplate - sendAndReceive序列化问题

ReplyingKafkaTemplate - sendAndReceive序列化问题
EN

Stack Overflow用户
提问于 2021-07-15 10:59:20
回答 1查看 222关注 0票数 0

我正在尝试使用ReplyingKafkaTemplate来实现sendAndReceive消息/响应。

在执行该服务的服务中,我使用ProducerFactoryConcurrentKafkaListenerContainerFactory的默认实例来创建ReplyingKafkaTemplate的新实例,如下所示:

代码语言:javascript
运行
复制
@Service
public class MyService {

  private final ReplyingKafkaTemplate<String, MyCommand, MyResult> kafkaTemplate;

  public MyService(ProducerFactory<String, MyCommand> pf,
                    ConcurrentKafkaListenerContainerFactory<String, MyResult> containerFactory) {
    var container = containerFactory.createContainer("results topic name");
    this.kafkaTemplate = new ReplyingKafkaTemplate<>(pf, container);
    kafkaTemplate.start();
  }

   //... other methods omitted as irrelevant...

  private Mono<MyResult> sendMessageAndWaitForResult(ProducerRecord<String, MyCommand> producerRecord) {
    var requestReplyFuture = kafkaTemplate.sendAndReceive(producerRecord);
    return Mono
      .fromFuture(requestReplyFuture.completable())
      .map(ConsumerRecord::value);
  }

  private ProducerRecord<String, MyCommand> createProducerRecord(Iterable<RecordHeader> recordHeaders, MyCommand myCommand) {
        return new ProducerRecord(
                "commands topic name",
                null,
                "some string id",
                myCommand,
                recordHeaders);
    }
}

MyCommandMyResult是在Lombok帮助下定义的POJO。

我所犯的错误是:

代码语言:javascript
运行
复制
KafkaException: Send failed; nested exception is org.apache.kafka.common.errors.SerializationException: Can't convert value of class MyCommand to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer.

我知道我没有定义序列化程序,但通过Kafka的示例,它们只使用ReplyingKafkaTemplate<String, String, String>示例,当然不需要设置序列化程序。但我做了却找不到在哪里怎么做?

谢谢!

编辑1:由于Garry的提示,我采用了为反序列化器创建Bean的方法,但是在遇到一些错误之后,我设法让它以这样的方式工作:

代码语言:javascript
运行
复制
public DefaultKafkaProducerFactory<String, MyCommand> producerFactory(
  KafkaProperties properties,
  AvroConverter avroConverter) {
        Map<String, Object> props = properties
  .buildProducerProperties();
        DefaultKafkaProducerFactory pf = 
  new DefaultKafkaProducerFactory(
    props,                                                                 
    new StringSerializer(),
    avroConverter.valueSerde().serializer());
        return pf;
}

上面应该是一个@Bean,但是当我这样做时,我得到了下面的注释中提到的错误,一些Bean的依赖关系形成了一个循环。

所以这个producerFactory是我在创建ReplyingKafkaTemplate时传递的。

令人费解的是,我不必为反序列化MyResponse创建相同的东西

AvroConverter是一个包装SpecificAvroSerde创建的类。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2021-07-15 14:04:39

我假设您正在使用Spring;默认情况下,它配置一个StringSerializer

org.apache.kafka.common.errors.SerializationException::发送失败;嵌套的异常是KafkaException不能将类MyCommand的值转换为在value.serializer中指定的类MyCommand。

由于您发送的是POJO,而不是字符串,所以必须使用与StringSerializer不同的StringSerializer,例如JsonSerializer

序列化、反序列化和消息转换

使用Spring,可以将序列化程序指定为属性。

https://docs.spring.io/spring-boot/docs/current/reference/html/application-properties.html#application-properties.integration.spring.kafka.producer.value-serializer

代码语言:javascript
运行
复制
spring:
  kafka:
    producer:
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

也可以看到样品

https://github.com/spring-projects/spring-kafka/tree/main/samples

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/68392576

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档