在过去三年中,由于用户和内容的增长,Notion 的数据增长了 10 倍,以 6-12 个月的速度翻了一番。要管理这种快速增长,同时满足关键产品和分析用例不断增长的数据需求,尤其是我们最近的 Notion AI 功能,意味着构建和扩展 Notion 的数据湖。以下来介绍我们是如何做到的。
在 Notion 中看到的所有内容(文本、图像、标题、列表、数据库行、页面等)尽管前端表示和行为不同,但在后端被建模为“块”实体,并存储在具有一致结构、架构和相关元数据的 Postgres 数据库中(了解有关 Notion 数据模型的更多信息)。
在用户活动和内容创作的推动下,所有这些区块数据每 6 到 12 个月翻一番。在 2021 年初,我们在 Postgres 中有超过 200 亿个区块行,此后这个数字已经增长到超过 2000 亿个区块——即使压缩后的数据量也高达数百 TB。为了在增强用户体验的同时管理这种数据增长,我们战略性地将数据库基础设施从一个 Postgres 实例扩展到更复杂的分片架构。我们从 2021 年开始将 Postgres 数据库水平分片为 32 个物理实例,每个实例包含 15 个逻辑分片,并在 2023 年继续将物理实例数量增加到 96 个,每个实例有 5 个逻辑分片。因此,我们总共维护了 480 个逻辑分片,同时确保了长期可扩展的数据管理和检索能力。
到 2021 年,Postgres 构成了我们生产基础设施的核心,处理从在线用户流量到各种离线数据分析和机器学习需求的所有内容。随着对线上和线下数据需求的增加,我们意识到构建一个专用的数据基础设施来处理离线数据而不干扰在线流量至关重要。
2021 年,我们通过一个简单的 ELT(提取、加载和转换)管道启动了这个专用数据基础设施,该管道使用第三方工具 Fivetran 将数据从 Postgres WAL(预写日志)摄取到 Snowflake,并为 480 个分片设置了 480 个每小时运行的连接器,以写入相同数量的原始 Snowflake 表。然后我们将这些表合并为一个大表,用于分析、报告和机器学习用例。
随着 Postgres 数据的增长,我们遇到了一些扩展挑战。
速度、数据新鲜度和成本
将数据摄取到 Snowflake 的速度变慢且成本更高,这主要是由于 Notion 独特的更新繁重工作负载。Notion 用户更新现有块(文本、标题、标题、项目符号列表、数据库行等)的频率远远高于添加新块的频率。这导致块数据主要是更新量大的 ~90% 的 Notion 更新插入是更新。大多数数据仓库(包括 Snowflake)都针对插入繁重的工作负载进行了优化,这使得它们摄取块数据变得越来越具有挑战性。
用例支持
数据转换逻辑变得更加复杂和繁重,超过了现成数据仓库提供的标准 SQL 接口的功能。
block_1
, block_2
, 并 block_3
继承其直接父级 ( page_3
和 page_2
) 和祖先 ( page_1 ``workspace_a).
和 和 要为每个块构建权限数据,我们必须遍历其祖先树一直到根 ( workspace_a
),以确保完整性。由于有数千亿个区块,其祖先深度从几个到几十个不等,这种计算成本非常高,而且只会在 Snowflake 中超时。由于这些挑战,我们开始探索构建我们的数据湖。
以下是我们构建内部数据湖的目标:
但是,虽然我们的数据湖是向前迈出的一大步,但重要的是要澄清它不打算做什么:
自 2022 年以来,我们一直使用如下所示的内部数据湖架构。我们使用 Debezium CDC 连接器将增量更新的数据从 Postgres 摄取到 Kafka,然后使用 Apache Hudi(一个开源数据处理和存储框架)将这些更新从 Kafka 写入 S3。然后利用这些原始数据,我们可以进行转换、非规范化(例如,每个块的树遍历和权限数据构建)和扩充,然后将处理后的数据再次存储在 S3 中或下游系统中,以满足分析和报告需求,以及 AI、搜索和其他产品要求。
接下来,我们将描述和说明我们在广泛的研究、讨论和原型设计工作后得出的设计原则和决策。
我们的第一个决定是将 S3 用作数据存储库和湖来存储所有原始和处理过的数据,并将数据仓库和其他面向产品的数据存储(如 ElasticSearch、Vector Database、Key-Value Store 等)定位为其下游。我们做出这个决定有两个原因:
通过将繁重的摄取和计算工作负载卸载到 S3,并仅将高度清理的业务关键型数据摄取到 Snowflake 和面向产品的数据存储,我们显著提高了数据计算的可扩展性和速度,并降低了成本。
我们选择Spark作为我们的主要数据处理引擎,因为作为一个开源框架,它可以快速设置和评估,以验证它是否满足我们的数据转换需求。Spark 具有四个主要优势:
在完成我们的数据湖存储和处理引擎后,我们探索了将 Postgres 数据摄取到 S3 的解决方案。我们最终考虑了两种方法:增量摄取更改的数据和 Postgres 表的定期完整快照。最后,基于性能和成本的比较,我们选择了混合设计:
增量方法可确保以更低的成本和最小的延迟(几分钟到几个小时,具体取决于表大小)获得更新鲜的数据。相比之下,导出完整快照并转储到 S3 需要 10 多个小时,成本是 S3 的两倍,因此在 S3 中引导新表时,我们很少这样做。
我们选择了 Kafka Debezium CDC(更改数据捕获)连接器将增量更改的 Postgres 数据发布到 Kafka,类似于 Fivetran 的数据摄取方法。我们之所以选择它与 Kafka 一起,是因为它们具有可扩展性、易于设置以及与我们现有基础架构的紧密集成。
为了将增量数据从 Kafka 引入到 S3,我们考虑了三种出色的数据湖解决方案:Apache Hudi、Apache Iceberg 和 Databricks Delta Lake。最后我们选择了 Hudi,因为它具有出色的性能,可以处理大量更新的工作负载,并且具有开源特性以及与 Debezium CDC 消息的原生集成。另一方面,当我们在 2022 年考虑 Iceberg 和 Delta Lake 时,它们并没有针对我们的更新繁重工作负载进行优化。Iceberg 还缺乏一个能够理解 Debezium 消息的开箱即用的解决方案;Delta Lake 有一个但并不开源。如果我们采用这两种解决方案中的任何一个,我们将不得不实现我们自己的 Debezium 消费者。
最后,我们决定将原始 Postgres 数据摄取到 S3,而无需进行动态处理,以便建立单一事实来源并简化整个数据管道的调试。一旦原始数据进入 S3,我们就会进行转换、非规范化、扩充和其他类型的数据处理。我们再次将中间数据存储在 S3 中,并且仅将高度清理、结构化和关键业务数据引入下游系统,以满足分析、报告和产品需求。
我们尝试了许多详细的设置,以解决与 Notion 不断增长的数据量相关的可扩展性挑战。以下是我们尝试的内容和进展情况:
CDC 连接器和 Kafka 设置
我们在每个 Postgres 主机上设置一个 Debezium CDC 连接器,并将它们部署在 AWS EKS 集群中。由于 Debezium 和 EKS 管理的成熟度以及 Kafka 的可扩展性,我们在过去两年中只需要升级几次 EKS 和 Kafka 集群。截至 2024 年 5 月,它可以顺利处理数十 MB/秒的 Postgres 行变更。我们还为每个 Postgres 表配置一个 Kafka 主题,并让所有消耗 480 个分片的连接器写入该表的同一主题。此设置显著降低了为每个表维护 480 个主题的复杂性,并简化了下游 Hudi 对 S3 的摄取,从而显著降低了运营开销。
Hudi设置
我们使用 Apache Hudi Deltastreamer(一个基于 Spark 的摄取作业)来使用 Kafka 消息并在 S3 中复制 Postgres 表的状态。经过几轮性能优化后,我们建立了一个快速、可扩展的摄取设置,以确保数据新鲜度。对于大多数表,此设置仅提供几分钟的延迟,而对于最大的表(块表)则提供长达两个小时的延迟(见下图)。
hoodie.datasource.write.partitionpath.field: db_schema_source_partition
配置。这会将 S3 数据集划分为 480 个分片,从 shard0001
到 shard0480
, 更有可能将一批传入更新映射到同一分片中的同一组文件。source-ordering-field: event_lsn
配置。这是基于我们的观察,即较新的块更有可能得到更新,这使我们能够仅使用过时的块来修剪文件。hoodie.index.type: BLOOM
配置,以进一步优化工作负载。Spark数据处理设置
对于我们的大多数数据处理工作,我们使用 PySpark,其相对较低的学习曲线使许多团队成员都可以使用它。对于更复杂的工作,如树遍历和非规范化,我们在几个关键领域利用了Spark的卓越性能:
引导设置
以下是我们引导新表的方法:
t
开始,我们启动 AWS RDS 提供的导出到 S3 作业,将 Postgres 表的最新快照保存到 S3。然后,我们创建一个 Spark 作业来从 S3 读取这些数据,并将它们写入 Hudi 表格式。t
来捕获快照过程中所做的所有更改。此步骤对于保持数据完整性和完整性至关重要。由于 Spark 和 Hudi 的可扩展性,这三个步骤通常在 24 小时内完成,使我们能够在可管理的时间内执行重新引导,以适应新的表请求和 Postgres 升级和重新分片操作。