记录,避免重复造轮子。
@Service
@Slf4j
public class KafkaCommonProducer {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
/**
* 发送消息
* @param topic topic
* @param t 消息
* @param <T> 消息类型
* @return 发送结果
*/
public <T> ListenableFuture<SendResult<String, String>> send(String topic, T t) {
final String message = JSON.toJSONString(t);
return kafkaTemplate.send(topic, message);
}
/**
* 发送消息
* @param topic topic
* @param key key
* @param t 消息
* @param <T> 消息类型
* @return 发送结果
*/
public <T> ListenableFuture<SendResult<String, String>> send(String topic, String key, T t) {
final String message = JSON.toJSONString(t);
return kafkaTemplate.send(topic, key, message);
}
}
如果需要回调则可以
public void send(String warningMessage) {
log.info(">>>>> Kafka消息发送,topic: {}, Key: {}, message: {}", TOPIC_NAME, TOPIC_NAME, warningMessage);
ListenableFuture<SendResult<String, String>> future = kafkaCommonProducer.send(TOPIC_NAME, TOPIC_NAME, warningMessage);
future.addCallback(
success -> log.info(">>>>> Kafka消息发送成功,{}", success.toString()),
failure -> log.info(">>>>> Kafka消息发送失败,{}", failure.getMessage())
);
}
application.yml 配置如下:
spring:
application:
name: test-kafka-msg
kafka:
bootstrap-servers: localhost:20902
############如果采用SASL认证的话需要添加以下内容
producer:
properties:
sasl.mechanism: SCRAM-SHA-256
security.protocol: SASL_PLAINTEXT
sasl.jaas.config: org.apache.kafka.common.security.scram.ScramLoginModule required username="test" password="test";
Copyright: 采用 知识共享署名4.0 国际许可协议进行许可 Links: https://cloud.tencent.com/developer/article/2020541
扫码关注腾讯云开发者
领取腾讯云代金券
Copyright © 2013 - 2025 Tencent Cloud. All Rights Reserved. 腾讯云 版权所有
深圳市腾讯计算机系统有限公司 ICP备案/许可证号:粤B2-20090059 深公网安备号 44030502008569
腾讯云计算(北京)有限责任公司 京ICP证150476号 | 京ICP备11018762号 | 京公网安备号11010802020287
Copyright © 2013 - 2025 Tencent Cloud.
All Rights Reserved. 腾讯云 版权所有