我们有如下的用户访问数据:
userID visitDate visitCount
u01 2017/1/21 5
u02 2017/1/23 6
u03 2017/1/22 8
u04 2017/1/20 3
u01 2017/1/23 6
u01 2017/2/21 8
u02 2017/1/23 6
u01 2017/2/22 4
要求使用SQL统计出每个用户的累计访问次数,如下所示:
用户 月份 小计 累计
u01 2017-01 11 11
u01 2017-02 12 23
u02 2017-01 12 12
u03 2017-01 8 8
u04 2017-01 3 3
解释:小计为单月访问次数,累计为在原有单月访问次数基础上累加 将计算结果写入到mysql的表中,自己设计对应的表结构
采用spark local模式,基于scala语言编写
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
object WorkVisiter {
def main(args: Array[String]): Unit = {
// 创建 SparkSession
val spark = SparkSession.builder()
.appName("Read Text File with Header")
.master("local")
.getOrCreate()
// 定义结构化数据的字段名和类型
val schema = StructType(Seq(
StructField("userID", StringType, nullable = true),
StructField("visitDate", StringType, nullable = true),
StructField("visitCount", IntegerType, nullable = true)
))
// 读取文本文件,并按照空格进行拆分
val df = spark.read
.option("header", "true")
.option("delimiter", " ")
.schema(schema)
.csv("D:\\tmp\\work0614\\visit.csv")
// 打印 DataFrame 数据
// df.show(false)
// 注册 DataFrame 为临时表
df.createOrReplaceTempView("tb_user")
// 使用窗口函数计算小计和累计访问次数
val result = spark.sql(
"""
|SELECT
| userID
| , date
| , MAX(max_sumAgg) MaxSumAgg
|FROM (
| select
| userID
| ,date
| ,visitCount
| ,sumAgg
| ,MAX(sumAgg) OVER (PARTITION BY userID order by date,userID) AS max_sumAgg
| From (
| select
| userID
| ,date
| ,visitCount
| ,sumAgg
| from (
| select
| userID
| ,CONCAT(SUBSTRING(visitDate, 1, 4), '-', LPAD(SUBSTRING(visitDate, 6, 1), 2, '0')) AS date
| ,visitCount
| ,sum(visitCount) over(partition by userID order by visitDate) as sumAgg
| from
| tb_user
| ) t1
| ) t2
|)t3
|GROUP BY userID, date
|ORDER BY userID, date
| """.stripMargin)
// 打印结果
result.show(false)
}
}
我的思路是首先将日期截取拼接为yyyy-mm
的格式;
使用sum(访问量)开窗,根据用户ID分区,按照月份排序,得出每月的累加,如下表;
|userID|date |visitCount|sumAgg|
+------+-------+----------+------+
|u01 |2017-01|5 |5 |
|u01 |2017-01|6 |11 |
|u01 |2017-02|8 |19 |
|u01 |2017-02|4 |23 |
|u02 |2017-01|6 |12 |
|u02 |2017-01|6 |12 |
|u03 |2017-01|8 |8 |
|u04 |2017-01|3 |3 |
+------+-------+----------+------+
###t3表打印内容 再用max最大值对累加结果开窗,根据用户ID分区,按照日期和用户ID排序,因为有重复的日期,所以需要两个限制条件,这一步计算出了用户每个月的最大访问量,但是未分区排序去重;
+------+-------+----------+------+----------+
|userID|date |visitCount|sumAgg|max_sumAgg|
+------+-------+----------+------+----------+
|u01 |2017-01|5 |5 |11 |
|u01 |2017-01|6 |11 |11 |
|u01 |2017-02|8 |19 |23 |
|u01 |2017-02|4 |23 |23 |
|u02 |2017-01|6 |12 |12 |
|u02 |2017-01|6 |12 |12 |
|u03 |2017-01|8 |8 |8 |
|u04 |2017-01|3 |3 |3 |
+------+-------+----------+------+----------+
GROUP BY userID, date 语句的作用是将结果集按照 userID 和 date 进行分组。这意味着具有相同 userID 和 date 值的行将被归为同一组。 ORDER BY userID, date 语句的作用是对分组后的结果集进行排序。它按照 userID 和 date 的升序对结果进行排序,使得相同 userID 的行按照 date 的顺序排列。 这样做的功能是确保结果集中的行按照 userID 和 date 的顺序进行排列,使得相同用户的不同日期的记录按照日期的先后顺序呈现,方便查看和分析数据。
+------+-------+---------+
|userID|date |MaxSumAgg|
+------+-------+---------+
|u01 |2017-01|11 |
|u01 |2017-02|23 |
|u02 |2017-01|12 |
|u03 |2017-01|8 |
|u04 |2017-01|3 |
+------+-------+---------+