kafka生产者客户端的数据发送流程分为三个阶段:
下面我们从源码角度来看一下这个过程:
KafkaProducer(ProducerConfig config,
Serializer<K> keySerializer,
Serializer<V> valueSerializer,
ProducerMetadata metadata,
KafkaClient kafkaClient,
ProducerInterceptors<K, V> interceptors,
Time time) {
...
//记录累加器
this.accumulator = new RecordAccumulator(logContext,
config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
this.compressionType,
lingerMs(config),
retryBackoffMs,
deliveryTimeoutMs,
metrics,
PRODUCER_METRIC_GROUP_NAME,
time,
apiVersions,
transactionManager,
new BufferPool(this.totalMemorySize, config.getInt(ProducerConfig.BATCH_SIZE_CONFIG), metrics, time, PRODUCER_METRIC_GROUP_NAME));
....
//数据发送线程
this.sender = newSender(logContext, kafkaClient, this.metadata);
String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
this.ioThread.start();
....
}
public class RecordAccumulator {
...
private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;
public RecordAppendResult append(TopicPartition tp,
long timestamp,
byte[] key,
byte[] value,
Header[] headers,
Callback callback,
long maxTimeToBlock,
boolean abortOnNewBatch,
long nowMs) throws InterruptedException {
...
try {
//获取已有的缓冲区,或者创建新的缓冲区(Deque)
Deque<ProducerBatch> dq = getOrCreateDeque(tp);
//锁住该缓冲区,避免用户异步编程操作导致数据发送数据顺序错乱的问题
synchronized (dq) {
if (closed)
throw new KafkaException("Producer closed while send in progress");
//tryAppend方法将一条消息数据的时间戳、key、value、
//header等信息追加到缓冲区中(Deque<ProducerBatch> )
RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq, nowMs);
if (appendResult != null)
return appendResult;
}
...
synchronized (dq) {
...
//再次尝试,查看是否能将消息成功追加到Deque的某个批次中
RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq, nowMs);
if (appendResult != null) {
return appendResult;
}
//如果追加失败,那么创建一个新的批次,加入Deque尾部
...
ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, nowMs);
...
dq.addLast(batch);
...
return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true, false);
}
}
tryAppend方法详细分析:
添加失败有两种情况: 当前Deque为空,或者当前批次已满
在Kafka Producer中,每个ProducerBatch都对应一个Broker分区,该方法的作用是向ProducerBatch批次中尝试添加一条消息,如果该批次已满或无法再分配分区,则会创建一个新的ProducerBatch,并将消息添加到其中。通过使用一个生产者批次来批量发送多条消息,可以提高消息发送的效率和吞吐量,并减少网络IO的消耗。
public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers){
...
//消息属于哪个主题
this.topic = topic;
//消息属于哪个分区
this.partition = partition;
//消息的key
this.key = key;
//消息的value
this.value = value;
//消息的时间戳
this.timestamp = timestamp;
//消息的消息头
this.headers = new RecordHeaders(headers);
}
注意:
KafkaProducer会将消息先放入缓冲区中,然后由单独的sender线程异步发送到broker服务端,那么既然消息是批量发送的,那么触发批量发送的条件是什么呢?
注意:
buffer.memory: 用来约束Kafka Producer能够使用的内存缓冲的大小的,默认为32MB。
注意:
生产者数据发送流程如下:
@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
//调用拦截器对record进行预处理,该方法不会抛出异常
ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
//发送消息
return doSend(interceptedRecord, callback);
}
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
TopicPartition tp = null;
try {
//1.检测生产者是否已经关闭
throwIfProducerClosed();
//2、检查正要将数据发往的主题在kafka集群中的包含哪些分区
//获取集群中一些元数据信息
long nowMs = time.milliseconds();
ClusterAndWaitTime clusterAndWaitTime;
try {
clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), nowMs, maxBlockTimeMs);
} catch (KafkaException e) {
...
}
nowMs += clusterAndWaitTime.waitedOnMetadataMs;
long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
Cluster cluster = clusterAndWaitTime.cluster;
byte[] serializedKey;
try {
//3.对消息的key进行序列化
serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
} catch (ClassCastException cce) {
...
}
byte[] serializedValue;
try {
//4.对消息的value进行序列化
serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
} catch (ClassCastException cce) {
...
}
//5.分区器进行计算,决定此条消息发送到哪个分区
int partition = partition(record, serializedKey, serializedValue, cluster);
tp = new TopicPartition(record.topic(), partition);
...
//6.预估消息发送消息的大小,内容包括key,value以及消息头
int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
compressionType, serializedKey, serializedValue, headers);
//7.检查发送消息的大小是否超过阈值
ensureValidRecordSize(serializedSize);
...
// 拦截器回调函数
Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);
//8.将消息添加到消息累加器(缓冲区)
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
serializedValue, headers, interceptCallback, remainingWaitMs, true, nowMs);
...
//9.如果添加进缓冲队列已经满了,或者是首次创建的,那么幻想sender线程进行数据发送
if (result.batchIsFull || result.newBatchCreated) {
log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
this.sender.wakeup();
}
//10.返回future对象
return result.future;
}
...
}
dosend方法总结(上面源码省略了很多内容,大家结合具体源码来查看):
总的来说,该方法实现了Kafka Producer发送消息的核心逻辑,包括获取元数据、计算分区、将消息添加到缓冲区、处理异常和记录错误等。同时,它也支持拦截器机制,允许开发人员自定义消息的处理行为。
kafka要保证消息的生产和消费过程的可靠性,需要从Broker服务端,生产者客户端,消费者客户端三管其下,只有这三个方面都保证可靠性,才能实现消息不重复,不丢失。
本节站在生产者客户端来谈谈如何保证消息的可靠性,kafka提供了一些生产者配置参数来保证:
#新版本中
acks=all
#在一些比较旧的apache kafka老版本中,参数名称如下
request.required.acks=all
ack参数决定了生产者发送完消息后,如何消息进行确认的机制:
由于kafka生产者只和Leader分区副本进行通信,Follower副本负责从Leader分区进行副本的同步。所以,如果由于某些原因导致当前主题的分区副本进行Leader重新选举,如果选举完成后,前任Leader宕机,导致消息没有被复制到现任Leader那里,就会导致数据丢失。
retries=Integer.MAX_VALUE
retry.backoff.ms=100
delivery.timeout.ms=120000
注意:
RetriableException
会进行重试,也就是重新发送消息。retries
配置了允许重试的最大次数;retry.backoff.ms
配置了2次重试之间的时间间隔,单位ms毫秒;delivery.timeout.ms
配置了消息完成发送的超时时间,超过这个时间将不再重试,retries
参数失效。注意:
注意:
retries
上限或delivery.timeout.ms
上限之后,消息发送重试了多次,仍然没有发送成功。对于这种情况,我们还是要区别对待 即:实际上消息数据已经在服务端写入成功,但是生产者没有接收到服务端的ack响应。
EOS(exactly once semantics,精确一次处理语义)
,通过实现消息数据的幂等性和事务处理,来实现消息数据被精确的发送一次。本节我们讨论的是kafka生产端如何确保消息发送有序性呢? 有几种常见的手段呢?
结合上图,可知:
因此,要实现消息的有序性,有以下几个思路:
kafka生产者提供了消息发送的重试机制,也就是说消息发送失败后,kafka生产者会重新发送消息,那么就会出现下面这种情况:
这就会导致了发往kafka分区的数据出现了顺序上的错乱,如果要避免这个问题,我们需要设置一下生产端相关参数,如下所示:
max.in.flight.requests.per.connection=1
这个参数的作用是:对于一个kafka客户端请求连接(可以认为是一个生产者),一旦出现1个批次的消息发送失败,在该批次的数据重试(重新发送)成功之前,下一个批次的消息数据发送处于阻塞状态。上一个批次不成功,下一个批次就永远发不出去。
注意:
需要注意的是,max.in.flight.requests.per.connection只适用于异步发送模式(即使用send方法而不是sendSync方法)。在同步发送模式下,由于每个请求都会阻塞,所以不存在未确认的请求问题。
总的来说,max.in.flight.requests.per.connection是一个重要的Kafka生产者配置参数,可以帮助优化生产者的性能和吞吐量,但需要根据实际情况进行合理的调整。
public interface ProducerInterceptor<K, V> extends Configurable {
/**
* 该方法封装于KafkaProducer.send()方法中,运行在用户主线程
* Producer确保在消息序列化前调用该方法,可以对消息进行任意操作,但慎重修改消息的topic、key和partition,会影响分区以及日志压缩
*/
public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);
/**
* 该方法在消息发送结果应答或者发送失败时调用,并且通常都是在callback()触发之前执行,运行在IO线程中
。实现该方法的代码逻辑尽量简单,否则影响消息发送效率
*/
public void onAcknowledgement(RecordMetadata metadata, Exception exception);
/**
* 生产者的producer.close触发
*/
public void close();
}
public class RequestStatCalInterceptor implements ProducerInterceptor<String,String> {
private static final String MSG_PREFIX="dhy:";
private final AtomicInteger successCnt = new AtomicInteger(0);
private final AtomicInteger errorCnt = new AtomicInteger(0);
@Override
public ProducerRecord<String,String> onSend(ProducerRecord<String,String> msg) {
return new ProducerRecord<>(msg.topic(),msg.partition(),msg.timestamp(),msg.key(),MSG_PREFIX+msg.value());
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
if (metadata == null) {
errorCnt.getAndIncrement();
}else {
successCnt.getAndIncrement();
}
}
@Override
public void close() {
double successRate = (double) successCnt.get() / (successCnt.get() + errorCnt.get());
System.out.println("消息发送成功率:" + successRate*100 +"%");
}
@Override
public void configure(Map<String, ?> configs) {
}
}
注意:
//拦截器的配置
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, Collections.singletonList("interceptor.RequestStatCalInterceptor"));
也可以单独指定配置一个拦截器:
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, RequestStatCalInterceptor.class.getName());
kafka客户端生产者序列化接口如下,如果我们需要实现自定义数据格式的序列化,需要定义一个类实现该接口。
什么是序列化和反序列化:
/**
* 将对象转成二进制数组的接口序列化实现类
*/
public interface Serializer<T> extends Closeable {
/**
* 参数configs会传入生产者配置参数,
* 序列化器实现类可以根据生产者参数配置影响序列化逻辑
* isKey布尔型,表示当前序列化的对象是不是消息的key,如果不是key就是value
*/
default void configure(Map<String, ?> configs, boolean isKey) {
// intentionally left blank
}
/**
* 重要方法将对象data转换为二进制数组
*/
byte[] serialize(String topic, T data);
default byte[] serialize(String topic, Headers headers, T data) {
return serialize(topic, data);
}
/**
* 关闭序列化器
* 此方法的实现必须是幂等的,因为可能被调多次
*/
@Override
default void close() {
// intentionally left blank
}
}
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.9.7</version>
</dependency>
public class Peo {
private String name;
private Integer age;
...
}
public class JacksonSerializer implements Serializer<Peo> {
private static final ObjectMapper objectMapper = new ObjectMapper();
@Override
public byte[] serialize(String topic, Peo data) {
byte[] result=null;
try {
result=objectMapper.writeValueAsBytes(data);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
return result;
}
}
注意: ObjectMapper是线程安全的,可以在多个线程之间共享和重用。它的线程安全性主要来自于以下两个方面:
注意:
//序列化器的配置
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JacksonSerializer.class.getName());
@Test
void testJacksonSerializer() throws ExecutionException, InterruptedException {
Peo peo = new Peo();
peo.setName("dhy");
peo.setAge(18);
KafkaProducer<String, Peo> producer = new KafkaProducer<>(props);
RecordMetadata metadata = producer.send(new ProducerRecord<>(TEST_TOPIC, peo)).get();
System.out.println("消息偏移量为:"+metadata.offset());
}
KafkaProducer的默认分区策略为:
/**
* 分区器接口
*/
public interface Partitioner extends Configurable, Closeable {
/**
* 根据消息record信息对其进行重新分区
*
* @param topic 主题名称
* @param key 用于分区的key对象
* @param keyBytes 用于分区的key的二进制数组
* @param value 生产者消息对象
* @param valueBytes 生产者消息对象的二进制数组
* @param cluster 当前kafka集群的metadata信息
*/
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);
/**
* 当分区器执行完成时被调用
*/
public void close();
default public void onNewBatch(String topic, Cluster cluster, int prevPartition) {}
}
/**
* 通过对消息value进行hash,然后取余于分区数计算出消息要被路由到的分区
*/
public class ValuePartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
return partition(valueBytes, cluster.partitionsForTopic(topic).size());
}
private int partition(byte[] valueBytes, int numPartitions) {
return Utils.toPositive(Utils.murmur2(valueBytes)) % numPartitions;
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
}
}
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, ValuePartitioner.class.getName());
在生产者消息可靠性的重试机制一节,我们讲过kafka生产者发送数据失败后的重试机制,同时也介绍过一种可能产生的异常情况:
通常情况下我们不能接受这种情况的发生,我们期望的效果是exactly once(一批数据发送成功一次,并且只成功一次)。在0.11.0.0版本之前这是做不到的,在0.11.0.0版本之后kafka引入了幂等和事务机制,从而可以支持exactly once语义。
概念介绍:
问题: Kafka是如何做到发送重复消息(重试),仍然可以保证幂等性的呢?
注意;
kafka幂等性解决的是同一个消息被发送多次,发送至同一个分区。那么如果多个不同的消息发送至不同的分区,我们该如何保证多条消息要么都发送成功(都写入kafka broker数据日志),要么就都不写入kafka数据日志?
这就需要依赖kafka事务来实现:
KafkaProducer提供了5个与事务相关的方法,详细如下:
//初始化事务
void initTransactions();
//开启事务
void beginTransaction() throws ProducerFencedException;
//为消费者提供在事务内的位移提交的操作
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId)throws ProducerFencedException;
//提交事务
void commitTransaction() throws ProducerFencedException;
//中止事务,类比事务的回滚
void abortTransaction() throws ProducerFencedException;
在kafka消费客户端有一个参数isolation.level,这个参数的默认值为“read_uncommitted”,意思是说消费端应用可以看到(消费到)未提交的事务,当然对于已提交的事务也是可见的。
这个参数还可以设置为“read_committed”,表示消费端应用不可以看到尚未提交的事务内的消息。
举个例子:
/**
* 生产者使用demo
*/
public class KafkaProducerTest {
private static final String TEST_TOPIC = "test1";
private Properties props;
@BeforeEach
public void prepareTest() {
props = new Properties();
//kafka broker列表
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5000);
//可靠性确认应答参数
props.put(ProducerConfig.ACKS_CONFIG, "1");
//发送失败,重新尝试的次数
props.put(ProducerConfig.RETRIES_CONFIG, "3");
//生产者数据key序列化方式
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
//序列化器的配置
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
//生产者端开启幂等
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, Boolean.TRUE);
//生产者端开启事务
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "test-transaction");
}
@Test
void testTransaction() throws ExecutionException, InterruptedException {
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
//0.初始化事务管理器
producer.initTransactions();
//1.开启事务
producer.beginTransaction();
try {
//2.发送消息
producer.send(new ProducerRecord<>(TEST_TOPIC, Integer.toString(1), "test1"));
producer.send(new ProducerRecord<>(TEST_TOPIC, Integer.toString(2), "test2"));
producer.send(new ProducerRecord<>(TEST_TOPIC, Integer.toString(3), "test3"));
//3.提交事务
producer.commitTransaction();
} catch (ProducerFencedException e) {
e.printStackTrace();
//4.1 事务回滚
producer.abortTransaction();
} catch (KafkaException e) {
e.printStackTrace();
//4.2 事务回滚
producer.abortTransaction();
} finally {
producer.close();
}
}
}
session.timout.ms=10000
heartbeat.interval.ms=2000
max.poll.interval.ms=配置该值大于消费者批处理消息最长耗时(默认5分钟)
max.poll.records=500(默认值是500)
session.timout.ms
设置的时间没有接收到消费者的心跳,就认为该消费者挂掉了。所以这个值可以相对大一些,比如10s。heartbeat.interval.ms
是消费者向kafka服务端发送心跳的时间间隔,这个值越小频率就越高,发生心跳失联误判的概率就越低。max.poll.interval.ms
时间后仍然不执行下一次数据拉取poll(因为数据处理超时),kafka服务端就认为这个消费者挂掉了。所以为了避免rebalance,我们应该让单批次(拉取一个批次默认是500条)数据处理的时长小于max.poll.interval.ms
配置值。max.poll.interval.ms
不变的情况下,减少max.poll.records
配置值。一个批次拉取的数据越少,进行数据处理的时间就越短,从而避免因为超时导致的rebalance问题。public class ConsumerBalance {
private static KafkaConsumer<String, String> consumer;
/**
* 存储一个主题多个分区的当前消费偏移量
*/
private static Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
/**
* 初始化消费者
*/
static {
Properties configs = initConfig();
consumer = new KafkaConsumer<>(configs);
//主题订阅
consumer.subscribe(Collections.singletonList("test1"), new RebalanceListener(consumer));
}
/**
* 初始化配置
*/
private static Properties initConfig() {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "dhy-group");
props.put("enable.auto.commit", true);
props.put("auto.commit.interval.ms", 1000);
props.put("session.timeout.ms", 30000);
props.put("max.poll.records", 1000);
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
return props;
}
public static void main(String[] args) {
while (true) {
// 这里的参数指的是轮询的时间间隔,也就是多长时间去拉一次数据
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(3000));
records.forEach((ConsumerRecord<String, String> record) -> {
System.out.println("topic:" + record.topic()
+ ",partition:" + record.partition()
+ ",offset:" + record.offset()
+ ",key:" + record.key()
+ ",value" + record.value());
// 每次消费记录消费偏移量,用于一旦发生rebalance时提交
currentOffsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1, "no matadata"));
});
consumer.commitAsync();
}
}
static class RebalanceListener implements ConsumerRebalanceListener {
KafkaConsumer<String, String> consumer;
public RebalanceListener(KafkaConsumer<String,String> consumer) {
this.consumer = consumer;
}
/**
* 在rebalance发生之前和消费者停止读取消息之后被调用
*/
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
consumer.commitSync(currentOffsets);
}
/**
* 在rebalance完成之后(重新分配了消费者对应的分区),消费者开始读取消息之前被调用。
*/
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
consumer.commitSync(currentOffsets);
}
}
}
错误原因:
不推荐使用原因:
class ConsumerGroupThreadPoolTest {
@Test
void test(){
ExecutorService executorService = Executors.newFixedThreadPool(3);
for (int i = 0; i < 3; i++) {
executorService.execute(new MyConsumer());
}
}
}
class MyConsumer implements Runnable {
private static final String TEST_TOPIC = "test1";
private final KafkaConsumer<String, String> consumer;
public MyConsumer() {
Properties props = new Properties();
//kafka集群信息
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
//消费者组名称
props.put(ConsumerConfig.GROUP_ID_CONFIG, "dhy_group");
//key的反序列化器
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
//value的反序列化器
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
//初始化消费者
consumer = new KafkaConsumer<>(props);
}
@Override
public void run() {
consumeTemplate(MyConsumer::printRecord, null);
}
/**
* recordConsumer针对单条数据进行处理,此方法中应该做好异常处理,避免外围的while循环因为异常中断。
*/
public void consumeTemplate(Consumer<ConsumerRecord<String, String>> recordConsumer, Consumer<KafkaConsumer<String, String>> afterCurrentBatchHandle) {
consumer.subscribe(Collections.singletonList(TEST_TOPIC));
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(100));
for (ConsumerRecord<String, String> record : records) {
recordConsumer.accept(record);
}
if (afterCurrentBatchHandle != null) {
afterCurrentBatchHandle.accept(consumer);
}
}
} finally {
consumer.close();
}
}
private static void printRecord(ConsumerRecord<String, String> record) {
System.out.println("topic:" + record.topic()
+ ",partition:" + record.partition()
+ ",offset:" + record.offset()
+ ",key:" + record.key()
+ ",value" + record.value());
record.headers().forEach(System.out::println);
}
}
public interface ConsumerInterceptor<K, V> extends Configurable, AutoCloseable {
/**
* 发生的时机:在返回给客户端之前,也就是poll() 方法返回之前
* 这个方法允许你修改records(记录集合),然后信息的记录集合被返回
* 没有返回记录条数上的限制,你可以在这里可以可以过滤或者是生成新的记录
*/
public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records);
//当offset 被提交之后调用
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);
//当拦截器关闭的时候被调用
public void close();
}
package interceptor;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
public class LatencyCalConsumerInterceptor implements ConsumerInterceptor<String, String> {
/**
* 数据处理总耗时
*/
private static final AtomicLong totalLatency = new AtomicLong();
/**
* 消息的总数量
*/
private static final AtomicLong msgCount = new AtomicLong();
/**
* 在消费者进行数据处理之前被调用
*/
@Override
public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
long lantency = 0L;
//累加每条消息处理耗时
for (ConsumerRecord<String, String> msg : records) {
lantency += (System.currentTimeMillis() - msg.timestamp());
}
//获取当前消息发送处理的总耗时
long totalLatencyLong = totalLatency.addAndGet(lantency);
//获取消息总数
long msgCountLong = msgCount.incrementAndGet();
System.out.println("该批次消息发出到消费处理的平均延时:" + (totalLatencyLong / msgCountLong));
return records;
}
@Override
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {}
@Override
public void close() {}
@Override
public void configure(Map<String, ?> configs) {}
}
props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, LatencyCalConsumerInterceptor.class.getName());
消费者反序列化接口:
public interface Deserializer<T> extends Closeable {
/**
* 参数configs会传入消费者配置参数,
* 反序列化器实现类可以根据消费者参数配置影响序列化逻辑
* isKey布尔型,表示当前反序列化的对象是不是消息的key,如果不是key就是value
*/
default void configure(Map<String, ?> configs, boolean isKey) {
}
//核心反序列化函数,将二进制数组转成T类对象
T deserialize(String topic, byte[] var2);
default T deserialize(String topic, Headers headers, byte[] data) {
return this.deserialize(topic, data);
}
default void close() {
}
}
/**
* 反序列化器
*/
public class JacksonDeserializer implements Deserializer<Peo> {
private static final ObjectMapper objectMapper = new ObjectMapper();
@Override
public Peo deserialize(String topic, byte[] data) {
try {
return objectMapper.readValue(data,Peo.class);
} catch (IOException e) {
e.printStackTrace();
return null;
}
}
}
//value的反序列化器
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JacksonDeserializer.class.getName());
@Test
void testDeserializer(){
//1.创建消费者
KafkaConsumer<String, Peo> consumer = new KafkaConsumer<>(props);
//2.订阅Topic
consumer.subscribe(Collections.singletonList(TEST_TOPIC));
try {
while (true) {
//循环拉取数据,
//Duration超时时间,如果有数据可消费,立即返回数据
// 如果没有数据可消费,超过Duration超时时间也会返回,但是返回结果数据量为0
ConsumerRecords<String, Peo> records = consumer.poll(Duration.ofSeconds(100));
for (ConsumerRecord<String, Peo> record : records) {
System.out.println(record.value());
}
}
} finally {
//退出应用程序前使用close方法关闭消费者,
// 网络连接和socket也会随之关闭,并立即触发一次再均衡(再均衡概念后续章节介绍)
consumer.close();
}
}
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
spring:
kafka:
bootstrap-servers: localhost:9092
producer: # 生产者
retries: 3 #发送失败重试次数
acks: all #所有分区副本确认后,才算消息发送成功
# 指定消息key和消息体的序列化编码方式
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
consumer: #消费者
# 指定消息key和消息体的反序列化解码方式,与生产者序列化方式一一对应
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
# 该参数作用见下文注释
properties:
spring:
json:
trusted:
packages: '*'
注意:
spring.kafka.consumer.properties.spring.json.trusted.packages
是一个Kafka 消费者属性,用于指定 Spring Kafka 应该信任哪些 Java 包来反序列化 JSON 消息。
java.lang
包下的类。如果你的 JSON 消息包含其他类型的对象,例如自定义的 POJO 类,那么 Spring Kafka 将会拒绝反序列化这些消息。
spring.kafka.consumer.properties.spring.json.trusted.packages
属性来指定
Spring Kafka 应该信任哪些 Java 包。
spring.kafka.consumer.properties.spring.json.trusted.packages=com.example.myapp.pojo
com.example.myapp.pojo
包下的类。logging:
level:
org:
springframework:
kafka: ERROR # spring-kafka
apache:
kafka: ERROR # kafka
生产者环境搭建:
@Data
public class User {
private String name;
private Integer age;
}
@SpringBootTest(classes = KafkaSpringBootDemo.class)
class SpringKafkaTest {
@Resource
KafkaTemplate<String, User> kafkaTemplate;
@Test
void testProducer() {
User user = new User();
user.setAge(21);
user.setName("大忽悠");
kafkaTemplate.send(TEST_KAFKA_TOPIC, user);
//阻塞等待观察结果
System.in.read();
}
}
注意:
<String,User>
表示发送的数据消息的key的数据类型是String,数据体value的数据类型是User。value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
,所以User对象会被序列化为JSON对象之后发往kafka服务端。消费者环境搭建:
@Component
@Slf4j
public class KafkaConsumer {
@KafkaListener(topics = TEST_KAFKA_TOPIC , groupId = TEST_CONSUMER_GROUP)
public void dealUser(User user) {
log.info("kafka consumer msg: {}",user);
}
}
注意:
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
将生产者发送的User对象反序列化。@KafkaListener(concurrency=n)
。(用法:消费者线程数=主题分区数)运行生产者测试用例,查看输出:
如果测试过程中出现下面这个错误:
Caused by: java.lang.IllegalArgumentException: The class 'springboot.pojo.User' is not in the trusted packages
KafkaTemplate的send方法所支持参数列表如下:
send方法默认为异步,即发送之后就不再等待服务端对该消息的确认,如果出现异常生产者客户端不会有任何的感知。
为了能够使生产者能够感知到消息是否真的发送成功了,有两种方式:
添加回调函数写法如下:
@Test
void testAsyncWithCallBack() throws IOException {
User user = new User();
user.setAge(21);
user.setName("大忽悠");
kafkaTemplate.send(TEST_KAFKA_TOPIC, user).addCallback(new ListenableFutureCallback<SendResult<String, User>>() {
@Override
public void onFailure(Throwable ex) {
log.error("msg send err: ",ex);
}
@Override
public void onSuccess(SendResult<String, User> result) {
// 消息发送到的topic
String topic = result.getRecordMetadata().topic();
// 消息发送到的分区
int partition = result.getRecordMetadata().partition();
// 消息在分区内的offset
long offset = result.getRecordMetadata().offset();
log.info("msg send success,topic: {},partition: {},offset: {}",topic,partition,offset);
}
});
System.in.read();
}
默认情况下send()方法就是异步调用的方法,如果想实现同步阻塞的方法,需要在send方法的基础上调用get()方法。
get()无参方法有一个重载方法get(long timeout, TimeUnit unit),当超过一定的时长服务端仍无消息写入成功确认,则抛出TimeoutException异常。
@Test
void testSync() throws IOException {
User user = new User();
user.setAge(21);
user.setName("大忽悠");
try {
SendResult<String, User> result = kafkaTemplate.send(TEST_KAFKA_TOPIC, user).get();
// 消息发送到的topic
String topic = result.getRecordMetadata().topic();
// 消息发送到的分区
int partition = result.getRecordMetadata().partition();
// 消息在分区内的offset
long offset = result.getRecordMetadata().offset();
log.info("msg send success,topic: {},partition: {},offset: {}",topic,partition,offset);
} catch (InterruptedException | ExecutionException e) {
log.error("send msg sync occurs err: ",e);
}
System.in.read();
}
注意拦截器和分区器在Spring看来属于不常用的配置属性,对于不常用的原生配置属性,spring全都放在properties下面进行配置。也就是说原生API中,通过Properties传递给生产者的属性,在这里全部都支持。
spring:
kafka:
producer:
properties:
interceptor.classes: springboot.producer.interceptor.RequestStatCalInterceptor
partitioner.class: springboot.producer.partitioner.ValuePartitioner
注意:
幂等性的设置仍然很简单,只需要将生产者客户端参数enable.idempotence设置为true即可。
spring:
kafka:
producer:
properties:
enable.idempotence: true
kakfa的事务处理和spring结合后,有两种使用方式,分别为手动挡(模板方法)和自动挡(注解),这里以订单支付场景为例:
介绍完kafak事务的应用场景后,下面来演示一下事务的手动挡使用方式:
@Test
void testTransaction() {
User user = new User();
user.setAge(21);
user.setName("大忽悠");
//调用事务模板方法
kafkaTemplate.executeInTransaction(operations -> {
operations.send(TEST_KAFKA_TOPIC, user);
//业务处理发生异常,事务回滚
throw new RuntimeException("fail");
});
}
自动挡的方式就是使用@Transactional注解,同时需要针对kafka做额外的配置管理,但是不推荐使用这种方式,因为容易与数据库事务混淆。
使用@KafkaListener注解标注某个消费者,该注解中有若干属性,作用分别为:
public @interface KafkaListener {
/**
* 消费者的id,如果没有配置或默认生成一个。如果配置了会覆盖groupId,笔者的经验这个配置不需要配
*/
String id() default "";
/**
* 配置一个bean,类型为:org.springframework.kafka.config.KafkaListenerContainerFactory
*/
String containerFactory() default "";
/**
* 三选一:该消费者组监听的Topic名称
*/
String[] topics() default {};
/**
* 三选一:通过为消费者组指定表达式匹配监听多个Topic(笔者从来没用过,也不建议使用)
*/
String topicPattern() default "";
/**
* 三选一:消费组指定监听Topic的若干分区。
*/
TopicPartition[] topicPartitions() default {};
/**
* 没用过,不知道作用
*/
String containerGroup() default "";
/**
* Listener的异常处理器,后续会介绍
* @since 1.3
*/
String errorHandler() default "";
/**
* 消费者组的分组id
* @since 1.3
*/
String groupId() default "";
/**
* 设否设置id属性为消费组组id
* @since 1.3
*/
boolean idIsGroup() default true;
/**
* 消费者组所在客户端的客户端id的前缀,用于kafka客户端分类
* @since 2.1.1
*/
String clientIdPrefix() default "";
/**
* 用于SpEL表达式,获取当前Listener的配置信息
* 如获取监听Topic列表的SpEL表达式为 : "#{__listener.topicList}"
* @return the pseudo bean name.
* @since 2.1.2
*/
String beanRef() default "__listener";
/**
* 当前消费者组启动多少了消费者线程,并行执行消费动作
* @since 2.2
*/
String concurrency() default "";
/**
* 是否自动启动,true or false
* @since 2.2
*/
String autoStartup() default "";
/**
* Kafka consumer 属性配置,支持所有的apache kafka 消费者属性配置
* 但不包括group.id 和 client.id 配置属性
* @since 2.2.4
*/
String[] properties() default {};
/**
* 笔者从来没用过,自己理解下面的这段英文吧
* When false and the return type is an {@link Iterable} return the result as the
* value of a single reply record instead of individual records for each element.
* Default true. Ignored if the reply is of type {@code Iterable<Message<?>>}.
* @return false to create a single reply record.
* @since 2.3.5
*/
boolean splitIterables() default true;
/**
* 笔者从来没用过,自己理解下面的这段英文吧
* Set the bean name of a
* {@link org.springframework.messaging.converter.SmartMessageConverter} (such as the
* {@link org.springframework.messaging.converter.CompositeMessageConverter}) to use
* in conjunction with the
* {@link org.springframework.messaging.MessageHeaders#CONTENT_TYPE} header to perform
* the conversion to the required type. If a SpEL expression is provided
* ({@code #{...}}), the expression can either evaluate to a
* {@link org.springframework.messaging.converter.SmartMessageConverter} instance or a
* bean name.
* @return the bean name.
* @since 2.7.1
*/
String contentTypeConverter() default "";
}
把消费者监听的主题,消费者组名称,消费者组中消费者数量等常用信息做成自定义配置(而不是在代码中写死),如下所示:
dhyconsumer:
topic: topic1,topic2
group-id: dhy-group
concurrency: 5
注解属性支持使用SPEL表达式,所以我们可以读取配置作为属性值:
@KafkaListener(topics = "#{'${dhyconsumer.topic}'.split(',')}",
groupId = "${dhyconsumer.group-id}",
concurrency="${dhyconsumer.concurrency}")
public void readMsg(ConsumerRecord consumerRecord) {
//监听到数据之后,进行处理操作
}
在某些特殊场景下,希望消费Topic主题中的某几个分区(不是全部分区消费)。或者是针对某个分区从指定的偏移量开始消费。
@KafkaListener(topicPartitions =
{ @TopicPartition(topic = "topic1", partitions = { "0", "1" }),
@TopicPartition(topic = "topic2", partitions = {"0","4"},partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "300"))
})
public void readMsg(ConsumerRecord<?, ?> record) {
}
上面例子中消费者监听topic1的0,1分区(可能包含不只2个分区);监听topic2的第0和4分区 ,并且第0分区从offset为300的开始消费;
@Configuration
public class KafkaInitialConfiguration {
/**
* 监听器工厂
*/
@Autowired
private ConsumerFactory<String,String> consumerFactory;
/**
* @return 配置一个消息过滤策略
*/
@Bean
public ConcurrentKafkaListenerContainerFactory<String,String> myFilterContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String,String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
// 被过滤的消息将被丢弃
factory.setAckDiscarded(true);
//设置消息过滤策略
factory.setRecordFilterStrategy(new RecordFilterStrategy() {
@Override
public boolean filter(ConsumerRecord consumerRecord) {
//这里做逻辑判断
//返回true的消息将会被丢弃
return true;
}
});
return factory;
}
}
使用方法,myFilterContainerFactory是上文中bean方法的名称:
@KafkaListener(containerFactory ="myFilterContainerFactory")
注意:
注意:
注意:
除了上面提到的一些配置属性,实际上apache kafka consumer支持的原生配置属性,要比Spring 提供的配置属性多得多。所有的apache kafka原生配置属性都可以通过properties配置来传递:
@KafkaListener(properties = {"enable.auto.commit:false","max.poll.interval.ms:6000" })
我们可以通过注解方式获取消息头:
@KafkaListener(topics = "topic1")
public void readMsg(@Payload String data,
@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) Integer key,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts) {
}
Spring-Kafka只需要通过一个@SendTo注解即可以实现消息的转发,被注解方法的return值即转发的消息内容:
@Component
public class KafkaConsumer {
@KafkaListener(topics = {"topic2"})
@SendTo("topic1")
public String listen1(String data) {
System.out.println("业务A收到消息:" + data);
return data + "(已处理)";
}
@KafkaListener(topics = {"topic1"})
public void listen2(String data) {
System.out.println("业务B收到消息:" + data);
}
}
Spring Kafka监听器模式(spring.kafka.listener.type配置属性)有两种:
@KafkaListener(topics = TEST_KAFKA_TOPIC , groupId = TEST_CONSUMER_GROUP)
public void dealUser(User user) {
log.info("kafka consumer msg: {}",user);
}
# 开启自动提交消费offset(这个配置实际上专指按照周期自动提交)
spring.kafka.consumer.enable-auto-commit: true
# 进行自动提交操作的时间间隔
spring.kafka.consumer.auto-commit-interval: 10s
# 禁用按周期自动提交消费者offset
spring.kafka.consumer.enable-auto-commit: false
# offset提交模式为record
spring.kafka.listener.ack-mode: record
注意: ack-mode一共有下面的几种配置模式
ack-mode模式 | 说明 |
---|---|
RECORD | 每条记录提交一次偏移量 |
BATCH(默认) | 每一次poll()下来的一个批次的数据处理完成之后提交偏移量 |
TIME | 一批poll()下来的数据,处理时间超过spring.kafka.listener.ack-time就提交一次偏移量 |
COUNT | 一批poll()下来的数据大于等于spring.kafka.listener.ack-count设置时就提交一次偏移量 |
COUNT_TIME | 超时或超数量 TIME或COUNT ,有一个条件满足时提交偏移量 |
MANUAL手动提交 | 手动调用Acknowledgment.acknowledge()进行消费offset提交,但是在一个批次数据处理完成之后才能提交。 |
MANUAL_IMMEDIATE | 手动调用Acknowledgment.acknowledge()后立即提交偏移量,即使可能此时只处理了一个批次数据中的一条或几条数据,也要提交偏移量。 |
# 禁用自动提交消费offset
spring.kafka.consumer.enable-auto-commit: false
# offset提交模式为manual_immediate
spring.kafka.listener.ack-mode: manual_immediate 或 manual
@KafkaListener(topics = TEST_KAFKA_TOPIC, groupId = TEST_CONSUMER_GROUP)
public void dealUser(User user, Acknowledgment ack) {
log.info("kafka consumer msg: {}", user);
ack.acknowledge();
}
注意:
ack-mode=manual
,我们看到dealUser
方法是一条数据一条数据的接收参数,此时处理完成一条数据,然后调用ack.acknowledge()
;不意味着消费者偏移量会立即提交,而是“我要提交”,真正的提交时机是一个批次的消息都处理完成之后才会提交。ack-mode=manual_immediate
, 就意味着处理一条消息,立即提交一次消费者偏移量。监听器函数参数是List集合类型,需要设置spring.kafka.listener.type: batch
,不是默认的:
@KafkaListener(topics = TEST_KAFKA_TOPIC, groupId = TEST_CONSUMER_GROUP)
public void dealUser(List<User> user) {
}
注意:
# listener类型为批量batch类型(默认为single单条消费模式)
spring.kafka.listener.type: batch
# offset提交模式为batch(不可使用record - 启动报错)
spring.kafka.listener.ack-mode: batch
# 禁用自动按周期提交消费者offset
spring.kafka.consumer.enable-auto-commit: false
注意:
# listener类型为批量batch类型(默认为single单条消费模式)
spring.kafka.listener.type: batch
# offset提交模式为manual(不可使用record - 启动报错)
spring.kafka.listener.ack-mode: manual_immediate 或 manual
# 禁用自动按周期提交消费者offset
spring.kafka.consumer.enable-auto-commit: false
@KafkaListener(topics = TEST_KAFKA_TOPIC, groupId = TEST_CONSUMER_GROUP)
public void dealUser(List<User> user,Acknowledgment ack) {
user.forEach(System.out::println);
ack.acknowledge();
}
毒丸消息(应用场景之一)
spring:
kafka:
consumer:
auto-offset-reset: earliest
key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
properties:
spring.json.trusted.packages: '*'
spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer
将Consumer的key-deserializer 和 value-deserializer
都配置为 org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
并委任具体的Key和Value反序列化器:
spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer
注意:
在Key或Value反序列化失败时,先有delegate代理配置的反序列化器进行反序列化。
如果反序列化失败,ErrorHandlingDeserializer 可以确保毒丸(Poison Pill)消息被处理掉并记录日志,Consumer offeset可以向前移动,使得Consumer可以继续处理后续的消息。
ErrorHandlingDeserializer的反序列化源码如下:
@Override
public T deserialize(String topic, byte[] data) {
try {
return this.delegate.deserialize(topic, data);
}
catch (Exception e) {
return recoverFromSupplier(topic, null, data, e);
}
}
private T recoverFromSupplier(String topic, Headers headers, byte[] data, Exception exception) {
//如果我们指定了反序列失败的处理函数,这里会回调,否则返回null
if (this.failedDeserializationFunction != null) {
FailedDeserializationInfo failedDeserializationInfo =
new FailedDeserializationInfo(topic, headers, data, this.isForKey, exception);
return this.failedDeserializationFunction.apply(failedDeserializationInfo);
}
else {
return null;
}
}
/**
* Supplier for a T when deserialization fails.
*/
public static final String KEY_FUNCTION = "spring.deserializer.key.function";
/**
* Supplier for a T when deserialization fails.
*/
public static final String VALUE_FUNCTION = "spring.deserializer.value.function";
除了再反序列化过程中出现异常,还有可能我们的消费者程序处理数据过程中出现异常,同样有全局的异常处理机制可以使用。实现KafkaListenerErrorHandler接口对监听器出现的异常进行处理。
@Component
public class MyErrorHandler implements KafkaListenerErrorHandler {
@Override
public Object handleError(Message<?> message, ListenerExecutionFailedException exception) {
return null;
}
@Override
public Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer) {
return null;
}
}
配置使用方法如下
@KafkaListener(errorHandler="myErrorHandler")
public void userdeal(@Payload ConsumerRecord consumerRecord) {
//所有的异常全部对外抛出,不要处理,由myErrorHandler统一处理
}
ObjectMapper默认将日期类型序列化为Long类型的时间戳,而Spring中注入的ObjectMapper进行了配置修改,默认将日期类型序列化为字符串。
注意:
@SpringBootTest(classes = KafkaSpringBootDemo.class)
class JsonSerializerTest {
@Resource
private ObjectMapper objectMapper;
private ObjectMapper objectMapperNew=new ObjectMapper();
@Test
void dateSerializerTest() throws JsonProcessingException {
System.out.println("spring注入的ObjectMapper序列化结果: "+objectMapper.writeValueAsString(new Date()));
System.out.println("手动new的ObjectMapper序列化结果: "+objectMapperNew.writeValueAsString(new Date()));
}
}
在反序列化的时候,这个long类型数字将无法被自动识别为Date数据类型,而是被识别为Long类型,从而导致反序列化失败,而Spring kafka默认使用的日期序列化ObjectMapper ,也是手动new出来的。所以会将Date类型序列化为Long类型的时间戳。如果我们不希望出现这样的问题,可以进行如下的定义:
@Configuration
public class ConsumerKafkaConfig {
@Resource
private ObjectMapper objectMapper;
//反序列化器
@Bean
public DefaultKafkaConsumerFactory<?, ?> cf(KafkaProperties properties) {
Map<String, Object> props = properties.buildConsumerProperties();
return new DefaultKafkaConsumerFactory<>(props,
new StringDeserializer(), //指定key的反序列化方式是String
new JsonDeserializer<>(objectMapper)); //指定value的反序列化方式是JSON
}
//序列化器
@Bean
public DefaultKafkaProducerFactory<?, ?> pf(KafkaProperties properties) {
Map<String, Object> props = properties.buildProducerProperties();
return new DefaultKafkaProducerFactory<>(props,
new StringSerializer(), //指定key的序列化方式是String
new JsonSerializer<>(objectMapper)); //指定value的序列化方式是JSON
}
}
这样配置之后就不要在application.yml配置文件中写下面的参数了,配置也了也不会生效:
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
我们还可以使用ObjectMapper中的configure()方法来修改其配置,以便将日期类型序列化为字符串。具体代码如下:
@Test
void configureTest() throws JsonProcessingException {
objectMapperNew.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);
objectMapperNew.setDateFormat(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ"));
System.out.println("手动new的ObjectMapper序列化结果: "+objectMapperNew.writeValueAsString(new Date()));
}
这将禁用日期序列化为时间戳,并将日期格式设置为ISO 8601格式的字符串。您可以根据需要更改日期格式的格式字符串。
扫码关注腾讯云开发者
领取腾讯云代金券
Copyright © 2013 - 2025 Tencent Cloud. All Rights Reserved. 腾讯云 版权所有
深圳市腾讯计算机系统有限公司 ICP备案/许可证号:粤B2-20090059 深公网安备号 44030502008569
腾讯云计算(北京)有限责任公司 京ICP证150476号 | 京ICP备11018762号 | 京公网安备号11010802020287
Copyright © 2013 - 2025 Tencent Cloud.
All Rights Reserved. 腾讯云 版权所有