我正在尝试部署两个Spring引导应用程序(卡夫卡生产者和消费者)。当我将生产者部署到坞时,一切正常,但是当我部署时,我的使用者就不能工作了,因为它与kafka容器没有连接。
日志显示了这个错误
2019-11-17 05:32:22.644 WARN 1 --- [main] o.a.k.c.NetworkClient: [Consumer clientId=consumer-1, groupId=exampleGroup] Connection to node -1 could not be established. Broker may not be available.
我的船坞-合成人
version: '3'
services:
zookeeper:
image: wurstmeister/zookeeper
container_name: zookeeper
restart: always
ports:
- 2181:2181
kafka:
image: wurstmeister/kafka
container_name: kafka
restart: always
ports:
- 9092:9092
depends_on:
- zookeeper
links:
- zookeeper:zookeeper
environment:
KAFKA_ADVERTISED_HOST_NAME: localhost
KAFKA_ADVERTISED_PORT: 9092
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_CREATE_TOPICS: "topic1:1:1"
在我的KafkaConfig课上:
@EnableKafka
@Configuration
public class KafkaConfig {
@Bean
public ConsumerFactory<String, String> consumerFactory(){
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConstants.KAFKA_BROKERS);
config.put(ConsumerConfig.GROUP_ID_CONFIG, "exampleGroup");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, KafkaConstants.ENABLE_AUTO_COMMIT_CONFIG);
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, KafkaConstants.OFFSET_RESET_EARLIER);
// config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, KafkaConstants.SESSION_TIMEOUT_MS);
return new DefaultKafkaConsumerFactory<>(config);
}
和常量类
public class KafkaConstants {
public static String KAFKA_BROKERS = "localhost:9092";
public static Integer MESSAGE_COUNT=1000;
public static String TOPIC_NAME="demo";
public static String GROUP_ID_CONFIG="exampleGroup";
public static Integer MAX_NO_MESSAGE_FOUND_COUNT=100;
public static String OFFSET_RESET_LATEST="latest";
public static String OFFSET_RESET_EARLIER="earliest";
public static Integer MAX_POLL_RECORDS=1;
public static Integer SESSION_TIMEOUT_MS = 180000;
public static Integer REQUEST_TIMEOUT_MS_CONFIG = 181000;
public static String ENABLE_AUTO_COMMIT_CONFIG = "false";
public static Integer AUTO_COMMIT_INTERVAL_MS_CONFIG = 8000;
}
当我在电脑上安装zookepper和kafka并使用intellij运行这两个春季引导应用程序时,效果很好。问题是当我部署到我的本地码头时。
你能帮帮我吗?
更新
更新我的对接者-撰写:
version: '3'
services:
zookeeper:
image: wurstmeister/zookeeper
container_name: zookeeper
restart: always
ports:
- 2181:2181
kafka:
image: wurstmeister/kafka
container_name: kafka
restart: always
ports:
- 9092:9092
depends_on:
- zookeeper
links:
- zookeeper:zookeeper
environment:
KAFKA_ADVERTISED_HOST_NAME: kafka
KAFKA_ADVERTISED_PORT: 9092
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_CREATE_TOPICS: "ACC_GROUP_CREATE:1:1"
consumer:
image: micro1
container_name: micro1
depends_on:
- kafka
restart: always
ports:
- 8088:8088
depends_on:
- kafka
links:
- kafka:kafka
producer:
image: micro2
container_name: micro2
depends_on:
- kafka
restart: always
ports:
- 8087:8087
depends_on:
- kafka
links:
- kafka:kafka
工作得很好!基于@hqt的反应,但我不知道为什么要添加这些行的消费者/生产者
发布于 2019-11-17 12:41:18
这个问题是由于KAFKA_ADVERTISED_HOST_NAME
属性造成的。这是文档,它解释了为什么卡夫卡需要广告地址。
关键是,当您运行一个客户端时,您传递给它的代理就是从集群中获取有关代理的元数据的地方。它将连接到的用于读取/写入数据的实际主机& IP是基于代理在初始连接中传回的数据的--即使它只是一个节点,并且返回的代理与连接到的节点相同。
将KAFKA_ADVERTISED_HOST_NAME
设置为localhost时:
在容器环境中运行web应用程序时,将KAFKA_ADVERTISED_HOST_NAME
属性更新为kafka
将有效。请注意,您的网络应用程序和卡夫卡容器必须在同一个码头的网络。
下面是使用Wurstmeister的图像来运行Kafka集群的拟议的停靠组合。
version: "2"
services:
zookeeper:
image: wurstmeister/zookeeper
container_name: zookeeper
ports:
- 2181:2181
kafka:
image: wurstmeister/kafka
container_name: kafka
ports:
- 9092:9092
depends_on:
- zookeeper
environment:
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_HOST_NAME: kafka
KAFKA_ADVERTISED_PORT: 9092
KAFKA_CREATE_TOPICS: "topic1:1:1"
web_app:
# your definition of the web_app goes here
然后,您可以连接到容器环境中地址kafka:9092
上的Kafka代理。
发布于 2019-11-17 13:54:55
这是一个常见的问题,您需要阅读和理解的权威文档是https://www.confluent.io/blog/kafka-listeners-explained。
我正在复制它的tl;dr:供参考:
您需要将advertised.listeners (如果使用Docker >映像)设置为外部地址(主机/IP),以便客户端>能够正确地连接到它。否则,他们将尝试连接>到内部主机地址--如果无法到达,则会出现问题。“
https://stackoverflow.com/questions/58897876
复制