扩展RDD API三部曲,主要是帮助大家掌握如下三个内容:
1). 回顾一下RDD的基础
2). 扩展Action,也即是自定义RDD算子
3). 扩展 transform及自定义RDD
本文主要是将自定义Spark RDD算子中的Action 类型操作。
1. 准备阶段
讲到自定义RDD的action操作,大家首先应该想到的就是那些RDD到key-value算子的隐式转换,具体一点也就是PairRDDFunctions这个类里包含的算子,比如reducebykey等操作算子。
具体实现肯定是要比较了解scala的隐式转换操作,这个浪尖也发过文章了,可以点击下文阅读:
首先,我们要进行准备操作,首先定义一个case class
classSalesRecord(val transactionId:String,
val customerId:String,
val itemId:String,
val itemValue: Double)extendsComparable[SalesRecord]
withSerializable {
override def compareTo(o: SalesRecord): Int = {
returnthis.transactionId.compareTo(o.transactionId)
}
override def toString:String= {
transactionId+","+customerId+","+itemId+","+itemValue
}
}
然后,定义我们的主要函数:
val sparkConf =newSparkConf().setAppName(this.getClass.getName).setMaster("local[*]")
.set("yarn.resourcemanager.hostname","mt-mdh.local")
.set("spark.executor.instances","2")
.set("spark.default.parallelism","4")
.set("spark.sql.shuffle.partitions","4")
.setJars(List("/opt/sparkjar/bigdata.jar"
,"/opt/jars/spark-streaming-kafka-0-10_2.11-2.3.1.jar"
,"/opt/jars/kafka-clients-0.10.2.2.jar"
,"/opt/jars/kafka_2.11-0.10.2.2.jar"))
val sc =newSparkContext(sparkConf)
val dataRDD = sc.textFile("file:///opt/bigdata/src/main/data/sales.csv")
val salesRecordRDD = dataRDD.map(row => {
val colValues = row.split(",")
newSalesRecord(colValues(),colValues(1),colValues(2),colValues(3).toDouble)
})
这个时候加入我们需要对itemValue字段求和,常见的做法是
salesRecordRDD.map(_.itemValue).sum
其实,sum就是DoubleRDDFunctions内部的算子,也是通过隐式转换实现的。
2. 自定义算子实现
然后就是要定义RDD的操作算子本身,也即是一个工具类,我们叫他为CustomFunctions,内部包含求和函数如下:
这个仔细读一下上面已有的隐式转换算子,可以发现还不行,需要为自定义RDD的操作算子,自定义一个隐士转换的算子工具,内容如下:
objectCustomFunctions{
implicit defaddCustomFunctions(rdd:RDD[SalesRecord]) =newCustomFunctions(rdd)
}
3. 使用算子
调用我们的转换方法:
println("Spark RDD API : "+salesRecordRDD.map(_.itemValue).sum)
importCustomFunctions._
println("Cunstom RDD API : "+salesRecordRDD.totalSales)
输出结果:
这就是自定义RDD的action操作。
下篇文章为自定义RDD和转换操作,这个就只会在星球里分享了欢迎加入浪尖的知识星球,与近420好友一起学习进步。
领取专属 10元无门槛券
私享最新 技术干货