
Spring Boot is a framework for quickly building Java applications on top of the Spring Framework. Kafka is a distributed streaming platform for transporting and processing large volumes of data in real time. Connecting the two lets you use Kafka for stream processing inside a Spring application with little effort.
As the demand for big data and real-time processing keeps growing, Kafka has become increasingly popular with developers, while Spring Boot helps them stand up applications quickly. The combination suits many scenarios, such as real-time stream processing, log collection, and event-driven microservices.
First, edit the broker configuration on the server:
vi config/server.properties
The key change is the listener address: set it to the virtual machine's IP.
# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
# FORMAT:
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
# 192.168.217.142 is the VM's IP. Without this change, Java clients fail with:
# java.net.ConnectException: Connection refused: no further information
listeners=PLAINTEXT://192.168.217.142:9092

Also change the Kafka log directory:
log.dirs=/opt/kafka/kafka_2.12-2.5.0/logs
Start the broker:
bin/kafka-server-start.sh config/server.properties
Check whether it started successfully:
ps -ef | grep kafka
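The start command above runs the broker in the foreground and ties up the terminal. As a convenience (not required by the steps in this article), the same script accepts a -daemon flag to run in the background:

```shell
# Start the broker in the background; output goes to the configured log directory
bin/kafka-server-start.sh -daemon config/server.properties

# Stop it later with the companion script
bin/kafka-server-stop.sh
```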
List the JVM processes:
jps -l
Check the firewall state:
firewall-cmd --state
If it reports running, disable the firewall (on CentOS 7 this is firewalld):
systemctl stop firewalld.service
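Stopping firewalld opens every port on the machine. A narrower alternative, assuming you want to keep firewalld enabled, is to open only Kafka's port:

```shell
# Permanently open TCP port 9092, then reload the rules
firewall-cmd --permanent --add-port=9092/tcp
firewall-cmd --reload

# Verify that the port now appears in the open list
firewall-cmd --list-ports
```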
On Windows, use telnet to test whether the port is reachable:
telnet 192.168.217.142 9092
If a telnet session opens, the port is reachable.
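The producer and consumer examples below assume the topic heima-per already exists. With this Kafka version it can be created from the broker machine roughly as follows (the partition and replication counts here are illustrative):

```shell
# Create the topic with one partition and one replica
bin/kafka-topics.sh --create --bootstrap-server 192.168.217.142:9092 \
  --replication-factor 1 --partitions 1 --topic heima-per

# List topics to confirm it exists
bin/kafka-topics.sh --list --bootstrap-server 192.168.217.142:9092
```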
Add the spring-kafka dependency to pom.xml:
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

Then configure Kafka in application.yml:
spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      retries: 0
      batch-size: 16384

The producer example below uses the raw Kafka client API directly:
package com.qiming.kafka.chapter1;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class ProducerFastStart {

    // Topic name (created earlier)
    private static final String topic = "heima-per";

    // Kafka cluster address
    private static final String brokerList = "192.168.217.142:9092";

    public static void main(String[] args) {
        Properties properties = new Properties();
        // Key serializer
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Number of retries
        properties.put(ProducerConfig.RETRIES_CONFIG, 10);
        // Value serializer
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Cluster address
        properties.put("bootstrap.servers", brokerList);

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
        ProducerRecord<String, String> producerRecord =
                new ProducerRecord<String, String>(topic, "kafka-demo", "hello.kafka 23!!");
        try {
            producer.send(producerRecord);
        } catch (Exception e) {
            e.printStackTrace();
        }
        producer.close();
    }
}

The consumer example, again with the raw client API:
package com.qiming.kafka.chapter1;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
/**
 * Kafka message consumer
 */
public class ConsumerFastStart {

    // Kafka cluster address
    private static final String brokerList = "192.168.217.142:9092";

    // Topic name (created earlier)
    private static final String topic = "heima-per";

    // Consumer group
    private static final String groupId = "group.demo";

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("bootstrap.servers", brokerList);
        properties.put("group.id", groupId);

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Collections.singletonList(topic));
        while (true) {
            ConsumerRecords<String, String> records =
                    consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.value());
            }
        }
    }
}

Start the consumer so it keeps listening, then start the producer; the consumer log prints:
hello.kafka 23!!
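Independently of the Java consumer, delivery can also be checked with the console consumer that ships with Kafka:

```shell
# Read the topic from the beginning to also see messages sent earlier
bin/kafka-console-consumer.sh --bootstrap-server 192.168.217.142:9092 \
  --topic heima-per --from-beginning
```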
Note: always start Kafka first.
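Note that the two classes above use the raw Kafka client API directly; the spring-kafka dependency and the application.yml settings shown earlier only come into play when going through Spring's abstractions. A minimal sketch of that route, assuming bootstrap-servers in the YAML points at the broker and the topic name matches, might look like this (the class and method names here are illustrative, not from the original examples):

```java
package com.qiming.kafka.chapter1;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class SpringKafkaDemo {

    private final KafkaTemplate<String, String> kafkaTemplate;

    // Spring builds this KafkaTemplate from the spring.kafka.* properties
    public SpringKafkaDemo(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    // Roughly equivalent to ProducerFastStart's send
    public void send(String message) {
        kafkaTemplate.send("heima-per", "kafka-demo", message);
    }

    // Roughly equivalent to ConsumerFastStart's poll loop
    @KafkaListener(topics = "heima-per", groupId = "group.demo")
    public void listen(String message) {
        System.out.println(message);
    }
}
```

spring-kafka defaults to String serializers and deserializers, which matches what the raw-client examples configure by hand.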
Original-content statement: this article was published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission. For infringement concerns, contact cloudcommunity@tencent.com for removal.