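2024 Cloud-Native Operations in Practice: 99 Original Articles Plan, No. 057 | KubeSphere Best Practices「2024」Series, No. 029
Hello, and welcome to 运维有术.
Today's topic: without Helm or an Operator, how do you manually deploy a KRaft-mode Kafka cluster with SASL authentication enabled on a Kubernetes cluster?
This article is a hands-on guide that walks you through the key tasks step by step: creating the Secrets, creating the Services, creating the StatefulSets, and verifying the Kafka service from inside and outside the k8s cluster.
By following it, you will pick up the essential skills for deploying a KRaft-mode Kafka cluster on Kubernetes.
Server configuration for this walkthrough (the architecture is a 1:1 replica of a small production environment; the specs differ slightly)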
Hostname | IP | CPU (cores) | Memory (GB) | System disk (GB) | Data disk (GB) | Role |
---|---|---|---|---|---|---|
ksp-control-1 | 192.168.9.121 | 4 | 8 | 40 | 100 | KubeSphere/k8s-control-plane |
ksp-control-2 | 192.168.9.122 | 4 | 8 | 40 | 100 | KubeSphere/k8s-control-plane |
ksp-control-3 | 192.168.9.123 | 4 | 8 | 40 | 100 | KubeSphere/k8s-control-plane |
ksp-worker-1 | 192.168.9.124 | 8 | 16 | 40 | 100 | k8s-worker/CI |
ksp-worker-2 | 192.168.9.125 | 8 | 16 | 40 | 100 | k8s-worker |
ksp-worker-3 | 192.168.9.126 | 8 | 16 | 40 | 100 | k8s-worker |
ksp-storage | 192.168.9.127 | 2 | 4 | 40 | 100 | NFS Storage |
Total | 7 | 38 | 76 | 280 | 700 | |
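Software versions used in this environment: the manifests below use the Bitnami Kafka image, version 3.6.2.
The mainstream ways to deploy Kafka on a k8s cluster today include Helm, an Operator, and hand-written resource manifests.
In an earlier article we walked through deploying a Kafka cluster with Helm; see "KubeSphere 部署 Kafka 集群实战指南" for details. In this article we deploy the Kafka cluster with hand-written resource manifests.
The resource manifests are planned as follows: two Secrets (the SASL passwords and the KRaft cluster ID), one headless Service, three NodePort Services (one per Kafka node), and three single-replica StatefulSets (one per Kafka node).
This environment uses NFS as the persistent storage for the k8s cluster. For a new cluster, see "探索 Kubernetes 持久化存储之 NFS 终极实战指南" to deploy NFS storage.
All resources of the Kafka cluster are deployed in the namespace opsxlab.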
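Plaintext passwords must be base64-encoded before they go into the Secret (base64 is an encoding, not encryption): echo -n "PleaseChangeMe" | base64 -w0
In production, generate a different password for each entry.
Using the vi editor, create the manifest file kafka-sasl-passwords-secret.yaml with the following content: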
kind: Secret
apiVersion: v1
metadata:
  name: kafka-sasl-passwords
  labels:
    app.kubernetes.io/instance: app-kafka
    app.kubernetes.io/name: kafka
data:
  client-passwords: UGxlYXNlQ2hhbmdlTWU=
  controller-password: UGxlYXNlQ2hhbmdlTWU=
  inter-broker-password: UGxlYXNlQ2hhbmdlTWU=
type: Opaque
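To illustrate the encoding step, here is a minimal sketch that wraps base64 encoding and decoding in helper functions and generates a random password. The function names b64_encode, b64_decode, and random_password are hypothetical helpers, not part of the original walkthrough. The sketch pipes through tr -d '\n' instead of using -w0 so that it does not rely on a GNU-only flag.

```shell
# Hypothetical helpers (assumptions for illustration, not from the article).

# Base64-encode a string; printf avoids encoding a trailing newline.
b64_encode() {
  printf '%s' "$1" | base64 | tr -d '\n'
}

# Decode a base64 string back to plaintext.
b64_decode() {
  printf '%s' "$1" | base64 -d
}

# Generate a random password: 18 random bytes become 24 base64 characters.
random_password() {
  head -c 18 /dev/urandom | base64 | tr -d '\n'
}

# Prints UGxlYXNlQ2hhbmdlTWU= (the value used in the Secret above).
b64_encode "PleaseChangeMe"; echo
```

For production, one option is to call random_password once per key and paste b64_encode of each result into the data section, so that the three entries no longer share one password.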
Use the following command to create a temporary Pod that generates a UUID and is deleted automatically afterwards.
$ kubectl run app-kafka-client --rm -i --image registry.opsxlab.cn:8443/bitnami/kafka:3.6.2 -n opsxlab -- /opt/bitnami/kafka/bin/kafka-storage.sh random-uuid
RpOTPIfMRTiPpmCYJHF9KQ
Base64-encode the generated plaintext UUID: echo -n "RpOTPIfMRTiPpmCYJHF9KQ" | base64 -w0
Using the vi editor, create the manifest file kafka-kraft-cluster-id.yaml with the following content:
kind: Secret
apiVersion: v1
metadata:
  name: kafka-kraft-cluster-id
  labels:
    app.kubernetes.io/instance: app-kafka
    app.kubernetes.io/name: kafka
data:
  kraft-cluster-id: UnBPVFBJZk1SVGlQcG1DWUpIRjlLUQ==
type: Opaque
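Before applying the Secret, it can help to confirm that the encoded value decodes back to a well-formed cluster ID. The sketch below assumes the usual Kafka convention that a cluster ID is a 16-byte UUID rendered as 22 characters of unpadded URL-safe base64. The function names is_valid_cluster_id and check_secret_value are hypothetical.

```shell
# Hypothetical check (an assumption for illustration): a KRaft cluster ID is
# 22 characters drawn from the URL-safe base64 alphabet, without padding.
is_valid_cluster_id() {
  [ "${#1}" -eq 22 ] || return 1
  case "$1" in
    *[!A-Za-z0-9_-]*) return 1 ;;
  esac
  return 0
}

# Decode the value stored in the Secret, validate it, and print it.
check_secret_value() {
  decoded=$(printf '%s' "$1" | base64 -d) || return 1
  is_valid_cluster_id "$decoded" && printf '%s\n' "$decoded"
}

# Prints RpOTPIfMRTiPpmCYJHF9KQ (the UUID generated above).
check_secret_value "UnBPVFBJZk1SVGlQcG1DWUpIRjlLUQ=="
```

A common slip is encoding the UUID with a trailing newline (echo without -n); the decoded value then has 23 characters and this check rejects it.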
Run the following commands to create the resources.
kubectl apply -f kafka-sasl-passwords-secret.yaml -n opsxlab
kubectl apply -f kafka-kraft-cluster-id.yaml -n opsxlab
Run the following command to check the result.
$ kubectl get secret -n opsxlab
NAME TYPE DATA AGE
kafka-kraft-cluster-id Opaque 1 5s
kafka-sasl-passwords Opaque 3 6s
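Service planning: the headless Service kafka-controller-hs carries in-cluster traffic on ports 9092 (internal) and 9093 (controller). Each Kafka node also gets its own NodePort Service for access from outside the cluster, mapping port 9094 to NodePort 31211, 31212, and 31213 respectively.
Using the vi editor, create the manifest file kafka-controller-headless.yaml with the following content: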
kind: Service
apiVersion: v1
metadata:
  name: kafka-controller-hs
  labels:
    app.kubernetes.io/instance: app-kafka
    app.kubernetes.io/name: kafka
spec:
  ports:
    - name: tcp-internal
      protocol: TCP
      port: 9092
      targetPort: internal
    - name: tcp-controller
      protocol: TCP
      port: 9093
      targetPort: controller
  selector:
    app.kubernetes.io/instance: app-kafka
  clusterIP: None
  type: ClusterIP
Using the vi editor, create the manifest file kafka-controller-0-external.yaml with the following content:
kind: Service
apiVersion: v1
metadata:
  name: kafka-controller-0-external
  labels:
    app.kubernetes.io/instance: app-kafka
    app.kubernetes.io/name: kafka
spec:
  ports:
    - name: tcp-external
      protocol: TCP
      port: 9094
      targetPort: 9094
      nodePort: 31211
  selector:
    app.kubernetes.io/instance: app-kafka
    app.kubernetes.io/name: kafka-controller-0
  type: NodePort
Using the vi editor, create the manifest file kafka-controller-1-external.yaml with the following content:
kind: Service
apiVersion: v1
metadata:
  name: kafka-controller-1-external
  labels:
    app.kubernetes.io/instance: app-kafka
    app.kubernetes.io/name: kafka
spec:
  ports:
    - name: tcp-external
      protocol: TCP
      port: 9094
      targetPort: 9094
      nodePort: 31212
  selector:
    app.kubernetes.io/instance: app-kafka
    app.kubernetes.io/name: kafka-controller-1
  type: NodePort
Using the vi editor, create the manifest file kafka-controller-2-external.yaml with the following content:
kind: Service
apiVersion: v1
metadata:
  name: kafka-controller-2-external
  labels:
    app.kubernetes.io/instance: app-kafka
    app.kubernetes.io/name: kafka
spec:
  ports:
    - name: tcp-external
      protocol: TCP
      port: 9094
      targetPort: 9094
      nodePort: 31213
  selector:
    app.kubernetes.io/instance: app-kafka
    app.kubernetes.io/name: kafka-controller-2
  type: NodePort
Run the following commands to create the resources.
kubectl apply -f kafka-controller-headless.yaml -n opsxlab
kubectl apply -f kafka-controller-0-external.yaml -n opsxlab
kubectl apply -f kafka-controller-1-external.yaml -n opsxlab
kubectl apply -f kafka-controller-2-external.yaml -n opsxlab
Run the following command to check the result.
$ kubectl get svc -n opsxlab
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
kafka-controller-0-external NodePort 10.233.1.92 <none> 9094:31211/TCP 8s
kafka-controller-1-external NodePort 10.233.18.62 <none> 9094:31212/TCP 8s
kafka-controller-2-external NodePort 10.233.38.37 <none> 9094:31213/TCP 8s
kafka-controller-hs ClusterIP None <none> 9092/TCP,9093/TCP 8s
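The three NodePort Services differ only in the node index and the NodePort, so they can also be rendered from one template. The following is a minimal sketch under that assumption. The helper names external_nodeport and render_external_service are hypothetical, and the 31211 + N numbering simply mirrors the ports chosen in this article.

```shell
# Hypothetical helper: NodePort for Kafka node N is 31211 + N
# (31211, 31212, 31213, matching the manifests above).
external_nodeport() {
  echo $((31211 + $1))
}

# Render the NodePort Service manifest for Kafka node N to stdout.
render_external_service() {
  n=$1
  cat <<EOF
kind: Service
apiVersion: v1
metadata:
  name: kafka-controller-${n}-external
  labels:
    app.kubernetes.io/instance: app-kafka
    app.kubernetes.io/name: kafka
spec:
  ports:
    - name: tcp-external
      protocol: TCP
      port: 9094
      targetPort: 9094
      nodePort: $(external_nodeport "$n")
  selector:
    app.kubernetes.io/instance: app-kafka
    app.kubernetes.io/name: kafka-controller-${n}
  type: NodePort
EOF
}

# Print the manifest for node 1 as an example.
render_external_service 1
```

To write all three files in one pass, a loop such as `for n in 0 1 2; do render_external_service "$n" > "kafka-controller-$n-external.yaml"; done` would work.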
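The Kafka cluster is deployed with StatefulSets. The three Kafka nodes use largely identical configuration; the parameters that must be changed for each node are the StatefulSet name and its app.kubernetes.io/name labels (kafka-controller-0, kafka-controller-1, kafka-controller-2), KAFKA_CFG_NODE_ID (0, 1, 2), and KAFKA_CFG_ADVERTISED_LISTENERS (the Pod's own DNS name and its own NodePort).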
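The per-node values follow a regular pattern, which the sketch below spells out. It is an illustration only: the function names broker_fqdn, advertised_listeners, and quorum_voters are hypothetical, and the variables simply restate the namespace, headless Service name, and node IP used in this article.

```shell
# Values taken from this article's environment.
NAMESPACE=opsxlab
HEADLESS=kafka-controller-hs
NODE_IP=192.168.9.121   # node IP used for NodePort access in this article

# In-cluster DNS name of Kafka node N (Pod kafka-controller-N-0 behind
# the headless Service).
broker_fqdn() {
  echo "kafka-controller-$1-0.${HEADLESS}.${NAMESPACE}.svc.cluster.local"
}

# Value of KAFKA_CFG_ADVERTISED_LISTENERS for node N (differs per node).
advertised_listeners() {
  echo "INTERNAL://$(broker_fqdn "$1"):9092,EXTERNAL://${NODE_IP}:$((31211 + $1))"
}

# Value of KAFKA_CFG_CONTROLLER_QUORUM_VOTERS for COUNT nodes
# (identical on every node).
quorum_voters() {
  voters=""
  i=0
  while [ "$i" -lt "$1" ]; do
    voters="${voters}${voters:+,}${i}@$(broker_fqdn "$i"):9093"
    i=$((i + 1))
  done
  echo "$voters"
}

for n in 0 1 2; do
  echo "node.id=$n -> $(advertised_listeners "$n")"
done
```

Only the advertised listeners and the node ID change from node to node; the quorum voter list must be the same string on all three.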
Using the vi editor, create the manifest file kafka-controller-0-sts.yaml with the following content:
kind: StatefulSet
apiVersion: apps/v1
metadata:
  name: kafka-controller-0
  labels:
    app.kubernetes.io/instance: app-kafka
    app.kubernetes.io/name: kafka-controller-0
spec:
  replicas: 1
  selector:
    matchLabels:
      app.kubernetes.io/instance: app-kafka
      app.kubernetes.io/name: kafka-controller-0
  template:
    metadata:
      labels:
        app.kubernetes.io/instance: app-kafka
        app.kubernetes.io/name: kafka-controller-0
    spec:
      containers:
        - name: kafka
          image: 'registry.opsxlab.cn:8443/bitnami/kafka:3.6.2'
          ports:
            # Port names are referenced by the headless Service (targetPort).
            - name: internal
              containerPort: 9092
              protocol: TCP
            - name: controller
              containerPort: 9093
              protocol: TCP
            - name: external
              containerPort: 9094
              protocol: TCP
          env:
            - name: BITNAMI_DEBUG
              value: 'false'
            - name: HOST_IP
              valueFrom:
                fieldRef:
                  apiVersion: v1
                  fieldPath: status.hostIP
            - name: KAFKA_HEAP_OPTS
              value: '-Xmx2048m -Xms1024m'
            - name: KAFKA_KRAFT_CLUSTER_ID
              valueFrom:
                secretKeyRef:
                  name: kafka-kraft-cluster-id
                  key: kraft-cluster-id
            - name: KAFKA_CLIENT_USERS
              value: user1
            - name: KAFKA_CLIENT_PASSWORDS
              valueFrom:
                secretKeyRef:
                  name: kafka-sasl-passwords
                  key: client-passwords
            - name: KAFKA_INTER_BROKER_USER
              value: inter_broker_user
            - name: KAFKA_INTER_BROKER_PASSWORD
              valueFrom:
                secretKeyRef:
                  name: kafka-sasl-passwords
                  key: inter-broker-password
            - name: KAFKA_CONTROLLER_USER
              value: controller_user
            - name: KAFKA_CONTROLLER_PASSWORD
              valueFrom:
                secretKeyRef:
                  name: kafka-sasl-passwords
                  key: controller-password
            - name: KAFKA_CFG_SASL_MECHANISM_CONTROLLER_PROTOCOL
              value: PLAIN
            - name: KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL
              value: PLAIN
            # Per-node value: 0, 1, 2
            - name: KAFKA_CFG_NODE_ID
              value: '0'
            - name: KAFKA_CFG_PROCESS_ROLES
              value: 'controller,broker'
            - name: KAFKA_CFG_CONTROLLER_QUORUM_VOTERS
              value: >-
                0@kafka-controller-0-0.kafka-controller-hs.opsxlab.svc.cluster.local:9093,1@kafka-controller-1-0.kafka-controller-hs.opsxlab.svc.cluster.local:9093,2@kafka-controller-2-0.kafka-controller-hs.opsxlab.svc.cluster.local:9093
            - name: KAFKA_CFG_LISTENERS
              value: 'INTERNAL://:9092,CONTROLLER://:9093,EXTERNAL://:9094'
            # Per-node value: this Pod's DNS name and its own NodePort
            - name: KAFKA_CFG_ADVERTISED_LISTENERS
              value: >-
                INTERNAL://kafka-controller-0-0.kafka-controller-hs.opsxlab.svc.cluster.local:9092,EXTERNAL://192.168.9.121:31211
            - name: KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP
              value: >-
                INTERNAL:SASL_PLAINTEXT,CONTROLLER:SASL_PLAINTEXT,EXTERNAL:SASL_PLAINTEXT
            - name: KAFKA_CFG_CONTROLLER_LISTENER_NAMES
              value: CONTROLLER
            - name: KAFKA_CFG_INTER_BROKER_LISTENER_NAME
              value: INTERNAL
            - name: KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR
              value: '3'
            - name: KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR
              value: '3'
            - name: KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR
              value: '2'
          resources:
            limits:
              cpu: '1'
              memory: 2Gi
            requests:
              cpu: 50m
              memory: 512Mi
          volumeMounts:
            - name: data
              mountPath: /bitnami/kafka
          livenessProbe:
            exec:
              command:
                - pgrep
                - '-f'
                - kafka
            initialDelaySeconds: 10
            timeoutSeconds: 5
            periodSeconds: 10
            successThreshold: 1
            failureThreshold: 3
          readinessProbe:
            tcpSocket:
              port: controller
            initialDelaySeconds: 5
            timeoutSeconds: 5
            periodSeconds: 10
            successThreshold: 1
            failureThreshold: 6
          terminationMessagePath: /dev/termination-log
          terminationMessagePolicy: File
          imagePullPolicy: IfNotPresent
      restartPolicy: Always
      terminationGracePeriodSeconds: 30
      dnsPolicy: ClusterFirst
      affinity:
        podAntiAffinity:
          preferredDuringSchedulingIgnoredDuringExecution:
            - weight: 1
              podAffinityTerm:
                labelSelector:
                  # Match on the instance label only: the Pods carry
                  # app.kubernetes.io/name=kafka-controller-N, so a selector
                  # on name=kafka would match no Pod.
                  matchLabels:
                    app.kubernetes.io/instance: app-kafka
                topologyKey: kubernetes.io/hostname
  volumeClaimTemplates:
    - kind: PersistentVolumeClaim
      apiVersion: v1
      metadata:
        name: data
      spec:
        accessModes:
          - ReadWriteOnce
        resources:
          requests:
            storage: 10Gi
        storageClassName: nfs-sc
        volumeMode: Filesystem
  serviceName: kafka-controller-hs
Using the vi editor, create the manifest file kafka-controller-1-sts.yaml with the following content:
kind: StatefulSet
apiVersion: apps/v1
metadata:
  name: kafka-controller-1
  labels:
    app.kubernetes.io/instance: app-kafka
    app.kubernetes.io/name: kafka-controller-1
spec:
  replicas: 1
  selector:
    matchLabels:
      app.kubernetes.io/instance: app-kafka
      app.kubernetes.io/name: kafka-controller-1
  template:
    metadata:
      labels:
        app.kubernetes.io/instance: app-kafka
        app.kubernetes.io/name: kafka-controller-1
    spec:
      containers:
        - name: kafka
          image: 'registry.opsxlab.cn:8443/bitnami/kafka:3.6.2'
          ports:
            # Port names are referenced by the headless Service (targetPort).
            - name: internal
              containerPort: 9092
              protocol: TCP
            - name: controller
              containerPort: 9093
              protocol: TCP
            - name: external
              containerPort: 9094
              protocol: TCP
          env:
            - name: BITNAMI_DEBUG
              value: 'false'
            - name: HOST_IP
              valueFrom:
                fieldRef:
                  apiVersion: v1
                  fieldPath: status.hostIP
            - name: KAFKA_HEAP_OPTS
              value: '-Xmx2048m -Xms1024m'
            - name: KAFKA_KRAFT_CLUSTER_ID
              valueFrom:
                secretKeyRef:
                  name: kafka-kraft-cluster-id
                  key: kraft-cluster-id
            - name: KAFKA_CLIENT_USERS
              value: user1
            - name: KAFKA_CLIENT_PASSWORDS
              valueFrom:
                secretKeyRef:
                  name: kafka-sasl-passwords
                  key: client-passwords
            - name: KAFKA_INTER_BROKER_USER
              value: inter_broker_user
            - name: KAFKA_INTER_BROKER_PASSWORD
              valueFrom:
                secretKeyRef:
                  name: kafka-sasl-passwords
                  key: inter-broker-password
            - name: KAFKA_CONTROLLER_USER
              value: controller_user
            - name: KAFKA_CONTROLLER_PASSWORD
              valueFrom:
                secretKeyRef:
                  name: kafka-sasl-passwords
                  key: controller-password
            - name: KAFKA_CFG_SASL_MECHANISM_CONTROLLER_PROTOCOL
              value: PLAIN
            - name: KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL
              value: PLAIN
            # Per-node value: 0, 1, 2
            - name: KAFKA_CFG_NODE_ID
              value: '1'
            - name: KAFKA_CFG_PROCESS_ROLES
              value: 'controller,broker'
            - name: KAFKA_CFG_CONTROLLER_QUORUM_VOTERS
              value: >-
                0@kafka-controller-0-0.kafka-controller-hs.opsxlab.svc.cluster.local:9093,1@kafka-controller-1-0.kafka-controller-hs.opsxlab.svc.cluster.local:9093,2@kafka-controller-2-0.kafka-controller-hs.opsxlab.svc.cluster.local:9093
            - name: KAFKA_CFG_LISTENERS
              value: 'INTERNAL://:9092,CONTROLLER://:9093,EXTERNAL://:9094'
            # Per-node value: this Pod's DNS name and its own NodePort
            - name: KAFKA_CFG_ADVERTISED_LISTENERS
              value: >-
                INTERNAL://kafka-controller-1-0.kafka-controller-hs.opsxlab.svc.cluster.local:9092,EXTERNAL://192.168.9.121:31212
            - name: KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP
              value: >-
                INTERNAL:SASL_PLAINTEXT,CONTROLLER:SASL_PLAINTEXT,EXTERNAL:SASL_PLAINTEXT
            - name: KAFKA_CFG_CONTROLLER_LISTENER_NAMES
              value: CONTROLLER
            - name: KAFKA_CFG_INTER_BROKER_LISTENER_NAME
              value: INTERNAL
            - name: KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR
              value: '3'
            - name: KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR
              value: '3'
            - name: KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR
              value: '2'
          resources:
            limits:
              cpu: '1'
              memory: 2Gi
            requests:
              cpu: 50m
              memory: 512Mi
          volumeMounts:
            - name: data
              mountPath: /bitnami/kafka
          livenessProbe:
            exec:
              command:
                - pgrep
                - '-f'
                - kafka
            initialDelaySeconds: 10
            timeoutSeconds: 5
            periodSeconds: 10
            successThreshold: 1
            failureThreshold: 3
          readinessProbe:
            tcpSocket:
              port: controller
            initialDelaySeconds: 5
            timeoutSeconds: 5
            periodSeconds: 10
            successThreshold: 1
            failureThreshold: 6
          terminationMessagePath: /dev/termination-log
          terminationMessagePolicy: File
          imagePullPolicy: IfNotPresent
      restartPolicy: Always
      terminationGracePeriodSeconds: 30
      dnsPolicy: ClusterFirst
      affinity:
        podAntiAffinity:
          preferredDuringSchedulingIgnoredDuringExecution:
            - weight: 1
              podAffinityTerm:
                labelSelector:
                  # Match on the instance label only: the Pods carry
                  # app.kubernetes.io/name=kafka-controller-N, so a selector
                  # on name=kafka would match no Pod.
                  matchLabels:
                    app.kubernetes.io/instance: app-kafka
                topologyKey: kubernetes.io/hostname
  volumeClaimTemplates:
    - kind: PersistentVolumeClaim
      apiVersion: v1
      metadata:
        name: data
      spec:
        accessModes:
          - ReadWriteOnce
        resources:
          requests:
            storage: 10Gi
        storageClassName: nfs-sc
        volumeMode: Filesystem
  serviceName: kafka-controller-hs
Using the vi editor, create the manifest file kafka-controller-2-sts.yaml with the following content:
kind: StatefulSet
apiVersion: apps/v1
metadata:
  name: kafka-controller-2
  labels:
    app.kubernetes.io/instance: app-kafka
    app.kubernetes.io/name: kafka-controller-2
spec:
  replicas: 1
  selector:
    matchLabels:
      app.kubernetes.io/instance: app-kafka
      app.kubernetes.io/name: kafka-controller-2
  template:
    metadata:
      labels:
        app.kubernetes.io/instance: app-kafka
        app.kubernetes.io/name: kafka-controller-2
    spec:
      containers:
        - name: kafka
          image: 'registry.opsxlab.cn:8443/bitnami/kafka:3.6.2'
          ports:
            # Port names are referenced by the headless Service (targetPort).
            - name: internal
              containerPort: 9092
              protocol: TCP
            - name: controller
              containerPort: 9093
              protocol: TCP
            - name: external
              containerPort: 9094
              protocol: TCP
          env:
            - name: BITNAMI_DEBUG
              value: 'false'
            - name: HOST_IP
              valueFrom:
                fieldRef:
                  apiVersion: v1
                  fieldPath: status.hostIP
            - name: KAFKA_HEAP_OPTS
              value: '-Xmx2048m -Xms1024m'
            - name: KAFKA_KRAFT_CLUSTER_ID
              valueFrom:
                secretKeyRef:
                  name: kafka-kraft-cluster-id
                  key: kraft-cluster-id
            - name: KAFKA_CLIENT_USERS
              value: user1
            - name: KAFKA_CLIENT_PASSWORDS
              valueFrom:
                secretKeyRef:
                  name: kafka-sasl-passwords
                  key: client-passwords
            - name: KAFKA_INTER_BROKER_USER
              value: inter_broker_user
            - name: KAFKA_INTER_BROKER_PASSWORD
              valueFrom:
                secretKeyRef:
                  name: kafka-sasl-passwords
                  key: inter-broker-password
            - name: KAFKA_CONTROLLER_USER
              value: controller_user
            - name: KAFKA_CONTROLLER_PASSWORD
              valueFrom:
                secretKeyRef:
                  name: kafka-sasl-passwords
                  key: controller-password
            - name: KAFKA_CFG_SASL_MECHANISM_CONTROLLER_PROTOCOL
              value: PLAIN
            - name: KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL
              value: PLAIN
            # Per-node value: 0, 1, 2
            - name: KAFKA_CFG_NODE_ID
              value: '2'
            - name: KAFKA_CFG_PROCESS_ROLES
              value: 'controller,broker'
            - name: KAFKA_CFG_CONTROLLER_QUORUM_VOTERS
              value: >-
                0@kafka-controller-0-0.kafka-controller-hs.opsxlab.svc.cluster.local:9093,1@kafka-controller-1-0.kafka-controller-hs.opsxlab.svc.cluster.local:9093,2@kafka-controller-2-0.kafka-controller-hs.opsxlab.svc.cluster.local:9093
            - name: KAFKA_CFG_LISTENERS
              value: 'INTERNAL://:9092,CONTROLLER://:9093,EXTERNAL://:9094'
            # Per-node value: this Pod's DNS name and its own NodePort
            - name: KAFKA_CFG_ADVERTISED_LISTENERS
              value: >-
                INTERNAL://kafka-controller-2-0.kafka-controller-hs.opsxlab.svc.cluster.local:9092,EXTERNAL://192.168.9.121:31213
            - name: KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP
              value: >-
                INTERNAL:SASL_PLAINTEXT,CONTROLLER:SASL_PLAINTEXT,EXTERNAL:SASL_PLAINTEXT
            - name: KAFKA_CFG_CONTROLLER_LISTENER_NAMES
              value: CONTROLLER
            - name: KAFKA_CFG_INTER_BROKER_LISTENER_NAME
              value: INTERNAL
            - name: KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR
              value: '3'
            - name: KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR
              value: '3'
            - name: KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR
              value: '2'
          resources:
            limits:
              cpu: '1'
              memory: 2Gi
            requests:
              cpu: 50m
              memory: 512Mi
          volumeMounts:
            - name: data
              mountPath: /bitnami/kafka
          livenessProbe:
            exec:
              command:
                - pgrep
                - '-f'
                - kafka
            initialDelaySeconds: 10
            timeoutSeconds: 5
            periodSeconds: 10
            successThreshold: 1
            failureThreshold: 3
          readinessProbe:
            tcpSocket:
              port: controller
            initialDelaySeconds: 5
            timeoutSeconds: 5
            periodSeconds: 10
            successThreshold: 1
            failureThreshold: 6
          terminationMessagePath: /dev/termination-log
          terminationMessagePolicy: File
          imagePullPolicy: IfNotPresent
      restartPolicy: Always
      terminationGracePeriodSeconds: 30
      dnsPolicy: ClusterFirst
      affinity:
        podAntiAffinity:
          preferredDuringSchedulingIgnoredDuringExecution:
            - weight: 1
              podAffinityTerm:
                labelSelector:
                  # Match on the instance label only: the Pods carry
                  # app.kubernetes.io/name=kafka-controller-N, so a selector
                  # on name=kafka would match no Pod.
                  matchLabels:
                    app.kubernetes.io/instance: app-kafka
                topologyKey: kubernetes.io/hostname
  volumeClaimTemplates:
    - kind: PersistentVolumeClaim
      apiVersion: v1
      metadata:
        name: data
      spec:
        accessModes:
          - ReadWriteOnce
        resources:
          requests:
            storage: 10Gi
        storageClassName: nfs-sc
        volumeMode: Filesystem
  serviceName: kafka-controller-hs
Run the following commands to create the resources.
kubectl apply -f kafka-controller-0-sts.yaml -n opsxlab
kubectl apply -f kafka-controller-1-sts.yaml -n opsxlab
kubectl apply -f kafka-controller-2-sts.yaml -n opsxlab
Run the following command to check the result (the first creation is relatively slow).
$ kubectl get sts,pod -n opsxlab
NAME READY AGE
statefulset.apps/kafka-controller-0 1/1 25s
statefulset.apps/kafka-controller-1 1/1 25s
statefulset.apps/kafka-controller-2 1/1 24s
NAME READY STATUS RESTARTS AGE
pod/kafka-controller-0-0 1/1 Running 0 24s
pod/kafka-controller-1-0 1/1 Running 0 24s
pod/kafka-controller-2-0 1/1 Running 0 23s
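Because the first start can take a while, polling by hand gets tedious. A minimal sketch of waiting for all three StatefulSets with kubectl rollout status follows. The function name wait_for_kafka and the 300-second timeout are assumptions for illustration.

```shell
# Hypothetical helper: block until each single-replica StatefulSet is ready.
# `kubectl rollout status` returns non-zero if the timeout expires first.
wait_for_kafka() {
  for n in 0 1 2; do
    kubectl rollout status "statefulset/kafka-controller-$n" \
      --namespace opsxlab --timeout=300s || return 1
  done
}
```

A typical use would be `wait_for_kafka && kubectl get sts,pod -n opsxlab`, which prints the final state only once every node reports ready.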
Verify the availability of the Kafka service from inside and from outside the k8s cluster.
kubectl run opsxlab-kafka-client --restart='Never' --image registry.opsxlab.cn:8443/bitnami/kafka:3.6.2 --namespace opsxlab --command -- sleep infinity
cat << EOF > /tmp/client.properties
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="user1" password="PleaseChangeMe";
EOF
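The heredoc above hardcodes the demo password. As an alternative, here is a minimal sketch that reads the client password back from the Secret created earlier and renders the same file. The helper names client_password and render_client_properties are hypothetical; the Secret name, key, user, and SASL settings are the ones used in this article.

```shell
# Hypothetical helper: read the client password from the Secret and decode it.
client_password() {
  kubectl get secret kafka-sasl-passwords --namespace opsxlab \
    -o jsonpath='{.data.client-passwords}' | base64 -d
}

# Render client.properties for user $1 with password $2 to stdout.
render_client_properties() {
  cat <<EOF
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="$1" password="$2";
EOF
}
```

Usage would look like `render_client_properties user1 "$(client_password)" > /tmp/client.properties`, which keeps the password out of shell history and scripts.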
Copy the file into the Pod opsxlab-kafka-client:
kubectl cp --namespace opsxlab /tmp/client.properties opsxlab-kafka-client:/tmp/client.properties
kubectl exec --tty -i opsxlab-kafka-client --namespace opsxlab -- bash
kafka-topics.sh --bootstrap-server kafka-controller-0-0.kafka-controller-hs.opsxlab.svc.cluster.local:9092 --create --topic test-topic --partitions 3 --replication-factor 3 --command-config /tmp/client.properties
$ kafka-topics.sh --bootstrap-server kafka-controller-0-0.kafka-controller-hs.opsxlab.svc.cluster.local:9092 --topic test-topic --describe --command-config /tmp/client.properties
Topic: test-topic TopicId: yNWQQ6yKSBeLmvVUFf2IVw PartitionCount: 3 ReplicationFactor: 3 Configs:
Topic: test-topic Partition: 0 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
Topic: test-topic Partition: 1 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
Topic: test-topic Partition: 2 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
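In the output above, every partition lists all three nodes in both Replicas and Isr, which is the healthy state. The sketch below automates that comparison by parsing the describe output with awk. The function name under_replicated is hypothetical, and the degraded sample line is made up for illustration.

```shell
# Hypothetical helper: read `kafka-topics.sh --describe` output on stdin and
# print every partition whose ISR has fewer members than its replica list.
under_replicated() {
  awk '
    /Partition: / {
      p = ""; replicas = ""; isr = ""
      for (i = 1; i < NF; i++) {
        if ($i == "Partition:") p = $(i + 1)
        if ($i == "Replicas:")  replicas = $(i + 1)
        if ($i == "Isr:")       isr = $(i + 1)
      }
      if (split(isr, a, ",") < split(replicas, b, ","))
        print "partition " p ": isr=" isr " replicas=" replicas
    }
  '
}

# Example input: partition 0 is healthy, partition 1 is a made-up degraded line.
under_replicated <<'EOF'
Topic: test-topic Partition: 0 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
Topic: test-topic Partition: 1 Leader: 1 Replicas: 1,2,0 Isr: 1,2
EOF
```

On the real output from this article the function prints nothing. kafka-topics.sh also accepts --under-replicated-partitions together with --describe, which applies a similar filter on the Kafka side.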
kafka-console-producer.sh \
--broker-list kafka-controller-0-0.kafka-controller-hs.opsxlab.svc.cluster.local:9092 \
--topic test-topic --producer.config /tmp/client.properties
Open another terminal, enter the client Pod again with kubectl exec, and then run the following command.
kafka-console-consumer.sh \
--bootstrap-server kafka-controller-0-0.kafka-controller-hs.opsxlab.svc.cluster.local:9092 \
--topic test-topic \
--from-beginning --consumer.config /tmp/client.properties
Type any test data on the producer side and check that the consumer side receives it correctly.
Producer side:
I have no name!@opsxlab-kafka-client:/$ kafka-console-producer.sh \
--broker-list kafka-controller-0-0.kafka-controller-hs.opsxlab.svc.cluster.local:9092 \
--topic test-topic --producer.config /tmp/client.properties
>cluster kafka test 1
>cluster kafka test 2
>cluster kafka test 3
Consumer side:
I have no name!@opsxlab-kafka-client:/$ kafka-console-consumer.sh \
--bootstrap-server kafka-controller-0-0.kafka-controller-hs.opsxlab.svc.cluster.local:9092 \
--topic test-topic \
--from-beginning --consumer.config /tmp/client.properties
cluster kafka test 1
cluster kafka test 2
cluster kafka test 3
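To test Kafka's availability from outside the k8s cluster more rigorously, I took a machine outside the cluster and installed a JDK and Kafka on it. The JDK is openjdk installed through Yum; Kafka is the official 3.9.0 binary package, the latest release at the time of writing.
In practice you can also use a Docker image, or create another Pod on the k8s cluster, and have the test connect to a k8s node's host IP and NodePort.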
# Install the JDK
yum install java-1.8.0-openjdk
# Download Kafka (the directory in the URL must match the 3.9.0 package version)
cd /srv
wget https://downloads.apache.org/kafka/3.9.0/kafka_2.13-3.9.0.tgz
# Extract the archive
tar xvf kafka_2.13-3.9.0.tgz
cd /srv/kafka_2.13-3.9.0/bin
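This article uses the IP of a single master node as the Kafka NodePort address. In real deployments, use several worker nodes instead, with each Pod mapped to one worker node IP.
The tests below use 192.168.9.121:31211 as the broker server address.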
cat << EOF > /tmp/client.properties
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="user1" password="PleaseChangeMe";
EOF
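As in the in-cluster test, open two terminals and run the producer and consumer scripts. Run the commands below to verify (details skipped; the results are shown directly).
External producer side: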
$ ./kafka-console-producer.sh --broker-list 192.168.9.121:31211 --topic test-topic --producer.config /tmp/client.properties
>external kafka test 10
>external kafka test 20
>external kafka test 30
External consumer side:
$ ./kafka-console-consumer.sh --bootstrap-server 192.168.9.121:31211 --topic test-topic --from-beginning --consumer.config /tmp/client.properties
external kafka test 10
external kafka test 20
external kafka test 30
cluster kafka test 1
cluster kafka test 2
cluster kafka test 3
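Note: the consumer outside the k8s cluster receives all the data, including the messages produced during the in-cluster test.
In-cluster consumer side: the consumer inside the cluster likewise receives the data produced by the external producer.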
I have no name!@opsxlab-kafka-client:/$ kafka-console-consumer.sh \
--bootstrap-server kafka-controller-0-0.kafka-controller-hs.opsxlab.svc.cluster.local:9092 \
--topic test-topic \
--from-beginning --consumer.config /tmp/client.properties
cluster kafka test 1
cluster kafka test 2
cluster kafka test 3
external kafka test 10
external kafka test 20
external kafka test 30
./kafka-topics.sh --bootstrap-server 192.168.9.121:31211 --delete --topic test-topic --command-config /tmp/client.properties
./kafka-topics.sh --bootstrap-server 192.168.9.121:31211 --list --command-config /tmp/client.properties
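That is everything I have to share today. The topic of the next article is not decided yet, so stay tuned for a surprise.
If you enjoyed this article, please share, bookmark, like, and comment. Follow @运维有术 to catch more articles as they come out.
You are welcome to join 「知识星球|运维有术」 for more hands-on skills in KubeSphere, Kubernetes, cloud-native operations, automated operations, AI large models, and more. Throughout your operations career, I will be in the passenger seat beside you.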
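Copyright notice
Original-work statement: this article is published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.
In case of infringement, contact cloudcommunity@tencent.com for removal.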