发布
社区首页 >问答首页 >如何快速计算火花放电DataFrame上不同条件下的多个计数?

如何快速计算火花放电DataFrame上不同条件下的多个计数?
EN

Stack Overflow用户
提问于 2021-10-18 13:25:45
回答 2查看 976关注 0票数 0

比方说,我有这个火花缭乱的数据:

代码语言:javascript
代码运行次数:0
复制
data = spark.createDataFrame(schema=['Country'], data=[('AT',), ('BE',), ('France',), ('Latvia',)])

假设我想收集有关这些数据的各种统计数据。例如,我可能想知道有多少行使用两个字符的国家代码,有多少行使用较长的国家名称:

代码语言:javascript
代码运行次数:0
复制
count_short = data.where(F.length(F.col('Country')) == 2).count()
count_long = data.where(F.length(F.col('Country')) > 2).count()

这是可行的,但是当我想根据不同的条件收集许多不同的计数时,即使对于微小的数据集,它也变得非常缓慢。在我工作的Azure中,每个计数都需要1-2秒来计算

我需要做100+计数,对于一个10行的数据集,计算需要几分钟。在有人问之前,这些计数的条件比我的例子要复杂得多。我不能按长度分组,也不能做其他类似的把戏。

我正在寻找一种在任意条件下进行多次计数的通用方法,快速.

我是猜测,性能缓慢的的原因是,对于每一个计数调用,我的吡火花笔记本启动了一些具有很大开销的火花进程。因此,我假设,如果有某种方法在一个查询中收集这些计数,我的性能问题就会得到解决。

我想出的一个可能的解决方案是构建一个临时列,该列指示匹配了哪些条件,然后在它上调用countDistinct。但这样我就有了所有条件匹配组合的个人计数。我还注意到,根据具体情况,在计算统计数据之前执行data = data.localCheckpoint()时,性能要好一些,但一般问题仍然存在。

有更好的办法吗?

EN

回答 2

Stack Overflow用户

发布于 2021-10-19 18:13:28

一种方法是将多个查询合并到一个查询中,而另一个方法是缓存正在被一次又一次查询的数据。通过缓存dataframe,我们避免每次调用count()时重新评估。

代码语言:javascript
代码运行次数:0
复制
data.cache()
票数 1
EN

Stack Overflow用户

发布于 2021-10-21 12:54:10

有几件事要记住。如果您要在dataframe上应用多个操作,并且有大量的转换,并且您正在从某个外部源读取数据,那么在对该dataframe应用任何单个操作之前,您绝对应该缓存该数据。

@pasha701提供的答案有效,但您必须继续根据要分析的不同国家代码长度值添加列。

您可以使用下面的代码来获取不同国家代码的计数,所有这些代码都在一个数据中。

代码语言:javascript
代码运行次数:0
复制
//import statements
from pyspark.sql.functions import *
//sample Dataframe
data = spark.createDataFrame(schema=['Country'], data=[('AT',), ('ACE',), ('BE',), ('France',), ('Latvia',)])
//adding additional column that gives the length of the country codes
data1 = data.withColumn("CountryLength",length(col('Country')))
//creating columns list schema for the final output
outputcolumns = ["CountryLength","RecordsCount"]
//selecting the countrylength column and converting that to rdd and performing map reduce operation to count the occurrences of the same length 
countrieslength = data1.select("CountryLength").rdd.map(lambda word: (word, 1)).reduceByKey(lambda a,b:a +b).toDF(outputcolumns).select("CountryLength.CountryLength","RecordsCount")
//now you can do display or show on the dataframe to see the output
display(countrieslength)

请查看可能获得的输出快照,如下所示:

如果要对此数据应用多个筛选条件,则可以缓存此数据并根据国家代码长度获取不同组合记录的计数。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/69616946

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档