我有一个数据集,它接收对我的数据行的最新编辑,但它只包含最近编辑的版本。(也就是说,它是在update_ts
时间戳列上增量的)。
原表:
| primary_key | update_ts |
|-------------|-----------|
| key_1 | 0 |
| key_2 | 0 |
| key_3 | 0 |
表更新后的表:
| primary_key | update_ts |
|-------------|-----------|
| key_1 | 0 |
| key_2 | 0 |
| key_3 | 0 |
| key_1 | 1 |
| key_2 | 1 |
| key_1 | 2 |
在摄入之后,我需要计算所有先前更新的“最新版本”,同时也要考虑到任何新的编辑。
这意味着我每次都要进行增量摄取并运行快照输出。这对于我的构建来说非常慢,因为我已经注意到,每次我想为我的数据计算最新版本时,我都必须查看我的所有输出行。
事务n=1 (快照):
| primary_key | update_ts |
|-------------|-----------|
| key_1 | 0 |
| key_2 | 0 |
| key_3 | 0 |
事务n=2 (附录):
| primary_key | update_ts |
|-------------|-----------|
| key_1 | 1 |
| key_2 | 1 |
如何使这个“最新版本”计算更快?
发布于 2020-10-20 08:01:50
这是一种从水桶中受益的常见模式。
这样做的要点是:根据primary_key
列将输出快照写入存储桶中,在该栏中,对大得多的输出进行洗牌的昂贵步骤将被完全跳过。
这意味着您只需将您的新数据交换到已经包含您以前历史记录的存储桶。
让我们从初始状态开始,在前面计算的“最新”版本上运行,该版本是一个缓慢的快照:
- output: raw_dataset
input: external_jdbc_system
hive_partitioning: none
bucketing: none
transactions:
- SNAPSHOT
- APPEND
- APPEND
- output: clean_dataset
input: raw_dataset
hive_partitioning: none
bucketing: none
transactions:
- SNAPSHOT
- SNAPSHOT
- SNAPSHOT
如果我们使用将clean_dataset
列上的存储到单独计算以适应我们预期的数据空间的桶数来编写primary_key
,那么我们需要以下代码:
from transforms.api import transform, Input, Output
import pyspark.sql.functions as F
from pyspark.sql.functions import row_number
from pyspark.sql.window import Window
@transform(
my_output=Output("/datasets/clean_dataset"),
my_input=Input("/datasets/raw_dataset")
)
def my_compute_function(my_input, my_output):
BUCKET_COUNT = 600
PRIMARY_KEY = "primary_key"
ORDER_COL = "update_ts"
updated_keys = my_input.dataframe("added")
last_written = my_output.dataframe("current")
updated_keys.repartition(BUCKET_COUNT, PRIMARY_KEY)
value_cols = [x for x in last_written.columns if x != PRIMARY_KEY]
updated_keys = updated_keys.select(
PRIMARY_KEY,
*[F.col(x).alias("updated_keys_" + x) for x in value_cols]
)
last_written = last_written.select(
PRIMARY_KEY,
*[F.col(x).alias("last_written_" + x) for x in value_cols]
)
all_rows = updated_keys.join(last_written, PRIMARY_KEY, "fullouter")
latest_df = all_rows.select(
PRIMARY_KEY,
*[F.coalesce(
F.col("updated_keys_" + x),
F.col("last_written_" + x)
).alias(x) for x in value_cols]
)
my_output.set_mode("replace")
return my_output.write_dataframe(
latest_df,
bucket_cols=PRIMARY_KEY,
bucket_count=BUCKET_COUNT,
sort_by=ORDER_COL
)
当它运行时,您将在查询计划中注意到,项目在输出上的步骤不再包含exchange,这意味着它不会对数据进行洗牌。您现在看到的唯一交换是在输入上,它需要以与输出格式化完全相同的方式分发更改(这是一个非常快速的操作)。
然后,将此交换保留到fullouter
联接步骤中,在该步骤中,join将利用此漏洞并非常快速地运行600个任务。最后,我们通过在相同的列上显式地插入相同数量的桶来维护输出上的格式。
注意:使用这种方法,您在每个桶中的文件大小将随着时间的推移而增长,而不考虑增加桶数以保持良好大小的需要。使用这种技术,您最终会达到一个阈值,即文件大小超过128 to,并且不再高效执行(修复方法是增加BUCKET_COUNT
值)。
您的输出现在将如下所示:
- output: raw_dataset
input: external_jdbc_system
hive_partitioning: none
bucketing: none
transactions:
- SNAPSHOT
- APPEND
- APPEND
- output: clean_dataset
input: raw_dataset
hive_partitioning: none
bucketing: BUCKET_COUNT by PRIMARY_KEY
transactions:
- SNAPSHOT
- SNAPSHOT
- SNAPSHOT
https://stackoverflow.com/questions/64448980
复制相似问题