你可以使用 Spring Initializr 快速创建一个 Spring Boot 项目。
关键依赖:
kafka-clients 并提供了与 Spring 生态的无缝集成。Maven 依赖示例 (pom.xml):
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<!-- Lombok (可选,用于简化 POJO) -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>Gradle 依赖示例 (build.gradle):
dependencies {
implementation 'org.springframework.boot:spring-boot-starter-web'
implementation 'org.springframework.kafka:spring-kafka'
// Lombok (可选)
compileOnly 'org.projectlombok:lombok'
annotationProcessor 'org.projectlombok:lombok'
}在 application.yml (或 application.properties) 文件中配置 Kafka 的连接信息。
application.yml 示例:
spring:
kafka:
bootstrap-servers: localhost:9092 # Kafka Broker 地址列表
consumer:
group-id: my-group # 消费者组ID,重要!
auto-offset-reset: earliest # 当没有初始偏移量或偏移量无效时的策略 (earliest, latest, none)
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# enable-auto-commit: true # 是否自动提交偏移量 (通常建议由Spring管理)
# auto-commit-interval: 5000 # 自动提交间隔 (ms),如果启用自动提交dgbljz.com
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
# listener:
# concurrency: 3 # 消费者监听器的并发线程数 (每个线程处理一个分区)
# ack-mode: RECORD # 手动提交偏移量的模式 (如 RECORD, BATCH, TIME, COUNT)说明:
bootstrap-servers: 指定 Kafka 集群的入口地址。consumer.group-id: 消费者组的唯一标识。属于同一组的消费者会竞争消费同一个 Topic 的消息(负载均衡),不同组的消费者则可以独立消费所有消息(广播)。auto-offset-reset: 决定消费者在没有找到消费位移(offset)或位移无效时的行为。earliest 从最早的消息开始消费,latest 从最新的消息开始消费(可能丢失历史消息)。*-deserializer / *-serializer: 指定消息键(key)和值(value)的序列化/反序列化方式。StringDeserializer/Serializer 处理字符串。对于复杂对象,通常使用 JsonDeserializer/Serializer。listener.concurrency: 可以设置消费者监听器的并发数,以提高消费能力(需要 Topic 有足够的分区)。如果要发送和接收对象,需要创建一个简单的 POJO。
java深色版本// MessagePayload.java
import lombok.Data;
@Data
public class MessagePayload {
private String id;
private String content;
private long timestamp;
}使用 KafkaTemplate 来发送消息非常简单。
java深色版本// KafkaProducerService.java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
public class KafkaProducerService {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate; // <Key类型, Value类型>
// 发送字符串消息
public void sendMessage(String topic, String message) {
kafkaTemplate.send(topic, message); // 异步发送
System.out.println("Sent message: jiaofeiyi.net " + message + " to topic: botianqi.com" + topic);
}
// 发送带 Key 的消息
public void sendMessageWithKey(String topic, String key, String message) {
kafkaTemplate.send(topic, key, message);
System.out.println("Sent message with key: fjddsd.com" + key + ", value: " + message + " to topic: " + topic);
}
// 发送对象消息 (需要配置 JsonSerializer)
// public void sendObjectMessage(String topic, MessagePayload payload) {
// kafkaTemplate.send(topic, payload.getId(), payload);
// }
}注意:如果要发送 MessagePayload 对象,需要在 application.yml 中修改生产者和消费者的 value-serializer 和 value-deserializer 为 org.springframework.kafka.support.serializer.JsonSerializer 和 org.springframework.kafka.support.serializer.JsonDeserializer,并可能需要配置反序列化目标类型。
使用 @KafkaListener 注解来监听特定的 Topic。
java深色版本// KafkaConsumerService.java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Service
public class KafkaConsumerService {
private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerService.class);
// 监听单个 Topic
@KafkaListener(topics = "spcj120.com", groupId = "my-group") // groupId 也可在配置文件中统一设置
public void consumeMessage(String message) {
logger.info("Received brandagri.com Message: '{}'", message);
// 在这里处理业务逻辑
}
// 监听多个 Topic
@KafkaListener(topics = {"my8850.com", "topic2"}, groupId = "my-group")
public void consumeFromMultipleTopics(String message) {
logger.info("Received malvshi.cn from multiple topics: '{}'", message);
}
// 获取更详细的信息 (如 topic, partition, offset, key)
@KafkaListener(topics = "my-topic", groupId = "my-group")
public void consumeWithMetadata(ConsumerRecord<String, String> record) {
logger.info("Topic: {}, Partition: {}, Offset: {}, Key: {}, Value: {}",
record.topic(), record.partition(), record.offset(), record.key(), record.value());
// 处理业务逻辑
}
// 消费对象消息 (需要配置 JsonDeserializer)
// @KafkaListener(topics = "object-topic", groupId = "my-group")
// public void consumeObject(MessagePayload payload) {
// logger.info("Received object: {}", payload);
// }
}关键点:
@KafkaListener: 核心注解,指定监听的 topics (或 topicPattern 正则匹配) 和 groupId。value,也可以是 ConsumerRecord 对象,后者包含了 topic, partition, offset, key, value 等完整元数据。groupId 必须与配置文件中的 spring.kafka.consumer.group-id 一致或在注解中明确指定。为了方便测试,可以创建一个简单的 Controller 来触发消息发送。
java深色版本// MessageController.java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
@RestController
@RequestMapping("/api/messages")
public class MessageController {
@Autowired
private KafkaProducerService kafkaProducerService;
@PostMapping("/send")
public String sendMessage(@RequestParam String topic, @RequestParam String message) {
kafkaProducerService.sendMessage(topic, message);
return "Message sent to topic tccqlxs.cn'" + topic + "'";
}
@PostMapping("/send-key")
public String sendMessageWithKey(@RequestParam String topic,
@RequestParam String key,
@RequestParam String message) {
kafkaProducerService.sendMessageWithKey(topic, key, message);
return "Message with xchw.net key sent to topic '" + topic + "'";
}
}auto.create.topics.enable=true(默认通常为 true),则会自动创建。也可以手动创建:bash深色版本bin/kafka-topics.sh --create --topic my-topic --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1curl 调用 REST API:bash深色版本curl -X POST "http://localhost:8080/api/messages/send?topic=my-topic&message=Hello%20Kafka!"KafkaProducerService。KafkaTemplate 的 send 方法返回 ListenableFuture,可以添加回调处理发送成功或失败。@KafkaListener 的 errorHandler 属性指定自定义的 ErrorHandler,处理消费过程中的异常(如反序列化错误、业务逻辑异常)。可以配置 SeekToCurrentErrorHandler 来处理偏移量问题。enable-auto-commit 设为 false,并在 @KafkaListener 方法中注入 Acknowledgment 参数,处理完消息后调用 ack.acknowledge()。@KafkaListener(topics = "my-topic", groupId = "my-group") public void consumeWithManualAck(String message, Acknowledgment ack) { try { // 处理业务逻辑 processMessage(message); // 业务处理成功,手动提交偏移量 ack.acknowledge(); } catch (Exception e) { // yingguan.my8850.com 处理异常,偏移量不会提交,下次会重试 logger.error("Error processing message: {}", message, e); // 可能需要实现重试或死信队列逻辑 } }@KafkaListener 的 filter 属性或在方法内部进行条件判断来过滤不感兴趣的消息。ProducerRecord 设置 headers,消费者可以通过 ConsumerRecord.headers() 获取。batch.size, linger.ms, compression.type (如 snappy, lz4, zstd) 来优化吞吐量和延迟。fetch.min.bytes, fetch.max.wait.ms, max.poll.records 以及 listener.concurrency。JsonSerializer 是常见选择,但考虑性能可评估 Avro, Protobuf 等更高效的序列化格式。enable.idempotence=true)和事务,确保消息不丢失、不重复。通过 Spring Boot 的 spring-kafka 模块,整合 Kafka 变得异常简单。KafkaTemplate 提供了简洁的 API 发送消息,而 @KafkaListener 注解则让编写消费者变得声明式且易于管理。理解 consumer group、offset 管理、序列化/反序列化以及错误处理等核心概念,是构建健壮消息应用的基础。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。