

#kafka
spring.kafka.bootstrap-servers=10.11.114.247:9092
spring.kafka.producer.acks=1
spring.kafka.producer.retries=3
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.consumer.group-id=zfprocessor_group
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=com.artisan.common.entity.messages
spring.kafka.consumer.max-poll-records=500
spring.kafka.consumer.fetch-min-size=10
spring.kafka.consumer.fetch-max-wait=10000ms
spring.kafka.listener.missing-topics-fatal=false
spring.kafka.listener.type=batch
spring.kafka.listener.ack-mode=manual
logging.level.org.springframework.kafka=ERROR
logging.level.org.apache.kafka=ERROR我们看看消费者反序列化,解析value的配置
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializerspring kafka 使用Jackson序列化, 如果存入kafka中的对象 包含 泛型,那么 默认情况下,这个泛型对象会被Jackson反序列为 LinkedHashMap . 抛出类型转换异常…
在实体类上增加如下注解
@Data
public class Message<T> {
private int messageCode;
@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS,include = JsonTypeInfo.As.PROPERTY,property = "@class")
private T messageContent;
}反序列化后,多了个节点

Jackson JSON - Using @JsonTypeInfo annotation to handle polymorphic types