比方说,我有这个火花缭乱的数据:
data = spark.createDataFrame(schema=['Country'], data=[('AT',), ('BE',), ('France',), ('Latvia',)])
假设我想收集有关这些数据的各种统计数据。例如,我可能想知道有多少行使用两个字符的国家代码,有多少行使用较长的国家名称:
count_short = data.where(F.length(F.col('Country')) == 2).count()
count_long = data.where(F.length(F.col('Country')) > 2).count()
这是可行的,但是当我想根据不同的条件收集许多不同的计数时,即使对于微小的数据集,它也变得非常缓慢。在我工作的Azure中,每个计数都需要1-2秒来计算。
我需要做100+计数,对于一个10行的数据集,计算需要几分钟。在有人问之前,这些计数的条件比我的例子要复杂得多。我不能按长度分组,也不能做其他类似的把戏。
我正在寻找一种在任意条件下进行多次计数的通用方法,快速.
我是猜测,性能缓慢的的原因是,对于每一个计数调用,我的吡火花笔记本启动了一些具有很大开销的火花进程。因此,我假设,如果有某种方法在一个查询中收集这些计数,我的性能问题就会得到解决。
我想出的一个可能的解决方案是构建一个临时列,该列指示匹配了哪些条件,然后在它上调用countDistinct
。但这样我就有了所有条件匹配组合的个人计数。我还注意到,根据具体情况,在计算统计数据之前执行data = data.localCheckpoint()
时,性能要好一些,但一般问题仍然存在。
有更好的办法吗?
发布于 2021-10-19 18:13:28
一种方法是将多个查询合并到一个查询中,而另一个方法是缓存正在被一次又一次查询的数据。通过缓存dataframe,我们避免每次调用count()时重新评估。
data.cache()
发布于 2021-10-21 12:54:10
有几件事要记住。如果您要在dataframe上应用多个操作,并且有大量的转换,并且您正在从某个外部源读取数据,那么在对该dataframe应用任何单个操作之前,您绝对应该缓存该数据。
@pasha701提供的答案有效,但您必须继续根据要分析的不同国家代码长度值添加列。
您可以使用下面的代码来获取不同国家代码的计数,所有这些代码都在一个数据中。
//import statements
from pyspark.sql.functions import *
//sample Dataframe
data = spark.createDataFrame(schema=['Country'], data=[('AT',), ('ACE',), ('BE',), ('France',), ('Latvia',)])
//adding additional column that gives the length of the country codes
data1 = data.withColumn("CountryLength",length(col('Country')))
//creating columns list schema for the final output
outputcolumns = ["CountryLength","RecordsCount"]
//selecting the countrylength column and converting that to rdd and performing map reduce operation to count the occurrences of the same length
countrieslength = data1.select("CountryLength").rdd.map(lambda word: (word, 1)).reduceByKey(lambda a,b:a +b).toDF(outputcolumns).select("CountryLength.CountryLength","RecordsCount")
//now you can do display or show on the dataframe to see the output
display(countrieslength)
请查看可能获得的输出快照,如下所示:
如果要对此数据应用多个筛选条件,则可以缓存此数据并根据国家代码长度获取不同组合记录的计数。
https://stackoverflow.com/questions/69616946
复制相似问题