前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >使用Docker(k8s)安装Kafka并使用宿主机连接

使用Docker(k8s)安装Kafka并使用宿主机连接

作者头像
全栈程序员站长
发布2022-09-25 10:58:05
1.1K0
发布2022-09-25 10:58:05
举报
文章被收录于专栏:全栈程序员必看

大家好,又见面了,我是你们的朋友全栈君。

使用Docker(k8s)安装Kafka并使用宿主机连接

  1. 安装Docker及docker-compose 具体安装方法可以去官网看教程 检查docker-compose是否安装成功
  1. 创建 docker-compose.yml 文件
代码语言:javascript
复制
version: '2'
services:
  zookeeper:
    image: "zookeeper"
    hostname: "zookeeper.local"
    container_name: "zookeeper"
    #设置网络别名 可随意取
    networks:
      local:
        aliases:
          - "zookeeper.local"
  kafka:
    image: "wurstmeister/kafka"
    hostname: "kafka.local"
    container_name: "kafka"
    ports:
      - "9092:9092"
    networks:
      local:
        aliases:
          - "kafka.local"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: kafka.local
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
#设置网络,名为local模式
networks:
  local:
    driver: bridge
  1. 进入Kafka容器
代码语言:javascript
复制
docker exec -it kafka /bin/bash
  1. 创建Topic
代码语言:javascript
复制
/opt/kafka_2.13-2.7.0/bin/kafka-topics.sh --create --zookeeper zookeeper:2181 -replication-factor 1 --partitions 1 --topic test_topic
  1. 测试命令行生产消费
  • 生产者
代码语言:javascript
复制
/opt/kafka_2.13-2.7.0/bin/kafka-console-producer.sh --broker-list kafka.local:9092 --topic test_topic
  • 消费者
代码语言:javascript
复制
/opt/kafka_2.13-2.7.0/bin/kafka-console-consumer.sh --bootstrap-server kafka.local:9092 --topic test_top
ic --from-beginning

从宿主机使用代码连接Kafka 6.1 进入Zookeeper容器查看brokers注册信息

代码语言:javascript
复制
# 进入容器
docker exec -it zookeeper /bin/bash

# 进入zookeeper命令行
bin/zkCli.sh

6.2 查看brokers注册信息

代码语言:javascript
复制
get /brokers/ids/1001

6.3 配置宿主机hosts

代码语言:javascript
复制
# 添加
127.0.0.1	kafka.local

6.4 使用Java代码连接Kafka

代码语言:javascript
复制
public class KafkaConsumerDemo { 
     
public static void main(String[] args) { 
     
	//1.配置消费者连接属性
	Properties props = new Properties();
	props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka.local:9092");
	props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
	props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
	props.put(ConsumerConfig.GROUP_ID_CONFIG, "test_demo");


	//2.创建Kafka消费者
	KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

	//3.订阅topics
	consumer.subscribe(Arrays.asList("test_topic"));
// consumer.assign(Arrays.asList(new TopicPartition("test_topic", 0)));
// consumer.seekToBeginning(Arrays.asList(new TopicPartition("test_topic", 0)));//不改变当前offset

	//4.死循环读取消息
	for (; ; ) { 
     
		ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));

		if (records != null && !records.isEmpty()) { 
     
			records.forEach(r -> { 
     
				System.out.println("key:" + r.key() + "----value:" + r.value());
			});
		}
	}
  }
}

测试成功

发布者:全栈程序员栈长,转载请注明出处:https://javaforall.cn/171579.html原文链接:https://javaforall.cn

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 使用Docker(k8s)安装Kafka并使用宿主机连接
相关产品与服务
容器镜像服务
容器镜像服务(Tencent Container Registry,TCR)为您提供安全独享、高性能的容器镜像托管分发服务。您可同时在全球多个地域创建独享实例,以实现容器镜像的就近拉取,降低拉取时间,节约带宽成本。TCR 提供细颗粒度的权限管理及访问控制,保障您的数据安全。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档