笔记 | 本实验假设从边缘到流处理已完成。如果您还没有,请让您的讲师为您设置集群状态。 |
---|
在本次实验中,您将使用 Streams Replication Manager (SRM) 跨集群复制 Kafka 主题。
本实验中的实验将需要两个集群,以便我们可以在它们之间配置复制。如果您的讲师为您分配了两个集群,您可以自己执行所有配置。否则,请与另一个参加过的实验配对,并共同配置您的各个集群之间的复制。
我们将在整个实验中将这两个集群称为集群 A和集群 B ,并将在整个练习中分别使用别名cluster_a和cluster_b,可以更改这些别名以满足您的需要/偏好(例如,您可以使用nycand和paris,只要您在所有练习中保持一致性)。
虽然一个 SRM 集群可以双向复制,但我们将为这种类型的复制实施最佳实践,其中包括远程读取和本地写入。因此,集群 A 中的 SRM 会将消息从集群 B 复制到集群 A,反之亦然。
一些实验必须在两个集群上执行,而其他实验只适用于一个特定的集群。在每个实验开始时,我们将指定它们适用于哪些集群。
在这一系列实验中,我们将安装和配置 Streams Replication Manager (SRM) 服务,以在两个 Kafka 集群之间复制数据和配置。
Streams Replication Manager 由三个角色组成:
我们还将了解如何配置 Streams Messaging Manager (SMM) 服务来监控在两个集群之间配置的复制。
我们的目标是在实验结束时实现以下双向复制架构:
笔记 | 在两个集群 上运行 |
---|
Streams Replication Manager (SRM) 可以安装在由 Cloudera Manager 管理的现有集群上。为此,您需要使用 Kafka 凭据定义外部集群,将 SRM 添加到集群,并配置与集群、复制和角色目标相关的许多必需属性。
Kafka Credentials选项卡使您能够创建、配置和管理 Kafka 凭证。Kafka 凭证是包含 SRM 与集群建立连接所需的连接属性的项目。您可以将 Kafka 凭证视为单个集群的定义。在此选项卡上,您将为参与复制过程的每个外部集群创建一个凭证。
外部集群的安全配置决定了您必须设置哪些可用属性。至少必须设置以下属性:
Cluster A配置:
Name=cluster_b
Bootstrap servers=10.0.211.178:9092
Security protocol=PLAINTEXT
Cluster B配置:
Name=cluster_a
Bootstrap servers=10.0.211.167:9092
Security protocol=PLAINTEXT
笔记 | 在两个集群 上运行 |
---|
笔记 | 将以下值中的CLUSTER_A_FQDN和CLUSTER_B_FQDN占位符分别替换为集群 A 和 B 的完全限定域名。您可以在实验登录页面中为您的集群使用完全限定的主机名。 |
---|
Property | value |
---|---|
External Kafka Accounts | cluster_b |
Streams Replication Manager Cluster alias | cluster_a, cluster_b |
Streams Replication Manager’s Replication Configs(click the “+” button to separately add each property on the right) | cluster_b->cluster_a.enabled=true |
replication.factor=1 | |
heartbeats.topic.replication.factor=1 | |
checkpoints.topic.replication.factor=1 | |
offset-syncs.topic.replication.factor=1 | |
offset.storage.replication.factor=1 | |
config.storage.replication.factor=1 | |
status.storage.replication.factor=1 | |
metrics.topic.replication.factor=1 | |
Streams Replication Manager Driver Target Cluster | cluster_b,cluster_a |
Streams Replication Manager Service Target Cluster | cluster_a |
Streams Replication Manager Co-located Kafka Cluster Alias | cluster_a |
Property | Value |
---|---|
External Kafka Accounts | cluster_a |
Streams Replication Manager Cluster alias | cluster_a, cluster_b |
Streams Replication Manager’s Replication Configs(click the “+” button to separately add each property on the right) | cluster_a->cluster_b.enabled=true |
replication.factor=1 | |
heartbeats.topic.replication.factor=1 | |
checkpoints.topic.replication.factor=1 | |
offset-syncs.topic.replication.factor=1 | |
offset.storage.replication.factor=1 | |
config.storage.replication.factor=1 | |
status.storage.replication.factor=1 | |
metrics.topic.replication.factor=1 | |
Streams Replication Manager Driver Target Cluster | cluster_b,cluster_a |
Streams Replication Manager Service Target Cluster | cluster_b |
Streams Replication Manager Co-located Kafka Cluster Alias | cluster_b |
您现在拥有一个工作的 Streams Replication Manager 服务!
笔记 | 在两个集群 上运行 |
---|
SRM 服务配置了一些通常适用于生产环境的默认刷新间隔。但是,对于我们的实验,我们希望刷新间隔更短,以便我们可以运行测试并快速查看结果。在继续之前,让我们重新配置这些间隔。
Property | Value |
---|---|
Refresh Topics Interval Seconds | 30 seconds |
Refresh Groups Interval Seconds | 30 seconds |
Sync Topic Configs Interval Seconds | 30 seconds |
笔记 | 在两个集群 上运行 |
---|
在本实验中,我们将配置 Streams Messaging Manager (SMM) 来监控两个集群之间的 Kafka 复制。
Property | value |
---|---|
Configure Streams Replication Manager | Checked |
Streams Replication Manager Rest Protocol | http |
)。您应该能够看到两个集群上复制的监控页面:
在集群 A 上:
在集群 B 上:
请注意,到目前为止,只有heartbeats主题被复制。在下一个实验中,我们将为复制添加更多主题。
提示 | 如果复制在任何时间点显示为INACTIVE,请等待几秒钟并刷新屏幕。 |
---|
笔记 | 在步骤说明中指示的集群上运行 |
---|
在本实验中,我们将启用主动-主动复制,将集群 A 中产生的消息复制到集群 B,将集群 B 中产生的消息复制到集群 A。
SRM 有一个主题的白名单和黑名单。仅复制在白名单中但不在黑名单中的主题。管理员可以有选择地控制要复制的主题,但可以管理这些列表。这同样适用于消费者组偏移复制。
Topic Name: global_iot
Partitions: 5
Availability: LOW (*)
Cleanup Policy: delete
(*) 上面的 LOW 可用性设置是必需的,因为我们的集群只有一个节点。
现在,按照以下步骤启用从集群A 到 B 的消息复制。这些步骤应仅在集群 B 中执行。
export SECURESTOREPASS=supersecret1
export security_protocol=PLAINTEXT
sudo -E srm-control topics \
--source cluster_a \
--target cluster_b \
--add "global_iot"
其中:supersecret1为实验3中配置的密码。
运行以下命令确认白名单修改正确:
sudo -E srm-control topics \
--source cluster_a \
--target cluster_b \
--list
您应该看到以下输出:
Current whitelist:
global_iot
Current blacklist:
Done.
提示 | 白名单也可以指定为正则表达式,这在您需要通过模式匹配选择主题时很有帮助。 |
---|
重要提示:确保主题名称中没有前导或尾随空格。
即使统计数据尚未更新,请尝试单击放大镜图标查看数据。即使统计数据尚未刷新,这通常也会立即显示出来。
) 并检查吞吐量和延迟指标,以确保一切都按预期工作。您应该期望吞吐量大于零和毫秒级的延迟。
重要 | 请确保已在两个方向上设置了 SRM 复制,然后再继续进行下一个实验。在故障转移实验中,我们将练习消费者故障转移和故障恢复,这需要双向复制才能正常工作。 |
---|
笔记 | 在步骤说明中指示的集群上运行 |
---|
SRM 的一大特色是它能够将消费者组偏移量从一个集群转换到另一个集群,这样消费者就可以切换到远程集群而不会丢失或复制消息。SRM 不断地将消费者组偏移量复制到远程集群,以便即使在源集群离线时也可以执行转换。
我们可以使用白名单/黑名单机制管理 SRM 为其复制偏移量的消费者组,类似于对主题所做的。
在本实验中,我们将为global_iot主题配置消费者组偏移复制,并执行消费者故障转移到远程集群。为了让它更有趣,我们将对两个消费者进行故障转移,一个使用正确的方法进行偏移转换,另一个不小心,这样我们就可以分析差异。
笔记 | 在整个实验的练习中,您将在 SSH 会话上运行 Kafka 消费者。请在同一个会话中运行所有的练习命令,这将使您更容易验证和比较故障转移和故障恢复后的结果。 |
---|
重要的 | 请注意,下面的命令是srm-control groups,与我们之前运行的srm-control topics不同。请注意实验示例中的确切命令,因为它们之间可能存在细微差别。 |
---|
export SECURESTOREPASS=supersecret1
export security_protocol=PLAINTEXT
sudo -E srm-control groups \
--source cluster_a \
--target cluster_b \
--add ".*"
sudo -E srm-control groups \
--source cluster_a \
--target cluster_b \
--list
Current whitelist:
.*
Current blacklist:
Done.
重要 | 请注意,白名单消费者组的偏移量复制仅针对正在复制的主题(根据主题白名单)。由于我们只将主题global_iot列入白名单,因此即使消费者从未列入白名单的其他主题中读取,也只会复制该主题的偏移量。 |
---|
这些设置允许连接到集群 A 的消费者成功地故障转移到集群 B。但是,在故障转移后的某个时间,我们可能希望让消费者故障恢复到集群 A。为此,我们必须继续复制消费者偏移量故障转移后从集群 B 到集群 A。
以下配置启用该复制:
export SECURESTOREPASS=supersecret1
export security_protocol=PLAINTEXT
sudo -E srm-control topics \
--source cluster_b \
--target cluster_a \
--add "cluster_a.global_iot"
sudo -E srm-control groups \
--source cluster_b \
--target cluster_a \
--add ".*"
sudo -E srm-control topics \
--source cluster_b \
--target cluster_a \
--list
sudo -E srm-control groups \
--source cluster_b \
--target cluster_a \
--list
# topics:
Current whitelist:
cluster_a.global_iot
Current blacklist:
Done.
# groups:
Current whitelist:
.*
Current blacklist:
Done.
CLUSTER_A_HOST=<CLUSTER_A_HOST_FQDN>
kafka-console-consumer \
--bootstrap-server $CLUSTER_A_HOST:9092 \
--whitelist ".*global_iot" \
--group good.failover | tee good.failover.before
重要 | 请注意,在上面的命令中,我们指定了 Kafka 客户端主题白名单,而不是显式提供主题名称。这使消费者可以更轻松地进行故障转移和恢复,而无需重新配置。由于 SRM 为复制的主题添加了前缀,因此白名单选项使我们能够提供一个正则表达式来匹配原始主题和复制的主题。不要将这个 Kafka 客户端主题白名单与我们之前讨论的 SRM 主题白名单混淆;它们用于不同的目的。 |
---|
CLUSTER_A_HOST=<CLUSTER_A_HOST_FQDN>
kafka-console-consumer \
--bootstrap-server $CLUSTER_A_HOST:9092 \
--whitelist ".*global_iot" \
--group bad.failover | tee bad.failover.before
])。请注意,我们使用的两个消费者组的偏移量现在被 SRM 复制:
CLUSTER_B_HOST=<CLUSTER_B_HOST_FQDN>
kafka-console-consumer \
--bootstrap-server $CLUSTER_B_HOST:9092 \
--whitelist ".*global_iot" \
--group bad.failover | tee bad.failover.after
为了检查故障转移是否正确发生,我们要计算故障转移前读取的最大时间戳与故障转移后读取的最小时间戳之间的差距。如果没有消息丢失,我们应该看到它们之间的间隔不超过 1 秒。+ 您可以手动验证这一点,也可以运行以下命令,这将为您计算差距:
last_msg_before_failover=$(grep -o "[0-9]\{10,\}" bad.failover.before | sort | tail -1)
first_msg_after_failover=$(grep -o "[0-9]\{10,\}" bad.failover.after | sort | head -1)
echo "Gap = $(echo "($first_msg_after_failover-$last_msg_before_failover)/1000000" | bc) second(s)"
Gap = -5 second(s)
笔记 | 在某些情况下,您可能会得到上面计算的差距的负值。对于额外的布朗尼积分:在哪些情况下可能会出现负值,这对消费者意味着什么? |
---|
export SECURESTOREPASS=supersecret1
export security_protocol=PLAINTEXT
sudo -E srm-control offsets \
--source cluster_a \
--target cluster_b \
--group good.failover \
--export > good.failover.offsets
cat good.failover.offsets
good.failover.offsets包含good.failover消费者组在源集群上触及的所有分区的所有转换偏移量。
CLUSTER_B_HOST=<CLUSTER_B_HOST_FQDN>
kafka-consumer-groups \
--bootstrap-server $CLUSTER_B_HOST:9092 \
--reset-offsets \
--group good.failover \
--from-file good.failover.offsets \
--execute
您应该会看到如下输出:
GROUP TOPIC PARTITION NEW-OFFSET
good.failover cluster_a.global_iot 3 831
good.failover cluster_a.global_iot 4 879
good.failover cluster_a.global_iot 0 883
good.failover cluster_a.global_iot 1 853
good.failover cluster_a.global_iot 2 892
CLUSTER_B_HOST=<CLUSTER_B_HOST_FQDN>
kafka-console-consumer \
--bootstrap-server $CLUSTER_B_HOST:9092 \
--whitelist ".*global_iot" \
--group good.failover | tee good.failover.after
last_msg_before_failover=$(grep -o "[0-9]\{10,\}" good.failover.before | sort | tail -1)
first_msg_after_failover=$(grep -o "[0-9]\{10,\}" good.failover.after | sort | head -1)
echo "Gap = $(echo "($first_msg_after_failover-$last_msg_before_failover)/1000000" | bc) second(s)"
Gap = 1 second(s)
笔记 | 消息之间的间隔不完全是1 秒。有时我们可以看到相邻消息之间有近 2 秒的间隔。 |
---|
export security_protocol=PLAINTEXT
sudo -E srm-control offsets \
--source cluster_b \
--target cluster_a \
--group good.failover \
--export > good.failback.offsets
cat good.failback.offsets
good.failback.offsets包含消费者组good.failover在集群 B 上运行时触及的所有分区的所有转换偏移量。
CLUSTER_A_HOST=<CLUSTER_A_HOST_FQDN>
kafka-consumer-groups \
--bootstrap-server $CLUSTER_A_HOST:9092 \
--reset-offsets \
--group good.failover \
--from-file good.failback.offsets \
--execute
您应该会看到如下输出:
GROUP TOPIC PARTITION NEW-OFFSET
good.failover global_iot 0 15525
good.failover global_iot 1 15656
good.failover global_iot 2 15587
good.failover global_iot 3 15534
good.failover global_iot 4 15623
CLUSTER_A_HOST=<CLUSTER_A_HOST_FQDN>
kafka-console-consumer \
--bootstrap-server $CLUSTER_A_HOST:9092 \
--whitelist ".*global_iot" \
--group good.failover | tee good.failover.after_failback
last_msg_before_failback=$(grep -o "[0-9]\{10,\}" good.failover.after | sort | tail -1)
first_msg_after_failback=$(grep -o "[0-9]\{10,\}" good.failover.after_failback | sort | head -1)
echo "Gap = $(echo "($first_msg_after_failover-$last_msg_before_failover)/1000000" | bc) second(s)"
Gap = 1 second(s)
笔记 | 消息之间的间隔不完全是1 秒。有时我们可以看到相邻消息之间有近 2 秒的间隔,这是正常的。 |
---|
在本实验中,我们配置了以下 SRM 白名单:
Direction | Type | Whitelist |
---|---|---|
A → B | Topics | global_iot |
Groups | .* | |
B → A | Topics | cluster_a.global_iot |
Groups | .* |
这些白名单实现了单方向(A → B)的数据复制,同时在两个方向(A ←→ B)复制消费者组偏移量。如果我们还想启用双向数据复制,我们必须修改白名单,如下所示:
Direction | Type | Whitelist |
---|---|---|
A → B | Topics | global_iotcluster_b.global_iot |
Groups | .* | |
B → A | Topics | global_iotcluster_a.global_iot |
Groups | .* |
这似乎过于复杂,可以通过使用正则表达式来简化。由于所有列入白名单的主题都以 结尾global_iot,我们可以将上面的白名单替换为以下集合,它是对称的且更易于维护:
Direction | Type | Whitelist |
---|---|---|
A → B | Topics | .*global_iot |
Groups | .* | |
B → A | Topics | .*global_iot |
Groups | .* |