PeerDB 团队最近完成了针对 Elasticsearch 的数据集成目标连接器的初步开发,并已进入测试阶段。 EElasticsearch 是一个广泛使用的搜索和分析引擎,它建立在分布式多用户能力的文档数据库之上。在多个行业的数据架构案例中都有 Elasticsearch 的广泛应用。
本文解释了一些通过 Postgres 到 Elasticsearch 的实时同步用例,然后通过一个快速演示展示了使用 PeerDB 进行 Postgres 到 Elasticsearch 复制的高性能和低延迟。最后,我们对连接器的架构进行了高级概述。
通过CDC或查询复制从Postgres到Elasticsearch复制的一些常见用例包括:
在这一部分,我将通过一个快速演示,介绍如何在变更数据捕获(CDC)模式下,使用 PeerDB 进行 Postgres 到 Elasticsearch 的复制。使用 PeerDB 从 Postgres 到 Elasticsearch 的复制有一些好处,主要的优点是快速的初始加载,和通过不断读取插槽来实现的亚分钟延迟,PeerDB 能够提供这些,因为它专注于 Postgres 的复制。
你可以在云上或者在本地使用任何Postgres数据库。为了简单起见,我在这个演示中使用了一个在 Docker 容器中本地运行的 Postgres 集群。我们创建了一个名为 oss1
的表,使用一个多值插入语句每秒连续插入1000行。
postgres=# CREATE TABLE oss1 (
id INT PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY,
c1 INT,
c2 INT,
t TEXT,
updated_at TIMESTAMP WITH TIME ZONE DEFAULT now()
);
CREATE TABLE
postgres=# INSERT INTO oss1 (c1, c2, t)
SELECT
generate_series AS c1,
generate_series * 2 AS c2,
'text_' || generate_series AS t
FROM
generate_series(1, 1000);
# 每秒运行一次INSERT
postgres=# \watch 1
INSERT 0 1000
INSERT 0 1000
INSERT 0 1000
你可以在本地或者云虚拟机上使用它的 Docker compose设置来设置一个 Elasticsearch 实例。或者你也可以使用腾讯云 ES 或者 Elasticsearch Cloud。在这个演示中,我使用了本地运行的 Docker compose 设置。
你可以使用 PeerDB开源版 或者 PeerDB云版 来部署一个PeerDB实例。在这个演示中,我通过Docker compose在本地部署了PeerDB开源版。
在 PeerDB 世界中,对等体指的是源数据存储或目标数据存储。你可以使用 PeerDB 的用户界面来创建Postgres和Elasticsearch对等体。然后在源对等体和目标对等体之间创建一个镜像进行数据复制。你可以使用 PeerDB 的用户界面来创建一个从 Postgres 到 Elasticsearch 复制数据的 MIRROR。
我创建了一个基于变更数据捕获(CDC)的 MIRROR,它使用 Postgres 的预写日志(WAL)和逻辑解码来复制数据。它包括两个步骤:
初始加载应该很快就能完成,而且应该能在创建的 Elasticsearch 索引中看到行。在进入连续的 CDC 模式后,新的行应该会随着它们被插入而显示出来。下面附上了一个显示 Postgres 到 Elasticsearch CDC 镜像的快速视频。
我们之前已经详细讨论过 PeerDB 的流式架构,但总的来说,PeerDB 利用 Go 的 goroutines 和通道,通过逻辑复制有效地从 PostgreSQL 读取数据,然后通过 Bulk API 批量将数据推送到 Elasticsearch。这种方法通过启用并行处理来提高执行时间。
我们的数据仓库连接器在将数据推送到最终表之前,先将数据存储在一个暂存表中,这是出于成本和性能的考虑。由于 Elasticsearch 的架构和查询语言,我们也能够避免这个中间步骤,直接将处理过的记录流发送到 Elasticsearch 索引,通过批量 API。
PeerDB 支持使用 Elasticsearch 作为 CDC 和查询复制的目标。在大多数情况下,我们推荐使用 CDC,因为它的使用更简单,可靠性更高,而且能够将 DELETE 复制到 Elasticsearch。然而,这限制了在加载到 Elasticsearch 之前可以进行的转换的范围。
为了在 Elasticsearch 侧支持去重,我们需要一个对每个文档保持一致的唯一 ID,这样我们就可以根据源更新或删除它。对于主键中只有一列的表,可以使用该列的值。对于主键中有多列的表,我们选择将列的值一起哈希,从而得到一个小的唯一标识符,无论行的宽度如何。
// 简化的 Go 代码
func primaryKeyColsHash(record []any, colIndices []int) string {
hasher := sha256.New()
for _, colIndex := range colIndices {
// 将值写入哈希器
_, _ = fmt.Fprint(hasher, record[colIndex])
}
hashBytes := hasher.Sum(nil)
return base64.RawURLEncoding.EncodeToString(hashBytes)
}
# PeerDB上传到Elasticsearch的样本文档。
# 注意 _id 字段是主键列 id 和 c1 的(base64 编码的)哈希值。
{
"_index": "public.oss2",
"_id": "SAgdSqEaQyGYWxOo8Dj2s0DbXsQXLTC_CWlds8-c4kY",
"_version": 1,
"_seq_no": 0,
"_primary_term": 1,
"found": true,
"_source": {
"c1": 434017,
"c2": 922856,
"id": 8,
"t": "pgbenchinsertc4b998821cc6b161e65489b3",
"updated_at": "2024-05-08T18:33:39.031107Z"
}
}
查询复制可以以追加模式进行,其中任何变化都会在 Elasticsearch 中创建一个新文档,或者以 upsert 模式进行,其中一些列被指定为键列,这些列在类似于 CDC 的方式中进行去重。
默认情况下,PeerDB 目前使用 Elasticsearch 的动态映射来自动根据索引中的文档内容推断出数据类型映射。在实践中,数字类型被映射为 long
或 float
,时间戳类型被映射为 date
,大多数其他类型被映射为 text
。更详细的映射可以在这里找到。这对许多用例都有效。如果需要,用户可以在手动创建索引时提供显式映射,PeerDB 将向此索引加载文档。
Elasticsearch 连接器处于测试阶段 -- 我们已经有客户使用 PeerDB 将数十亿行从 Postgres 移动到 Elasticsearch。如果你是一个 Elasticsearch 用户,并希望使用 PeerDB 将数据从 Postgres 复制到 Elasticsearch,请试试 PeerDB!我们很乐意帮助你或者得到你的反馈:
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。