摘 要
本文将介绍通过Apache Spark实现离线统计网站每日pv的思路及代码。
将数据按照域名分组,然后按照日期升序排序,点击量降序排序。
/**
 * Simple page-view (PV) statistics job.
 *
 * Reads tab-separated log lines whose first two fields are a date and a URL,
 * counts the hits per (date, URL) pair, and writes the counts grouped by host.
 * Within each host group the records are ordered by date ascending and then by
 * hit count descending.
 */
object PageView {
  /**
   * Job entry point.
   *
   * @param args args(0) is the input path handed to `textFile`;
   *             args(1) is the output path handed to `saveAsTextFile`
   */
  def main(args: Array[String]): Unit = {
    // Both the input path and the output path are required.
    if (args.length < 2) {
      System.err.println("Usage: PageView <input> <output>")
      System.exit(1)
    }
    val conf = new SparkConf().setAppName("PageView")
    val sc = new SparkContext(conf)
    try {
      // Key every line by the tuple (date, url) with an initial count of 1.
      val hits = sc.textFile(args(0)).map { line =>
        val fields = line.split("\t")
        ((fields(0), fields(1)), 1)
      }
      // Sum the counts of identical (date, url) keys.
      val counts = hits.reduceByKey(_ + _)
      // Extract the host from the URL, giving (date, host, url, count).
      val records = counts.map { case ((date, url), count) =>
        (date, new URL(url).getHost, url, count)
      }
      // Group by host; inside each group sort by date ascending, then by
      // count descending (the ordering is defined by PVSort).
      records
        .groupBy(_._2)
        .mapValues(_.toList.sortBy(r => PVSort(r._1, r._4)))
        .saveAsTextFile(args(1))
    } finally {
      // Release the context even when the job fails part-way through.
      sc.stop()
    }
  }
}
/**
 * Sort key for page-view records: date ascending, then hit count descending.
 *
 * @param date  date string; compared with `String.compareTo`
 * @param count hit count; for equal dates the larger count sorts first
 */
case class PVSort(date: String, count: Int) extends Ordered[PVSort] with Serializable {
  /**
   * Compares this key with another one.
   *
   * @param that the key to compare against
   * @return a negative value when this key sorts before `that`, a positive
   *         value when it sorts after, and 0 when date and count are both equal
   */
  override def compare(that: PVSort): Int = {
    val byDate = this.date.compareTo(that.date)
    // Swapping the operands reverses the count order without negating a
    // comparison result.
    if (byDate != 0) byDate else java.lang.Integer.compare(that.count, this.count)
  }
}