前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Edge2AI之流复制

Edge2AI之流复制

作者头像
大数据杂货铺
发布2022-04-27 17:01:58
7900
发布2022-04-27 17:01:58
举报
文章被收录于专栏:大数据杂货铺

笔记

本实验假设从边缘到流处理已完成。如果您还没有,请让您的讲师为您设置集群状态。

在本次实验中,您将使用 Streams Replication Manager (SRM) 跨集群复制 Kafka 主题。

本实验中的实验将需要两个集群,以便我们可以在它们之间配置复制。如果您的讲师为您分配了两个集群,您可以自己执行所有配置。否则,请与另一个参加过的实验配对,并共同配置您的各个集群之间的复制。

我们将在整个实验中将这两个集群称为集群 A集群 B ,并将在整个练习中分别使用别名cluster_a和cluster_b,可以更改这些别名以满足您的需要/偏好(例如,您可以使用nycand和paris,只要您在所有练习中保持一致性)。

虽然一个 SRM 集群可以双向复制,但我们将为这种类型的复制实施最佳实践,其中包括远程读取和本地写入。因此,集群 A 中的 SRM 会将消息从集群 B 复制到集群 A,反之亦然。

一些实验必须在两个集群上执行,而其他实验只适用于一个特定的集群。在每个实验开始时,我们将指定它们适用于哪些集群。

概述

在这一系列实验中,我们将安装和配置 Streams Replication Manager (SRM) 服务,以在两个 Kafka 集群之间复制数据和配置。

Streams Replication Manager 由三个角色组成:

  • Streams Replication Manager Driver 角色:该角色负责连接到指定的集群并在它们之间执行复制。该驱动程序可以安装在一台或多台主机上。
  • Streams Replication Manager 服务角色:该角色由 REST API 和 Kafka Streams 应用程序组成,用于聚合和公开集群、主题和消费者组指标。该服务可以安装在或更多的主机上。
  • Streams Replication Manager 网关角色:此角色可用于在没有运行驱动程序或服务角色的主机上部署 SRM 客户端配置。

我们还将了解如何配置 Streams Messaging Manager (SMM) 服务来监控在两个集群之间配置的复制。

我们的目标是在实验结束时实现以下双向复制架构:

实验总结

  • 实验1 – 配置Kafka外部账户
  • 实验 2 - 安装 Streams Replication Manager (SRM) 服务
  • 实验 3 - 调整 SRM 服务
  • 实验 4 - 配置复制监控
  • 实验 5 - 使用 SRM 启用 Kafka 复制
  • 实验 6 - 故障转移消费者

实验1:配置Kafka外部账户

笔记

在两个集群 上运行

Streams Replication Manager (SRM) 可以安装在由 Cloudera Manager 管理的现有集群上。为此,您需要使用 Kafka 凭据定义外部集群,将 SRM 添加到集群,并配置与集群、复制和角色目标相关的许多必需属性。

  1. 在 Cloudera Manager 中,转到 管理>>外部帐户。
  2. 转到Kafka 凭据选项卡。

Kafka Credentials选项卡使您能够创建、配置和管理 Kafka 凭证。Kafka 凭证是包含 SRM 与集群建立连接所需的连接属性的项目。您可以将 Kafka 凭证视为单个集群的定义。在此选项卡上,您将为参与复制过程的每个外部集群创建一个凭证。

  1. 为两个集群的外部集群创建和配置 Kafka 凭证:

外部集群的安全配置决定了您必须设置哪些可用属性。至少必须设置以下属性:

  • Name
  • Bootstrap servers
  • Security protocol

Cluster A配置:

代码语言:javascript
复制
Name=cluster_b
Bootstrap servers=10.0.211.178:9092
Security protocol=PLAINTEXT

Cluster B配置:

代码语言:javascript
复制
Name=cluster_a
Bootstrap servers=10.0.211.167:9092
Security protocol=PLAINTEXT
  1. 点击添加,如果凭证创建成功,页面上会出现与您指定的 Kafka 凭证对应的新条目。

实验 2 - 安装Streams Replication Manager (SRM)服务

笔记

在两个集群 上运行

  1. 在 Cloudera Manager 控制台上,单击左上角的 Cloudera 徽标以确保您位于主页上。
  2. 单击OneNodeCluster名称右侧的“三点”菜单,然后选择添加服务
  1. 选择Streams Replication Manager并单击继续
  2. Select Dependencies页面上,选择包含HDFS、Kafka 和 ZooKeeper的行,然后单击Continue
  1. 分配角色页面上,将角色都分配到该节点上并单击继续
  1. Review Changes页面上设置以下属性:

笔记

将以下值中的CLUSTER_A_FQDN和CLUSTER_B_FQDN占位符分别替换为集群 A 和 B 的完全限定域名。您可以在实验登录页面中为您的集群使用完全限定的主机名。

  • 仅在集群 A上:

Property

value

External Kafka Accounts

cluster_b

Streams Replication Manager Cluster alias

cluster_a, cluster_b

Streams Replication Manager’s Replication Configs(click the “+” button to separately add each property on the right)

cluster_b->cluster_a.enabled=true

replication.factor=1

heartbeats.topic.replication.factor=1

checkpoints.topic.replication.factor=1

offset-syncs.topic.replication.factor=1

offset.storage.replication.factor=1

config.storage.replication.factor=1

status.storage.replication.factor=1

metrics.topic.replication.factor=1

Streams Replication Manager Driver Target Cluster

cluster_b,cluster_a

Streams Replication Manager Service Target Cluster

cluster_a

Streams Replication Manager Co-located Kafka Cluster Alias

cluster_a

  • 仅在集群 B上:

Property

Value

External Kafka Accounts

cluster_a

Streams Replication Manager Cluster alias

cluster_a, cluster_b

Streams Replication Manager’s Replication Configs(click the “+” button to separately add each property on the right)

cluster_a->cluster_b.enabled=true

replication.factor=1

heartbeats.topic.replication.factor=1

checkpoints.topic.replication.factor=1

offset-syncs.topic.replication.factor=1

offset.storage.replication.factor=1

config.storage.replication.factor=1

status.storage.replication.factor=1

metrics.topic.replication.factor=1

Streams Replication Manager Driver Target Cluster

cluster_b,cluster_a

Streams Replication Manager Service Target Cluster

cluster_b

Streams Replication Manager Co-located Kafka Cluster Alias

cluster_b

  1. 正确设置所有属性后单击继续
  2. 等待第一个运行命令完成,然后单击继续
  3. 点击完成

您现在拥有一个工作的 Streams Replication Manager 服务!

实验 3 - 调整Streams Replication Manager (SRM)服务

笔记

在两个集群 上运行

SRM 服务配置了一些通常适用于生产环境的默认刷新间隔。但是,对于我们的实验,我们希望刷新间隔更短,以便我们可以运行测试并快速查看结果。在继续之前,让我们重新配置这些间隔。

  1. 在 Cloudera Manager 控制台上,转到Clusters > Streams Replication Manager > Configuration
  2. 在搜索框中搜索“password”,设置SRM Client's Secure Storage Password的属性值为supersecret1
  1. 在搜索框中,键入“ interval ”以过滤配置属性
  2. 设置以下属性:

Property

Value

Refresh Topics Interval Seconds

30 seconds

Refresh Groups Interval Seconds

30 seconds

Sync Topic Configs Interval Seconds

30 seconds

  1. 点击保存更改
  2. 单击操作 > 部署客户端配置并等待客户端配置部署完成。
  3. 单击操作 > 重新启动并等待服务重新启动完成。

实验 4 - 配置复制监控

笔记

在两个集群 上运行

在本实验中,我们将配置 Streams Messaging Manager (SMM) 来监控两个集群之间的 Kafka 复制。

  1. 在 Cloudera Manager 控制台上,转到Clusters > SMM > Configuration
  2. 在搜索框中,键入“replica”以过滤配置属性
  3. 为服务设置以下属性:

Property

value

Configure Streams Replication Manager

Checked

Streams Replication Manager Rest Protocol

http

  1. 点击保存更改
  2. 单击操作 > 重新启动并等待服务重新启动完成。
  3. 转到 SMM Web UI(Clusters > SMM > Streams Messaging Manager Web UI),然后单击Cluster Replications图标 (

)。您应该能够看到两个集群上复制的监控页面:

在集群 A 上:

在集群 B 上:

请注意,到目前为止,只有heartbeats主题被复制。在下一个实验中,我们将为复制添加更多主题。

提示

如果复制在任何时间点显示为INACTIVE,请等待几秒钟并刷新屏幕。

实验 5 - 使用 Streams Replication Manager (SRM) 启用 Kafka 复制

笔记

在步骤说明中指示的集群上运行

在本实验中,我们将启用主动-主动复制,将集群 A 中产生的消息复制到集群 B,将集群 B 中产生的消息复制到集群 A。

SRM 有一个主题的白名单和黑名单。仅复制在白名单中但不在黑名单中的主题。管理员可以有选择地控制要复制的主题,但可以管理这些列表。这同样适用于消费者组偏移复制。

  1. 集群 A:为了准备本实验中的活动,我们首先使用 SMM 创建一个新的 Kafka 主题。在 SMM Web UI 上,单击左侧菜单上的主题图标,然后单击添加新按钮,并添加以下属性:
代码语言:javascript
复制
Topic Name:     global_iot
Partitions:     5
Availability:   LOW (*)
Cleanup Policy: delete

(*) 上面的 LOW 可用性设置是必需的,因为我们的集群只有一个节点。

  1. 集群 A:点击保存创建主题

现在,按照以下步骤启用从集群A 到 B 的消息复制。这些步骤应仅在集群 B 中执行。

  1. 集群 B:要启动主题的复制,我们必须在 SRM 中将它们列入白名单。SRM 支持将具有特定模式的主题列入白名单/黑名单的正则表达式。在我们的例子中,我们只想复制以关键字开头的主题global。为此,通过 SSH 连接到集群 B主机并运行以下命令:
代码语言:javascript
复制
export SECURESTOREPASS=supersecret1
export security_protocol=PLAINTEXT
sudo -E srm-control topics \
--source cluster_a \
--target cluster_b \
--add "global_iot"

其中:supersecret1为实验3中配置的密码。

运行以下命令确认白名单修改正确:

代码语言:javascript
复制
sudo -E srm-control topics \
--source cluster_a \
--target cluster_b \
--list

您应该看到以下输出:

代码语言:javascript
复制
Current whitelist:
global_iot
Current blacklist:
Done.

提示

白名单也可以指定为正则表达式,这在您需要通过模式匹配选择主题时很有帮助。

  1. Cluster B:转到 SMM Web UI 并检查 Cluster Replications 页面。您现在应该看到与白名单匹配的所有主题都出现在复制中:
  1. 集群 B:单击主题图标并搜索包含该字符串的所有主题iot。您应该会看到一个名为cluster_a.global_iot 的新主题。由于我们还没有为源主题生成任何数据,因此复制的主题也是空的。
  1. 集群 A:为了检查复制是否正常工作,我们需要开始为集群A中的Kafka 主题global_iot生成数据。最简单的方法是对我们现有的 NiFi 流程进行一些小改动:
  2. 转到集群 A上的 NiFi 画布
  3. 进入Process Sensor Data 处理组
  4. 选择PublishKafkaRecord处理器并复制并粘贴它。这将在画布上创建一个新的处理器。
  5. 双击新处理器打开配置
  6. SETTINGS选项卡上,将Name属性更改为“Publish to Kafka topic: global_iot
  7. 仍在设置选项卡上,检查自动终止关系部分中的成功属性
  8. 属性选项卡上,将主题名称属性更改为global_iot。

重要提示:确保主题名称中没有前导或尾随空格。

  1. 单击应用
  2. 将“Set Schema Name”处理器连接到新的 Kafka 处理器。
  3. 将新的 Kafka 处理器连接到与原始处理器连接的同一“failure”漏斗。
  4. 当连接对话框打开时,检查 failure关系并单击ADD
  1. 启动新处理器。
  2. 现在,您将对事件iot和global_iot主题进行双重摄取。您的流程现在应该如下所示:
  1. 集群 B:转到 SMM Web UI 并检查cluster_a.global_iot主题的内容。您应该看到从集群 A 复制的事件。一段时间后,您将看到复制主题的指标增加。

即使统计数据尚未更新,请尝试单击放大镜图标查看数据。即使统计数据尚未刷新,这通常也会立即显示出来。

  1. 集群 B:单击集群复制图标 (

) 并检查吞吐量和延迟指标,以确保一切都按预期工作。您应该期望吞吐量大于零和毫秒级的延迟。

  1. 现在复制在A → B方向进行,反向重复相同的步骤以实现B → A方向的复制。

重要

请确保已在两个方向上设置了 SRM 复制,然后再继续进行下一个实验。在故障转移实验中,我们将练习消费者故障转移和故障恢复,这需要双向复制才能正常工作。

实验 6 - 故障转移消费者

笔记

在步骤说明中指示的集群上运行

SRM 的一大特色是它能够将消费者组偏移量从一个集群转换到另一个集群,这样消费者就可以切换到远程集群而不会丢失或复制消息。SRM 不断地将消费者组偏移量复制到远程集群,以便即使在源集群离线时也可以执行转换。

我们可以使用白名单/黑名单机制管理 SRM 为其复制偏移量的消费者组,类似于对主题所做的。

在本实验中,我们将为global_iot主题配置消费者组偏移复制,并执行消费者故障转移到远程集群。为了让它更有趣,我们将对两个消费者进行故障转移,一个使用正确的方法进行偏移转换,另一个不小心,这样我们就可以分析差异。

笔记

在整个实验的练习中,您将在 SSH 会话上运行 Kafka 消费者。请在同一个会话中运行所有的练习命令,这将使您更容易验证和比较故障转移和故障恢复后的结果。

  1. 集群 B:为了简化本实验的目的,让我们通过将正则表达式.*添加到白名单中,将A → B的所有消费者组的复制列入白名单。为此,通过 SSH 连接到集群 B主机并运行以下命令:

重要的

请注意,下面的命令是srm-control groups,与我们之前运行的srm-control topics不同。请注意实验示例中的确切命令,因为它们之间可能存在细微差别。

代码语言:javascript
复制
export SECURESTOREPASS=supersecret1
export security_protocol=PLAINTEXT
sudo -E srm-control groups \
--source cluster_a \
--target cluster_b \
--add ".*"
  1. 运行以下命令确认白名单修改正确:
代码语言:javascript
复制
sudo -E srm-control groups \
--source cluster_a \
--target cluster_b \
--list
  1. 您应该看到以下输出:
代码语言:javascript
复制
Current whitelist:
.*
Current blacklist:
Done.

重要

请注意,白名单消费者组的偏移量复制仅针对正在复制的主题(根据主题白名单)。由于我们只将主题global_iot列入白名单,因此即使消费者从未列入白名单的其他主题中读取,也只会复制该主题的偏移量。

  1. 到目前为止,我们已经:
    1. 通过将SRM中的global_iot主题列入白名单,配置从集群 A → B 的数据复制;
    2. 如前所述,通过将 SRM 中的所有消费者组以及主题白名单列入白名单,配置从集群 A → B 复制消费者组偏移量。

这些设置允许连接到集群 A 的消费者成功地故障转移到集群 B。但是,在故障转移后的某个时间,我们可能希望让消费者故障恢复到集群 A。为此,我们必须继续复制消费者偏移量故障转移后从集群 B 到集群 A。

以下配置启用该复制:

    1. 集群 A:连接到集群 A并运行以下命令:
代码语言:javascript
复制
export SECURESTOREPASS=supersecret1
export security_protocol=PLAINTEXT
sudo -E srm-control topics \
--source cluster_b \
--target cluster_a \
--add "cluster_a.global_iot"


sudo -E srm-control groups \
--source cluster_b \
--target cluster_a \
--add ".*"
    1. 集群 A:运行以下命令确认白名单已正确修改:
代码语言:javascript
复制
sudo -E srm-control topics \
--source cluster_b \
--target cluster_a \
--list


sudo -E srm-control groups \
--source cluster_b \
--target cluster_a \
--list
    1. 集群 A:您应该看到以下输出:
代码语言:javascript
复制
# topics:
Current whitelist:
cluster_a.global_iot
Current blacklist:
Done.
代码语言:javascript
复制
# groups:
Current whitelist:
.*
Current blacklist:
Done.
  1. 打开与任何主机的 SSH 会话并运行以下使用者以开始使用集群 A上global_iot主题的数据。此消费者使用名为good.failover的消费者组:
代码语言:javascript
复制
CLUSTER_A_HOST=<CLUSTER_A_HOST_FQDN>
kafka-console-consumer \
--bootstrap-server $CLUSTER_A_HOST:9092 \
--whitelist ".*global_iot" \
--group good.failover | tee good.failover.before

重要

请注意,在上面的命令中,我们指定了 Kafka 客户端主题白名单,而不是显式提供主题名称。这使消费者可以更轻松地进行故障转移和恢复,而无需重新配置。由于 SRM 为复制的主题添加了前缀,因此白名单选项使我们能够提供一个正则表达式来匹配原始主题和复制的主题。不要将这个 Kafka 客户端主题白名单与我们之前讨论的 SRM 主题白名单混淆;它们用于不同的目的。

  1. 让消费者从主题中读取一些数据,然后在屏幕上显示几行数据后按 CTRL+C。上面的命令将检索到的消息保存在good.failover.before文件中。
  2. 运行这个其他消费者也可以使用集群 A上的global_iot主题中的一些数据。此消费者使用与第一个不同的消费者组,称为:bad.failover
代码语言:javascript
复制
CLUSTER_A_HOST=<CLUSTER_A_HOST_FQDN>
kafka-console-consumer \
--bootstrap-server $CLUSTER_A_HOST:9092 \
--whitelist ".*global_iot" \
--group bad.failover | tee bad.failover.before
  1. 同样,让消费者从主题中读取一些数据,并在屏幕上显示几行数据后按 CTRL+C。上面的这个命令将检索到的消息保存在bad.failover.before文件中。
  2. Cluster B:打开 SMM Web UI,然后单击Cluster Replications图标 (

])。请注意,我们使用的两个消费者组的偏移量现在被 SRM 复制:

  1. 现在让我们首先尝试在不遵循偏移转换的推荐步骤的情况下对消费者进行故障转移。在之前运行bad.failover消费者的同一 SSH 会话中,再次运行消费者。不过,这一次,我们将连接到集群 B上的复制cluster_a.global_iot主题。
代码语言:javascript
复制
CLUSTER_B_HOST=<CLUSTER_B_HOST_FQDN>
kafka-console-consumer \
--bootstrap-server $CLUSTER_B_HOST:9092 \
--whitelist ".*global_iot" \
--group bad.failover | tee bad.failover.after
  1. 正如您之前所做的那样,让消费者从主题中读取一些数据,并在屏幕上显示几行数据后按 CTRL+C。上面的这个命令将检索到的消息保存在bad.failover.after文件中。
  2. bad.failover.before上面和文件中保存的每条消息bad.failover.after都有生成时间的时间戳。由于我们每秒大约生成 1 条消息,因此我们希望确保两个连续消息之间的间隔不会远大于 1 秒。

为了检查故障转移是否正确发生,我们要计算故障转移前读取的最大时间戳与故障转移后读取的最小时间戳之间的差距。如果没有消息丢失,我们应该看到它们之间的间隔不超过 1 秒。+ 您可以手动验证这一点,也可以运行以下命令,这将为您计算差距:

代码语言:javascript
复制
last_msg_before_failover=$(grep -o "[0-9]\{10,\}" bad.failover.before | sort | tail -1)
first_msg_after_failover=$(grep -o "[0-9]\{10,\}" bad.failover.after | sort | head -1)
echo "Gap = $(echo "($first_msg_after_failover-$last_msg_before_failover)/1000000" | bc) second(s)"
  1. 您应该会看到如下所示的输出,显示故障转移前后的消息之间存在很大差距。间隔的长度取决于您在两次执行bad.failover消费者之间花费了多长时间。
代码语言:javascript
复制
Gap = -5 second(s)

笔记

在某些情况下,您可能会得到上面计算的差距的负值。对于额外的布朗尼积分:在哪些情况下可能会出现负值,这对消费者意味着什么?

  1. 现在我们已经看到了错误的故障转移是什么样子,让我们正确地对另一个消费者进行故障转移。连接Cluster B主机,执行如下命令,导出consumer group的good.failover翻译后的offset 。请注意,即使集群 A 不可用,您也可以执行此命令。
代码语言:javascript
复制
export SECURESTOREPASS=supersecret1
export security_protocol=PLAINTEXT


sudo -E srm-control offsets \
--source cluster_a \
--target cluster_b \
--group good.failover \
--export > good.failover.offsets


cat good.failover.offsets

good.failover.offsets包含good.failover消费者组在源集群上触及的所有分区的所有转换偏移量。

  1. 要完成偏移量转换,仍然在集群 B主机上,运行以下命令将转换后的偏移量导入 Kafka:
代码语言:javascript
复制
CLUSTER_B_HOST=<CLUSTER_B_HOST_FQDN>
kafka-consumer-groups \
--bootstrap-server $CLUSTER_B_HOST:9092 \
--reset-offsets \
--group good.failover \
--from-file good.failover.offsets \
--execute

您应该会看到如下输出:

代码语言:javascript
复制
GROUP                          TOPIC                          PARTITION  NEW-OFFSET
good.failover                  cluster_a.global_iot           3          831
good.failover                  cluster_a.global_iot           4          879
good.failover                  cluster_a.global_iot           0          883
good.failover                  cluster_a.global_iot           1          853
good.failover                  cluster_a.global_iot           2          892
  1. 我们现在已准备好对good.failover消费者组进行故障转移。在 SSH 会话上,运行good.failover消费者,连接到集群 B上的复制cluster_a.global_iot主题。
代码语言:javascript
复制
CLUSTER_B_HOST=<CLUSTER_B_HOST_FQDN>
kafka-console-consumer \
--bootstrap-server $CLUSTER_B_HOST:9092 \
--whitelist ".*global_iot" \
--group good.failover | tee good.failover.after
  1. 这一次你会注意到当你启动消费者时会同时读取很多消息。发生这种情况是因为消费者之前停止的偏移量被转换到新集群并加载到 Kafka 中。因此,消费者开始阅读从那之后它停止并积累的所有消息。
  1. 按 CTRL+C 停止使用者。上面的命令将检索到的消息保存在good.failover.after文件中。
  2. 让我们在正确的故障转移期间检查消息之间的间隙。同样,您可以手动执行或运行以下命令:
代码语言:javascript
复制
last_msg_before_failover=$(grep -o "[0-9]\{10,\}" good.failover.before | sort | tail -1)
first_msg_after_failover=$(grep -o "[0-9]\{10,\}" good.failover.after | sort | head -1)
echo "Gap = $(echo "($first_msg_after_failover-$last_msg_before_failover)/1000000" | bc) second(s)"
  1. 您应该看到现在的间隔为 1 秒,这意味着在故障转移期间没有任何消息被跳过或丢失:

Gap = 1 second(s)

笔记

消息之间的间隔不完全是1 秒。有时我们可以看到相邻消息之间有近 2 秒的间隔。

  1. 消费者故障回复的工作方式相同。在我们让消费者失败之前,我们需要将偏移量反向转换(从集群 B 到集群 A)。在集群 A上运行以下命令以执行故障回复偏移量转换:
代码语言:javascript
复制
export security_protocol=PLAINTEXT


sudo -E srm-control offsets \
--source cluster_b \
--target cluster_a \
--group good.failover \
--export > good.failback.offsets


cat good.failback.offsets

good.failback.offsets包含消费者组good.failover在集群 B 上运行时触及的所有分区的所有转换偏移量。

  1. 要完成偏移量转换,仍然在集群 A主机上,运行以下命令将转换后的偏移量导入 Kafka:
代码语言:javascript
复制
CLUSTER_A_HOST=<CLUSTER_A_HOST_FQDN>
kafka-consumer-groups \
--bootstrap-server $CLUSTER_A_HOST:9092 \
--reset-offsets \
--group good.failover \
--from-file good.failback.offsets \
--execute

您应该会看到如下输出:

代码语言:javascript
复制
GROUP                          TOPIC                          PARTITION  NEW-OFFSET
good.failover                  global_iot                     0          15525
good.failover                  global_iot                     1          15656
good.failover                  global_iot                     2          15587
good.failover                  global_iot                     3          15534
good.failover                  global_iot                     4          15623
  1. 我们现在准备好对消费者组good.failover进行故障回复。在 SSH 会话上,运行good.failover消费者,再次连接到集群 A上的原始global_iot主题。
代码语言:javascript
复制
CLUSTER_A_HOST=<CLUSTER_A_HOST_FQDN>
kafka-console-consumer \
--bootstrap-server $CLUSTER_A_HOST:9092 \
--whitelist ".*global_iot" \
--group good.failover | tee good.failover.after_failback
  1. 按 CTRL+C 停止使用者。上面的命令将检索到的消息保存在good.failover.after_failback文件中。
  2. 让我们在正确的故障转移期间检查消息之间的间隙。同样,您可以手动执行或运行以下命令:
代码语言:javascript
复制
last_msg_before_failback=$(grep -o "[0-9]\{10,\}" good.failover.after | sort | tail -1)
first_msg_after_failback=$(grep -o "[0-9]\{10,\}" good.failover.after_failback | sort | head -1)
echo "Gap = $(echo "($first_msg_after_failover-$last_msg_before_failover)/1000000" | bc) second(s)"
  1. 您应该看到现在间隔为 1 秒,这意味着在故障回复期间没有任何消息被跳过或丢失:

Gap = 1 second(s)

笔记

消息之间的间隔不完全是1 秒。有时我们可以看到相邻消息之间有近 2 秒的间隔,这是正常的。

简化白名单

在本实验中,我们配置了以下 SRM 白名单:

Direction

Type

Whitelist

A → B

Topics

global_iot

Groups

.*

B → A

Topics

cluster_a.global_iot

Groups

.*

这些白名单实现了单方向(A → B)的数据复制,同时在两个方向(A ←→ B)复制消费者组偏移量。如果我们还想启用双向数据复制,我们必须修改白名单,如下所示:

Direction

Type

Whitelist

A → B

Topics

global_iotcluster_b.global_iot

Groups

.*

B → A

Topics

global_iotcluster_a.global_iot

Groups

.*

这似乎过于复杂,可以通过使用正则表达式来简化。由于所有列入白名单的主题都以 结尾global_iot,我们可以将上面的白名单替换为以下集合,它是对称的且更易于维护:

Direction

Type

Whitelist

A → B

Topics

.*global_iot

Groups

.*

B → A

Topics

.*global_iot

Groups

.*

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2022-04-25,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 大数据杂货铺 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 概述
  • 实验总结
  • 实验1:配置Kafka外部账户
  • 实验 2 - 安装Streams Replication Manager (SRM)服务
  • 实验 3 - 调整Streams Replication Manager (SRM)服务
  • 实验 4 - 配置复制监控
  • 实验 5 - 使用 Streams Replication Manager (SRM) 启用 Kafka 复制
  • 实验 6 - 故障转移消费者
    • 简化白名单
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档