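If ZooKeeper runs with an Istio sidecar, the leader-election phase fails with a "Connection refused" error.
The main cause is that ZooKeeper, by default, listens only on the pod IP address for server-to-server communication, whereas Istio requires the application to listen on 0.0.0.0. The fix is to set quorumListenOnAllIPs=true.
For details, see: https://istio.io/latest/faq/applications/
The problem is not specific to ZooKeeper: Apache NiFi, Cassandra, Elasticsearch, and Redis hit the same issue when deployed with a sidecar.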
The official Docker zookeeper image does not expose a quorumListenOnAllIPs parameter, so the setting has to be added by hand. See this issue for details:
https://github.com/31z4/zookeeper-docker/issues/117
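Adding the setting by hand amounts to putting one key=value line into zoo.cfg. The following is a minimal sketch of that edit, assuming the file content is available as a string; the helper name ensure_quorum_listen_on_all_ips is hypothetical, and how the resulting file gets into the container (a mounted file, a custom image) is left open here.

```python
def ensure_quorum_listen_on_all_ips(cfg_text: str) -> str:
    """Return zoo.cfg content with quorumListenOnAllIPs=true set exactly once."""
    key = "quorumListenOnAllIPs"
    kept = []
    for line in cfg_text.splitlines():
        # Drop any existing assignment of the key; commented lines are kept.
        if line.split("=", 1)[0].strip() == key:
            continue
        kept.append(line)
    kept.append(f"{key}=true")
    return "\n".join(kept) + "\n"


if __name__ == "__main__":
    # Hypothetical sample content, for illustration only.
    sample = "tickTime=2000\ndataDir=/data\nclientPort=2181\n"
    print(ensure_quorum_listen_on_all_ips(sample), end="")
```

The function is idempotent, so running it on a file that already contains the setting leaves the content unchanged.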
Alternatively, use the bitnami/zookeeper image, which supports quorumListenOnAllIPs and lets you control it through the ZOO_LISTEN_ALLIPS_ENABLED environment variable. A simple set of Deployment manifests follows:
kind: Deployment
apiVersion: apps/v1
metadata:
  name: zookeeper-1
  namespace: rcmd
spec:
  replicas: 1
  selector:
    matchLabels:
      app: zookeeper-1
  template:
    metadata:
      labels:
        app: zookeeper-1
    spec:
      containers:
        - name: zookeeper
          image: bitnami/zookeeper:3.6.2
          imagePullPolicy: Always
          ports:
            - containerPort: 2181
          env:
            - name: ALLOW_ANONYMOUS_LOGIN
              value: "yes"
            - name: ZOO_LISTEN_ALLIPS_ENABLED
              value: "true"
            - name: ZOO_SERVER_ID
              value: "1"
            - name: ZOO_SERVERS
              value: 0.0.0.0:2888:3888,zookeeper-2:2888:3888,zookeeper-3:2888:3888
---
kind: Deployment
apiVersion: apps/v1
metadata:
  name: zookeeper-2
  namespace: rcmd
spec:
  replicas: 1
  selector:
    matchLabels:
      app: zookeeper-2
  template:
    metadata:
      labels:
        app: zookeeper-2
    spec:
      containers:
        - name: zookeeper
          image: bitnami/zookeeper:3.6.2
          imagePullPolicy: Always
          ports:
            - containerPort: 2181
          env:
            - name: ALLOW_ANONYMOUS_LOGIN
              value: "yes"
            - name: ZOO_LISTEN_ALLIPS_ENABLED
              value: "true"
            - name: ZOO_SERVER_ID
              value: "2"
            - name: ZOO_SERVERS
              value: zookeeper-1:2888:3888,0.0.0.0:2888:3888,zookeeper-3:2888:3888
---
kind: Deployment
apiVersion: apps/v1
metadata:
  name: zookeeper-3
  namespace: rcmd
spec:
  replicas: 1
  selector:
    matchLabels:
      app: zookeeper-3
  template:
    metadata:
      labels:
        app: zookeeper-3
    spec:
      containers:
        - name: zookeeper
          image: bitnami/zookeeper:3.6.2
          imagePullPolicy: Always
          ports:
            - containerPort: 2181
          env:
            - name: ALLOW_ANONYMOUS_LOGIN
              value: "yes"
            - name: ZOO_LISTEN_ALLIPS_ENABLED
              value: "true"
            - name: ZOO_SERVER_ID
              value: "3"
            - name: ZOO_SERVERS
              value: zookeeper-1:2888:3888,zookeeper-2:2888:3888,0.0.0.0:2888:3888
Once the matching Services are set up, the ensemble is ready to run.
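The ZOO_SERVERS values in the three manifests follow one pattern: each server lists itself as 0.0.0.0 and its peers by service name, in ID order. A minimal sketch of that pattern follows; the function name zoo_servers and its parameters are hypothetical, and the default ports are the 2888 and 3888 used in the manifests.

```python
from typing import Sequence


def zoo_servers(hosts: Sequence[str], server_id: int,
                peer_port: int = 2888, election_port: int = 3888) -> str:
    """Build the ZOO_SERVERS value for the server with the given 1-based ID."""
    if not 1 <= server_id <= len(hosts):
        raise ValueError(f"server_id must be between 1 and {len(hosts)}")
    entries = []
    for index, host in enumerate(hosts, start=1):
        # The server's own entry uses 0.0.0.0; peers keep their service names.
        address = "0.0.0.0" if index == server_id else host
        entries.append(f"{address}:{peer_port}:{election_port}")
    return ",".join(entries)


if __name__ == "__main__":
    ensemble = ["zookeeper-1", "zookeeper-2", "zookeeper-3"]
    for sid in range(1, len(ensemble) + 1):
        print(sid, zoo_servers(ensemble, sid))
```

Generating the value this way keeps the entry order identical across servers, which matters because a server's position in the list corresponds to its ID.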
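Services in k8s are usually exposed externally through an ingress, but Kafka has its own connection mechanism, so it needs dedicated configuration that lets clients learn which address to connect to.
How Kafka and a client establish a connection:
Kafka itself tells the client which broker address to connect to, so when opening it to an external network the broker address must be set to one that is reachable from outside. Taking the wurstmeister/kafka image as an example, the following settings allow external access: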
kind: Deployment
apiVersion: apps/v1
metadata:
  name: kafka-broker0
  namespace: databases
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kafka
      id: "kafka-broker0"
  template:
    metadata:
      labels:
        app: kafka
        id: "kafka-broker0"
    spec:
      containers:
        - name: kafka
          image: "wurstmeister/kafka:2.12-2.5.0"
          imagePullPolicy: "IfNotPresent"
          env:
            - name: KAFKA_ADVERTISED_LISTENERS
              value: "INSIDE://kafka-broker0:9092,OUTSIDE://kafka.db.tensorbytes.com:10000"
            - name: KAFKA_LISTENERS
              value: "INSIDE://:9092,OUTSIDE://:10000"
            - name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
              value: "INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT"
            - name: KAFKA_INTER_BROKER_LISTENER_NAME
              value: "INSIDE"
            - name: KAFKA_ZOOKEEPER_CONNECT
              value: zookeeper-1:2181
            - name: KAFKA_BROKER_ID
              value: "0"
            - name: KAFKA_CREATE_TOPICS
              value: mp_post_slog:1:1
            - name: LOG4J_LOGGER_KAFKA_AUTHORIZER_LOGGER
              value: "DEBUG"
          resources:
            limits:
              cpu: 200m
              memory: 512Mi
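Environment variables, in their general form (_{HOSTNAME_COMMAND} is a placeholder that the image replaces at startup):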
KAFKA_ADVERTISED_LISTENERS: INSIDE://:9092,OUTSIDE://_{HOSTNAME_COMMAND}:9094
KAFKA_LISTENERS: INSIDE://:9092,OUTSIDE://:9094
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
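To make the listener settings concrete, here is a small sketch that splits a listener string into its named endpoints, which shows the address a client on each listener would be told to use. The function parse_listeners is a hypothetical helper for illustration and is not part of Kafka or of the wurstmeister/kafka image; it assumes plain NAME://host:port entries without IPv6 literals.

```python
from typing import Dict, Tuple


def parse_listeners(value: str) -> Dict[str, Tuple[str, int]]:
    """Map each listener name to (host, port) from a NAME://host:port list."""
    listeners = {}
    for entry in value.split(","):
        name, _, address = entry.strip().partition("://")
        # An empty host (as in "INSIDE://:9092") means "all interfaces".
        host, _, port = address.rpartition(":")
        listeners[name] = (host, int(port))
    return listeners


if __name__ == "__main__":
    advertised = parse_listeners(
        "INSIDE://kafka-broker0:9092,OUTSIDE://kafka.db.tensorbytes.com:10000"
    )
    for name, (host, port) in advertised.items():
        print(f"{name}: clients are directed to {host}:{port}")
```

With the Deployment's values, clients arriving on the OUTSIDE listener are pointed at kafka.db.tensorbytes.com:10000, while in-cluster clients on INSIDE are pointed at kafka-broker0:9092.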
Istio VirtualService and Service configuration:
apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
  name: kafka-router
  namespace: databases
spec:
  gateways:
    - db-external-gateway
  hosts:
    - kafka.db.tensorbytes.com
  tcp:
    - match:
        - port: 10000
      route:
        - destination:
            host: kafka-broker0.databases.svc.cluster.local
            port:
              number: 10000
---
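apiVersion: v1
kind: Service
metadata:
  name: kafka-broker0
  labels:
    name: kafka
  # Must be the namespace of the kafka-broker0 Deployment, and must match
  # the host kafka-broker0.databases.svc.cluster.local in the VirtualService.
  namespace: databases
spec:
  ports:
    - port: 9092
      name: internal-port
      protocol: TCP
      targetPort: 9092
    - port: 10000
      name: external-port
      protocol: TCP
      targetPort: 10000
  selector:
    app: kafka
    id: "kafka-broker0"
  type: ClusterIP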
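Producer (external access):
# coding: utf-8
from kafka import KafkaProducer

# Connect through the externally advertised OUTSIDE listener.
producer = KafkaProducer(bootstrap_servers='kafka.db.tensorbytes.com:10000')
for i in range(10):
    producer.send('mp_post_slog', key=b'testping', value=b'bar')
# Wait up to 10 seconds for buffered messages to be sent.
producer.flush(timeout=10)
Consumer (external access):
# coding: utf-8
from kafka import KafkaConsumer

consumer = KafkaConsumer('mp_post_slog', bootstrap_servers='kafka.db.tensorbytes.com:10000', group_id='my_favorite_group')
# Iterating over the consumer blocks and yields messages as they arrive.
for msg in consumer:
    print(msg)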
Producer (inside the cluster):
# coding: utf-8
from kafka import KafkaProducer

# Connect through the in-cluster INSIDE listener.
producer = KafkaProducer(bootstrap_servers='kafka-broker0:9092')
for i in range(10):
    producer.send('mp_post_slog', key=b'testping', value=b'bar')
# Wait up to 10 seconds for buffered messages to be sent.
producer.flush(timeout=10)
Consumer (inside the cluster):
# coding: utf-8
from kafka import KafkaConsumer

consumer = KafkaConsumer('mp_post_slog', bootstrap_servers='kafka-broker0:9092', group_id='my_favorite_group')
# Iterating over the consumer blocks and yields messages as they arrive.
for msg in consumer:
    print(msg)