我尝试运行2个数据流,在第一个数据流中生成Dataframe,将df注册为tmp视图,然后在另一个数据流中使用它,如下所示:
dstream1.foreachRDD { rdd =>
import org.apache.spark.sql._
val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate
import spark.implicits._
import spark.sql
val records = rdd.toDF("record")
r
{
var history: RDD[(String, List[String]) = sc.emptyRDD()
val dstream1 = ...
val dstream2 = ...
val historyDStream = dstream1.transform(rdd => rdd.union(history))
val joined = historyDStream.join(dstream2)
... do stuff with joined as above, obtain dstreamFiltered ...
dstreamFiltered.foreachRD
我正在使用一些dataframe df创建一个数据样本
rdd = df.limit(10000).rdd
这个操作需要相当长的时间(为什么呢?它不能在10000行之后省略吗?),所以我假设我现在有了一个新的RDD。
但是,当我现在使用rdd时,每次访问它都是不同的行。就像它再次重新采样一样。缓存RDD有一点帮助,但这肯定不是保存?
背后的原因是什么?
更新:这里是Spark 1.5.2的复制品
from operator import add
from pyspark.sql import Row
rdd=sc.parallelize([Row(i=i) for i in range(100
星火应用程序需要验证RDD中的每个元素。
给定一个名为Validator的驱动程序\客户端Scala对象,以下两种解决方案中哪一种更好:
rdd.filter { x => if Validator.isValid(x.somefield) true else false }
或者类似的东西
// get list of the field to validate against
val list = rdd.map(x => x.somefield)
// Use the Validator to check which ones are invalid
var invalidE
我是Spark的新手,有一些关于Spark RDD操作和创建的问题:
val rdd1 = sc.parallelize(List("yellow","red","blue","cyan","black"),3)
val mapped = rdd1.mapPartitionsWithIndex{(index, iterator) => {
println("Called in Partition -> " + inde
我是非常新的阿帕奇·斯帕克。我正在尝试将csv文件加载到Spark RDD和DataFrames中。
我使用RDD来操作数据,使用Dataframe for SQL对数据帧进行类似的操作。
在将RDD转换为Spark DataFrame时,我遇到了一个问题。下面给出了问题陈述。
# to load data
dataRDD = sc.textFile(trackfilepath)
# To use it as a csv
dataRDD = testData.mapPartitions(lambda x: csv.reader(x))
# To load into data fr
如何在订购操作后返回RDD。我想用一个值来排序,取最高的结果,然后按第二个值排序。
例如,
rdd = sc.parallelize([(1, "a", 10), (2, "b", 9), (3, "c", 8)])
res = rdd.takeOrdered(2, lambda x: x[0]) # sort on first value
# sort on second value
out = sc.parallelize(res).sortBy(lambda x: x[2]).collect()
但是,我可以让res成为一个RDD并在一步
我想要将列表((A,1,2,3),(B,4,5,6),(C,7,8,9))拆分成:
(A,1)
(A,2)
(A,3)
(B,4)
(B,5)
...
我尝试过rdd.flatMapValues(lambda x: [x]),但它只给了我第一个值。然后我写了一个小的python函数:
item_index = []
for list in master_list:
for item in list:
item_index.append((list[0],item))
但我没意识到你不能把RDD传入其中。任何建议都是很棒的。