Loading [MathJax]/jax/output/CommonHTML/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >客快物流大数据项目(六十三):快递单主题

客快物流大数据项目(六十三):快递单主题

作者头像
Lansonli
发布于 2022-03-13 04:01:33
发布于 2022-03-13 04:01:33
79500
代码可运行
举报
文章被收录于专栏:Lansonli技术博客Lansonli技术博客
运行总次数:0
代码可运行

目录

快递单

一、背景介绍

二、指标明细

三、表关联关系

1、事实表

2、 维度表

3、关联关系

四、快递单数据拉宽开发 

1、拉宽后的字段

2、SQL语句

3、Spark实现

4、​​​​​​​测试验证

五、​​​​​​​快递单数据指标计算开发

1、​​​​​​​计算的字段

2、Spark实现

​​​​​​​3、测试验证

快递单主题

一、背景介绍

快递单量的统计主要是从多个不同的维度计算快递单量,从而监测快递公司业务运营情况。

二、指标明细

指标列表

维度

快递单数

总快递单数

最大快递单数

各类客户最大快递单数

各渠道最大快递单数

各网点最大快递单数

各终端最大快递单数

最小快递单数

各类客户最小快递单数

各渠道最小快递单数

各网点最小快递单数

各终端最小快递单数

平均快递单数

各类客户平均快递单数

各渠道平均快递单数

各网点平均快递单数

各终端平均快递单数

三、表关联关系

1、事实表

表名

描述

tbl_express_bill

快递单据表

2、 维度表

表名

描述

tbl_consumer

客户表

tbl_courier

快递员表

tbl_pkg

包裹表

tbl_areas

区域表

tbl_dot

网点表

tbl_company_dot_map

公司网点关联表

tbl_company

公司表

tbl_consumer_address_map

客户地址关联表

tbl_address

客户地址表

tbl_codes

字典表

3、关联关系

快递单表与维度表的关联关系如下:

四、快递单数据拉宽开发 

1、拉宽后的字段

字段名

别名

字段描述

tbl_express_bill

id

id

快递单id

tbl_express_bill

expressNumber

express_number

快递单编号

tbl_express_bill

cid

cid

客户ID

tbl_customer

name

cname

客户名字

tbl_address

detailAddr

caddress

详细地址

tbl_express_bill

eid

eid

员工ID

tbl_courier

name

ename

快递员姓名

tbl_dot

id

dot_id

网点ID

tbl_dot

dotName

dot_name

网点名称

tbl_company

companyName

company_name

公司名字

tbl_express_bill

orderChannelId

order_channel_id

下单渠道ID

tbl_codes

channelTypeName

order_channel_name

下单渠道名称

tbl_express_bill

orderDt

order_dt

下单时间

tbl_express_bill

orderTerminalType

order_terminal_type

下单设备类型ID

tbl_codes

orderTypeName

order_terminal_type_name

下单设备类型名称

tbl_express_bill

orderTerminalOsType

order_terminal_os_type

下单设备操作系统ID

tbl_express_bill

reserveDt

reserve_dt

预约取件时间

tbl_express_bill

isCollectPackageTimeout

is_collect_package_timeout

是否取件超时

tbl_express_bill

timeoutDt

timeout_dt

超时时间

tbl_customer

type

ctype

客户类别id

tbl_codes

code_desc

ctype_name

客户类别名称

tbl_express_bill

cdt

cdt

创建时间

tbl_express_bill

udt

udt

修改时间

tbl_express_bill

remark

remark

备注

tbl_express_bill

yyyyMMdd(cdt)

day

创建时间 年月日格式

2、SQL语句

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
SELECT EBILL."id" ,
EBILL ."express_number" ,
EBILL ."cid" ,
CUSTOMER."name" AS cname,
ADDRESS ."detail_addr" AS caddress,
EBILL."eid" ,
COURIER ."name" AS ename,
DOT ."id" AS dot_id,
DOT ."dot_name" AS dot_name,
COMPANY ."company_name" AS company_name,
EBILL."order_channel_id" ,
code1."name" AS "order_channel_name",
ebill."order_dt",
ebill."order_terminal_type",
code2."name" AS order_terminal_type_name,
ebill."order_terminal_os_type" ,
ebill."reserve_dt" ,
ebill."is_collect_package_timeout" ,
ebill."timeout_dt" ,
CUSTOMER."type" ,
ebill."cdt" ,
ebill."udt" ,
ebill."remark" 
FROM "tbl_express_bill" EBILL
LEFT JOIN "tbl_courier" courier ON EBILL."eid" = courier."id" 
LEFT JOIN "tbl_customer" customer ON ebill."cid" = CUSTOMER ."id" 
LEFT JOIN "tbl_codes" code1 ON code1."type" =18 AND ebill."order_channel_id" =code1."code"  
LEFT JOIN "tbl_codes" code2 ON code2."type" =17 AND ebill."order_terminal_type" =code2."code"  
LEFT JOIN "tbl_consumer_address_map" address_map ON CUSTOMER."id" = address_map."consumer_id"
LEFT JOIN "tbl_address" address ON address_map."address_id" = ADDRESS."id"
LEFT JOIN "tbl_pkg" pkg ON EBILL."express_number" = pkg."pw_bill"
LEFT JOIN "tbl_dot" dot ON PKG ."pw_dot_id"=dot."id" 
LEFT JOIN "tbl_company_dot_map" companydot ON dot."id" =COMPANYDOT ."dot_id" 
LEFT JOIN "tbl_company" company ON COMPANY ."id"=companydot."company_id" 

3、​​​​​​​Spark实现

实现步骤:

  • dwd目录下创建 ExpressBillDWD 单例对象,继承自OfflineApp特质
  • 初始化环境的参数,创建SparkSession对象
  • 获取快递单表(tbl_express_bill)数据,并缓存数据(缓存两份数据,生产环境中肯定是多台服务器,为了避免数据丢失缓存2份数据,测试环境只有一台服务器,按照生产环境进行开发)
  • 判断是否是首次运行,如果是首次运行的话,则全量装载数据(含历史数据)
  • 获取客户表(tbl_customer)数据,并缓存数据
  • 获取快递员表(tbl_courier)数据,并缓存数据
  • 获取包裹表(tbl_pkg)数据,并缓存数据
  • 获取网点表(tbl_dot)数据,并缓存数据
  • 获取区域表(tbl_areas)数据,并缓存数据
  • 获取公司网点关联表(tbl_company_dot_map)数据,并缓存数据
  • 获取公司表(tbl_company)数据,并缓存数据
  • 获取客户地址关联表(tbl_consumer_address_map)数据,并缓存数据
  • 获取客户地址表(tbl_address)数据,并缓存数据
  • 获取字典表(tbl_codes)数据,并缓存数据
  • 根据以下方式拉宽快递单明细数据
    • 根据客户id,在客户表中获取客户数据
    • 根据快递员id,在快递员表中获取快递员数据
    • 根据客户id,在客户地址表中获取客户地址数据
    • 根据快递单号,在包裹表中获取包裹数据
    • 根据包裹的发货网点id,获取到网点数据
    • 根据网点id, 获取到公司数据
    • 创建快递单明细宽表(若存在则不创建)
  • 将快递单明细宽表数据写入到kudu数据表中
  • 删除缓存数据

​​​​​​​​​​​​​​初始化环境变量

  • 初始化快递单明细拉宽作业的环境变量
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
package cn.it.logistics.offline.dwd

import cn.it.logistics.common.{CodeTypeMapping, Configuration, OfflineTableDefine, SparkUtils}
import cn.it.logistics.offline.OfflineApp
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.IntegerType

/**
 * 快递单主题开发
 * 将快递单事实表的数据与相关维度表的数据进行关联,然后将拉宽后的数据写入到快递单宽表中
 * 采用DSL语义实现离线计算程序
 *
 * 最终离线程序需要部署到服务器,每天定时执行(azkaban定时调度)
 */
object ExpressBillDWD extends OfflineApp {
  //定义应用的名称
  val appName = this.getClass.getSimpleName

/**
   * 入口函数
   * @param args
   */
  def main(args: Array[String]): Unit = {
    /**
     * 实现步骤:
     * 1)初始化sparkConf对象
     * 2)创建sparkSession对象
     * 3)加载kudu中的事实表和维度表的数据(将加载后的数据进行缓存)
     * 4)定义维度表与事实表的关联
     * 5)将拉宽后的数据再次写回到kudu数据库中(DWD明细层)
     * 5.1:创建快递单明细宽表的schema表结构
     * 5.2:创建快递单宽表(判断宽表是否存在,如果不存在则创建)
     * 5.3:将数据写入到kudu中
     * 6):将缓存的数据删除掉
     */

    //1)初始化sparkConf对象
    val sparkConf: SparkConf = SparkUtils.autoSettingEnv(
      SparkUtils.sparkConf(appName)
    )

    //2)创建sparkSession对象
    val sparkSession: SparkSession = SparkUtils.getSparkSession(sparkConf)
    sparkSession.sparkContext.setLogLevel(Configuration.LOG_OFF)

    //数据处理
    execute(sparkSession)
  }

  /**
   * 数据处理
   *
   * @param sparkSession
   */
  override def execute(sparkSession: SparkSession): Unit = {
    sparkSession.stop()
  }
}

​​​​​​​加载快递单相关的表数据并缓存

  • 加载快递单表的时候,需要指定日期条件,因为快递单主题最终需要Azkaban定时调度执行,每天执行一次增量数据,因此需要指定日期。
  • 判断是否是首次运行,如果是首次运行的话,则全量装载数据(含历史数据)
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
//3.1:加载快递单事实表的数据
val expressBillDF: DataFrame = getKuduSource(sparkSession, TableMapping.expressBill, Configuration.isFirstRunnable)
  .persist(StorageLevel.DISK_ONLY_2) //将数据缓存两个节点的磁盘目录,避免单机故障导致的缓存数据丢失

//3.2:加载快递员维度表的数据
val courierDF: DataFrame = getKuduSource(sparkSession, TableMapping.courier, true).persist(StorageLevel.DISK_ONLY_2)

//3.2:加载客户维度表的数据
val customerDF: DataFrame = getKuduSource(sparkSession, TableMapping.customer, true).persist(StorageLevel.DISK_ONLY_2)

//3.4:加载物流码表的数据
val codesDF: DataFrame = getKuduSource(sparkSession, TableMapping.codes, true).persist(StorageLevel.DISK_ONLY_2)

//3.5:客户地址关联表的数据
val addressMapDF: DataFrame = getKuduSource(sparkSession, TableMapping.consumerAddressMap, true).persist(StorageLevel.DISK_ONLY_2)

//3.6:加载地址表的数据
val addressDF: DataFrame = getKuduSource(sparkSession, TableMapping.address, true).persist(StorageLevel.DISK_ONLY_2)

//3.7:加载包裹表的数据
val pkgDF: DataFrame = getKuduSource(sparkSession, TableMapping.pkg, true).persist(StorageLevel.DISK_ONLY_2)

//3.8:加载网点表的数据
val dotDF: DataFrame = getKuduSource(sparkSession, TableMapping.dot, true).persist(StorageLevel.DISK_ONLY_2)

//3.9:加载公司网点表的数据
val companyDotMapDF: DataFrame = getKuduSource(sparkSession, TableMapping.companyDotMap, true).persist(StorageLevel.DISK_ONLY_2)

//3.10:加载公司表的数据
val companyDF: DataFrame = getKuduSource(sparkSession, TableMapping.company, true).persist(StorageLevel.DISK_ONLY_2)

//导入隐式转换
import  sparkSession.implicits._

//获取终端类型码表数据
val orderTerminalTypeDF: DataFrame = codesDF.where($"type" === CodeTypeMapping.OrderTerminalType)
  .select($"code".as("OrderTerminalTypeCode"), $"codeDesc".as("OrderTerminalTypeName"))

//获取下单终端类型码表数据
val orderChannelTypeDF: DataFrame = codesDF.where($"type" === CodeTypeMapping.OrderChannelType)
  .select($"code".as("OrderChannelTypeCode"), $"codeDesc".as("OrderChannelTypeName"))

​​​​​​​定义表的关联关系

  • 为了在DWS层任务中方便的获取每日增量快递单数据(根据日期),因此在DataFrame基础上动态增加列(day),指定日期格式为yyyyMMdd

代码如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
//TODO 4)定义维度表与事实表的关联
val joinType = "left_outer"
val expressBillDetailDF: DataFrame = expressBillDF
  .join(courierDF, expressBillDF("eid") === courierDF("id") ,joinType) //快递单表与快递员表进行关联
  .join(customerDF, expressBillDF("cid")  === customerDF("id"), joinType) //快递单表与客户表进行关联
  .join(orderChannelTypeDF, orderChannelTypeDF("OrderChannelTypeCode") === expressBillDF("orderChannelId"), joinType) //下单渠道表与快递单表关联
  .join(orderTerminalTypeDF, orderTerminalTypeDF("OrderTerminalTypeCode") === expressBillDF("orderTerminalType"), joinType) //终端类型表与快递单表关联
  .join(addressMapDF, addressMapDF("consumerId") === customerDF("id"), joinType) //客户地址关联表与客户表关联
  .join(addressDF, addressDF("id") === addressMapDF("addressId"), joinType)  //地址表与客户地址关联表关联
  .join(pkgDF, pkgDF("pwBill") === expressBillDF("expressNumber"), joinType) //包裹表与快递单表关联
  .join(dotDF, dotDF("id") === pkgDF("pwDotId"), joinType)  //网点表与包裹表关联
  .join(companyDotMapDF, companyDotMapDF("dotId") === dotDF("id"), joinType) //公司网点关联表与网点表关联
  .join(companyDF, companyDF("id") === companyDotMapDF("companyId"), joinType) //公司网点关联表与公司表关联
  .withColumn("day", date_format(expressBillDF("cdt"), "yyyyMMdd"))//虚拟列,可以根据这个日期列作为分区字段,可以保证同一天的数据保存在同一个分区中
  .sort(expressBillDF.col("cdt").asc) //根据快递单的创建时间顺序排序
  .select(
    expressBillDF("id"),        //快递单id
    expressBillDF("expressNumber").as("express_number"),  //快递单编号
    expressBillDF("cid"),     //客户id
    customerDF("name").as("cname"), //客户名称
    addressDF("detailAddr").as("caddress"),      //客户地址
    expressBillDF("eid"), //员工id
    courierDF("name").as("ename"), //员工名称
    dotDF("id").as("dot_id"), //网点id
    dotDF("dotName").as("dot_name"), //网点名称
    companyDF("companyName").as("company_name"),//公司名称
    expressBillDF("orderChannelId").as("order_channel_id"), //下单渠道id
    orderChannelTypeDF("OrderChannelTypeName").as("order_channel_name"), //下单渠道id
    expressBillDF("orderDt").as("order_dt"),//下单时间
    orderTerminalTypeDF("OrderTerminalTypeCode").as("order_terminal_type"), //下单设备类型id
    orderTerminalTypeDF("OrderTerminalTypeName").as("order_terminal_type_name"), //下单设备类型id
    expressBillDF("orderTerminalOsType").as("order_terminal_os_type"),//下单设备操作系统
    expressBillDF("reserveDt").as("reserve_dt"),//预约取件时间
    expressBillDF("isCollectPackageTimeout").as("is_collect_package_timeout"),//是否取件超时
    expressBillDF("timeoutDt").as("timeout_dt"),//超时时间
    customerDF("type"),//客户类型
    expressBillDF("cdt"),//创建时间
    expressBillDF("udt"),//修改时间
    expressBillDF("remark"),//备注
    $"day"
  )

​​​​​​​创建快递单明细宽表并将快递单明细数据写入到宽表中

快递单宽表数据需要保存到kudu中,因此在第一次执行快递单明细拉宽操作时,快递单明细宽表是不存在的,因此需要实现自动判断宽表是否存在,如果不存在则创建

实现步骤:

  • ExpressBillDWD 单例对象中调用父类save方法
    • 判断宽表是否存在,如果不存在则创建宽表
    • 将明细数据写入到宽表中

参考代码:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
//TODO 5)将拉宽后的数据再次写回到kudu数据库中(DWD明细层)
save(expressBillDetailDF, OfflineTableDefine.expressBillDetail)

​​​​​​​删除缓存数据

为了释放资源,快递单明细宽表数据计算完成以后,需要将缓存的源表数据删除。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
//TODO 6) 将缓存的数据删除掉
expressBillDF.unpersist()
courierDF.unpersist()
customerDF.unpersist()
orderChannelTypeDF.unpersist()
orderTerminalTypeDF.unpersist()
addressMapDF.unpersist()
addressDF.unpersist()
pkgDF.unpersist()
dotDF.unpersist()
companyDotMapDF.unpersist()
companyDF.unpersist()

​​​​​​​完整代码

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
package cn.it.logistics.offline.dwd

import cn.it.logistics.common.{CodeTypeMapping, Configuration, OfflineTableDefine, SparkUtils}
import cn.it.logistics.offline.OfflineApp
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.IntegerType

/**
 * 快递单主题开发
 * 将快递单事实表的数据与相关维度表的数据进行关联,然后将拉宽后的数据写入到快递单宽表中
 * 采用DSL语义实现离线计算程序
 *
 * 最终离线程序需要部署到服务器,每天定时执行(azkaban定时调度)
 */
object ExpressBillDWD extends OfflineApp {
  //定义应用的名称
  val appName = this.getClass.getSimpleName

/**
   * 入口函数
   * @param args
   */
  def main(args: Array[String]): Unit = {
    /**
     * 实现步骤:
     * 1)初始化sparkConf对象
     * 2)创建sparkSession对象
     * 3)加载kudu中的事实表和维度表的数据(将加载后的数据进行缓存)
     * 4)定义维度表与事实表的关联
     * 5)将拉宽后的数据再次写回到kudu数据库中(DWD明细层)
     * 5.1:创建快递单明细宽表的schema表结构
     * 5.2:创建快递单宽表(判断宽表是否存在,如果不存在则创建)
     * 5.3:将数据写入到kudu中
     * 6):将缓存的数据删除掉
     */

    //1)初始化sparkConf对象
    val sparkConf: SparkConf = SparkUtils.autoSettingEnv(
      SparkUtils.sparkConf(appName)
    )

    //2)创建sparkSession对象
    val sparkSession: SparkSession = SparkUtils.getSparkSession(sparkConf)
    sparkSession.sparkContext.setLogLevel(Configuration.LOG_OFF)

    //数据处理
    execute(sparkSession)
  }

  /**
   * 数据处理
   *
   * @param sparkSession
   */
  override def execute(sparkSession: SparkSession): Unit = {
    //TODO 3)加载kudu中的事实表和维度表的数据(将加载后的数据进行缓存)
    //3.1:加载快递单事实表的数据
    val expressBillDF: DataFrame = getKuduSource(sparkSession, "tbl_express_bill", Configuration.isFirstRunnable)
      .persist(StorageLevel.DISK_ONLY_2) //将数据缓存两个节点的磁盘目录,避免单机故障导致的缓存数据丢失

    //3.2:加载快递员维度表的数据
    val courierDF: DataFrame = getKuduSource(sparkSession, "tbl_courier", true).persist(StorageLevel.DISK_ONLY_2)

    //3.2:加载客户维度表的数据
    val customerDF: DataFrame = getKuduSource(sparkSession, "tbl_customer", true).persist(StorageLevel.DISK_ONLY_2)

    //3.4:加载物流码表的数据
    val codesDF: DataFrame = getKuduSource(sparkSession, "tbl_codes", true).persist(StorageLevel.DISK_ONLY_2)

    //3.5:客户地址关联表的数据
    val addressMapDF: DataFrame = getKuduSource(sparkSession, "tbl_consumer_address_map", true).persist(StorageLevel.DISK_ONLY_2)

    //3.6:加载地址表的数据
    val addressDF: DataFrame = getKuduSource(sparkSession, "tbl_address", true).persist(StorageLevel.DISK_ONLY_2)

    //3.7:加载包裹表的数据
    val pkgDF: DataFrame = getKuduSource(sparkSession, "tbl_pkg", true).persist(StorageLevel.DISK_ONLY_2)

    //3.8:加载网点表的数据
    val dotDF: DataFrame = getKuduSource(sparkSession, "tbl_dot", true).persist(StorageLevel.DISK_ONLY_2)

    //3.9:加载公司网点表的数据
    val companyDotMapDF: DataFrame = getKuduSource(sparkSession, "tbl_company_dot_map", true).persist(StorageLevel.DISK_ONLY_2)

    //3.10:加载公司表的数据
    val companyDF: DataFrame = getKuduSource(sparkSession, "tbl_company", true).persist(StorageLevel.DISK_ONLY_2)

    //导入隐式转换
    import  sparkSession.implicits._

    //获取终端类型码表数据
    val orderTerminalTypeDF: DataFrame = codesDF.where($"type" === CodeTypeMapping.OrderTerminalType)
      .select($"code".as("OrderTerminalTypeCode"), $"codeDesc".as("OrderTerminalTypeName"))

    //获取下单终端类型码表数据
    val orderChannelTypeDF: DataFrame = codesDF.where($"type" === CodeTypeMapping.OrderChannelType)
      .select($"code".as("OrderChannelTypeCode"), $"codeDesc".as("OrderChannelTypeName"))

    //TODO 4)定义维度表与事实表的关联
    val joinType = "left_outer"
    val expressBillDetailDF: DataFrame = expressBillDF
      .join(courierDF, expressBillDF("eid") === courierDF("id") ,joinType) //快递单表与快递员表进行关联
      .join(customerDF, expressBillDF("cid")  === customerDF("id"), joinType) //快递单表与客户表进行关联
      .join(orderChannelTypeDF, orderChannelTypeDF("OrderChannelTypeCode") === expressBillDF("orderChannelId"), joinType) //下单渠道表与快递单表关联
      .join(orderTerminalTypeDF, orderTerminalTypeDF("OrderTerminalTypeCode") === expressBillDF("orderTerminalType"), joinType) //终端类型表与快递单表关联
      .join(addressMapDF, addressMapDF("consumerId") === customerDF("id"), joinType) //客户地址关联表与客户表关联
      .join(addressDF, addressDF("id") === addressMapDF("addressId"), joinType)  //地址表与客户地址关联表关联
      .join(pkgDF, pkgDF("pwBill") === expressBillDF("expressNumber"), joinType) //包裹表与快递单表关联
      .join(dotDF, dotDF("id") === pkgDF("pwDotId"), joinType)  //网点表与包裹表关联
      .join(companyDotMapDF, companyDotMapDF("dotId") === dotDF("id"), joinType) //公司网点关联表与网点表关联
      .join(companyDF, companyDF("id") === companyDotMapDF("companyId"), joinType) //公司网点关联表与公司表关联
      .withColumn("day", date_format(expressBillDF("cdt"), "yyyyMMdd"))//虚拟列,可以根据这个日期列作为分区字段,可以保证同一天的数据保存在同一个分区中
      .sort(expressBillDF.col("cdt").asc) //根据快递单的创建时间顺序排序
      .select(
        expressBillDF("id"),        //快递单id
        expressBillDF("expressNumber").as("express_number"),  //快递单编号
        expressBillDF("cid"),     //客户id
        customerDF("name").as("cname"), //客户名称
        addressDF("detailAddr").as("caddress"),      //客户地址
        expressBillDF("eid"), //员工id
        courierDF("name").as("ename"), //员工名称
        dotDF("id").as("dot_id"), //网点id
        dotDF("dotName").as("dot_name"), //网点名称
        companyDF("companyName").as("company_name"),//公司名称
        expressBillDF("orderChannelId").as("order_channel_id"), //下单渠道id
        orderChannelTypeDF("OrderChannelTypeName").as("order_channel_name"), //下单渠道id
        expressBillDF("orderDt").as("order_dt"),//下单时间
        orderTerminalTypeDF("OrderTerminalTypeCode").as("order_terminal_type"), //下单设备类型id
        orderTerminalTypeDF("OrderTerminalTypeName").as("order_terminal_type_name"), //下单设备类型id
        expressBillDF("orderTerminalOsType").as("order_terminal_os_type"),//下单设备操作系统
        expressBillDF("reserveDt").as("reserve_dt"),//预约取件时间
        expressBillDF("isCollectPackageTimeout").as("is_collect_package_timeout"),//是否取件超时
        expressBillDF("timeoutDt").as("timeout_dt"),//超时时间
        customerDF("type"),//客户类型
        expressBillDF("cdt"),//创建时间
        expressBillDF("udt"),//修改时间
        expressBillDF("remark"),//备注
        $"day"
      )

    //TODO 5)将拉宽后的数据再次写回到kudu数据库中(DWD明细层)
    save(expressBillDetailDF, OfflineTableDefine.expressBillDetail)

    //TODO 6) 将缓存的数据删除掉
    expressBillDF.unpersist()
    courierDF.unpersist()
    customerDF.unpersist()
    orderChannelTypeDF.unpersist()
    orderTerminalTypeDF.unpersist()
    addressMapDF.unpersist()
    addressDF.unpersist()
    pkgDF.unpersist()
    dotDF.unpersist()
    companyDotMapDF.unpersist()
    companyDF.unpersist()

    sparkSession.stop()
  }
}

4、​​​​​​​测试验证

实现步骤:

  • ExpressBillDWD 单例对象中读取快递单明细宽表的数据
  • 输出展示

实现过程:

  • ExpressBillDWD 单例对象中读取快递单明细宽表的数据
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
// 检查今日数据
spark.read
  .format(Configure.SPARK_KUDU_FORMAT)
  .options(Map("kudu.master" -> Configure.kuduRpcAddress, "kudu.table" -> table))
  .load
  .show
  • 输出展示

五、​​​​​​​快递单数据指标计算开发

1、​​​​​​​计算的字段

字段名

字段描述

id

数据产生时间

totalExpressBillCount

总快递单数

maxTypeExpressBillTotalCount

各类客户最大快递单数

minTypeExpressBillTotalCount

各类客户最小快递单数

avgTypeExpressBillTotalCount

各类客户平均快递单数

maxDotExpressBillTotalCount

各网点最大快递单数

minDotExpressBillTotalCount

各网点最小快递单数

avgDotExpressBillTotalCount

各网点平均快递单数

maxChannelExpressBillTotalCount

各渠道最大快递单数

minChannelExpressBillTotalCount

各渠道最小快递单数

avgChannelExpressBillTotalCount

各渠道平均快递单数

maxTerminalExpressBillTotalCount

各终端最大快递单数

minTerminalExpressBillTotalCount

各终端最小快递单数

avgTerminalExpressBillTotalCount

各终端平均快递单数

2、Spark实现

实现步骤:

  • 在dws目录下创建 ExpressBillDWS 单例对象,继承自OfflineApp特质
  • 初始化环境的参数,创建SparkSession对象
  • 根据指定的日期获取拉宽后的快递单宽表(tbl_express_bill_detail)增量数据,并缓存数据
  • 判断是否是首次运行,如果是首次运行的话,则全量装载数据(含历史数据)
  • 指标计算
    • 计算总快递单数
    • 各类客户快递单数
    • 各类客户最大快递单数
    • 各类客户最小快递单数
    • 各类客户平均快递单数
    • 各网点快递单数
    • 各网点最大快递单数
    • 各网点最小快递单数
    • 各网点平均快递单数
    • 各渠道快递单数
    • 各渠道最大快递单数
    • 各渠道最小快递单数
    • 各渠道平均快递单数
    • 各终端快递单数
    • 各终端最大快递单数
    • 各终端最小快递单数
    • 各终端平均快递单数
    • 获取当前时间yyyyMMddHH
  • 构建要持久化的指标数据(需要判断计算的指标是否有值,若没有需要赋值默认值
  • 通过StructType构建指定Schema
  • 创建快递单指标数据表(若存在则不创建)
  • 持久化指标数据到kudu表

​​​​​​​初始化环境变量

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
package cn.it.logistics.offline.dws

import cn.it.logistics.common.{Configuration, SparkUtils}
import cn.it.logistics.offline.OfflineApp
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object ExpressBillDWS  extends OfflineApp{
  //定义应用程序的名称
  val appName = this.getClass.getSimpleName

  /**
   * 入口函数
   * @param args
   */
  def main(args: Array[String]): Unit = {
    /**
     * 实现步骤:
     * 1)创建SparkConf对象
     * 2)创建SparkSession对象
     * 3)读取快递明细宽表的数据
     * 4)对快递明细宽表的数据进行指标的计算
     * 5)将计算好的指标数据写入到kudu数据库中
     * 5.1:定义指标结果表的schema信息
     * 5.2:组织需要写入到kudu表的数据
     * 5.3:判断指标结果表是否存在,如果不存在则创建
     * 5.4:将数据写入到kudu表中
     * 6)删除缓存数据
     * 7)停止任务,退出sparksession
     */

    //TODO 1)创建SparkConf对象
    val sparkConf: SparkConf = SparkUtils.autoSettingEnv(
      SparkUtils.sparkConf(appName)
    )

    //TODO 2)创建SparkSession对象
    val sparkSession: SparkSession = SparkUtils.getSparkSession(sparkConf)
    sparkSession.sparkContext.setLogLevel(Configuration.LOG_OFF)

    //处理数据
    execute(sparkSession)
  }

  /**
   * 数据处理
   *
   * @param sparkSession
   */
  override def execute(sparkSession: SparkSession): Unit = {
    // 退出sc
    sparkSession.stop
  }
}

​​​​​​​加载快递单宽表增量数据并缓存

加载快递单宽表的时候,需要指定日期条件,因为快递单主题最终需要Azkaban定时调度执行,每天执行一次增量数据,因此需要指定日期。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
//TODO 3)读取快递单明细宽表的数据
val expressBillDetailDF: DataFrame = getKuduSource(sparkSession, OfflineTableDefine.expressBillDetail, Configuration.isFirstRunnable)

 ​​​​​​​指标计算

程序首次运行需要全量装载历史的快递单数据,离线计算程序每天计算昨天增量数据,因此需要将历史的数据进行按照天进行分组,然后根据某一天来进行统计当前日期下的快递单相关指标数据

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
//读取出来的明细宽表数据可能是增量的数据,也可能是全量的数据
//全量的数据是包含多个日期的数据,增量数据是前一天的数据
//需要计算的指标是以日为单位,每天的最大快递单数、最小快递单数、平均快递单数据
//因此需要对读取出来的快递单明细宽表数据按照日为单位进行分组,然后统计每日的指标数据
val expressBillDetailGroupByDayDF: DataFrame = expressBillDetailDF.select("day").groupBy("day").count().cache()

//导入隐式转换
import sparkSession.implicits._

//定义计算好的指标结果集合对象
val rows: ArrayBuffer[Row] = ArrayBuffer[Row]()

//循环遍历所有日期的数据
expressBillDetailGroupByDayDF.collect().foreach(row=>{
  //获取到需要处理的数据所在的日期
  val day: String = row.getAs[String](0)

  //根据日期查询该日期内的快递单明细数据,然后将查询到的结果进行指标计算(指定日期的指标)
  val expressBillDetailByDayDF: DataFrame = expressBillDetailDF.where(col("day") === day).toDF().persist(StorageLevel.DISK_ONLY_2)

  //TODO 4)对快递明细宽表的数据进行指标的计算
  //总快递单数
  val totalExpressBillCount: Long = expressBillDetailByDayDF.agg(count("express_number")).first().getLong(0)
  //各类客户的快递单数
  val customerTypeExpressBillTotalCountDF: DataFrame = expressBillDetailByDayDF.groupBy($"type").agg(count("express_number").as("express_number")).cache()

  //各类客户最大快递单数
  val maxTypeExpressBillTotalCount: Row = customerTypeExpressBillTotalCountDF.agg(max("express_number")).first()

  //各类客户最小快递单数
  val minTypeExpressBillTotalCount: Row = customerTypeExpressBillTotalCountDF.agg(min("express_number")).first()

  //各类客户平均快递单数
  val avgTypeExpressBillTotalCount: Row = customerTypeExpressBillTotalCountDF.agg(avg("express_number")).first()

  //各网点的快递单数
  val dotExpressBillTotalCountDF: DataFrame = expressBillDetailByDayDF.groupBy($"dot_id").agg(count("express_number").as("express_number")).cache()

  //各网点最大快递单数
  val maxDotExpressBillTotalCount: Row = dotExpressBillTotalCountDF.agg(max("express_number")).first()

  //各网点最小快递单数
  val minDotExpressBillTotalCount: Row = dotExpressBillTotalCountDF.agg(min("express_number")).first()

  //各网点平均快递单数
  val avgDotExpressBillTotalCount: Row = dotExpressBillTotalCountDF.agg(avg("express_number")).first()

  //各渠道的快递单数
  val channelExpressBillTotalCountDF: DataFrame = expressBillDetailByDayDF.groupBy($"order_channel_id").agg(count("express_number").as("express_number")).cache()

  //各渠道最大快递单数
  val maxChannelExpressBillTotalCount: Row = channelExpressBillTotalCountDF.agg(max("express_number")).first()

  //各渠道最小快递单数
  val minChannelExpressBillTotalCount: Row = channelExpressBillTotalCountDF.agg(min("express_number")).first()

  //各渠道平均快递单数
  val avgChannelExpressBillTotalCount: Row = channelExpressBillTotalCountDF.agg(avg("express_number")).first()

  val terminalExpressBillTotalCountDF: DataFrame = expressBillDetailByDayDF.groupBy($"order_terminal_type").agg(count("express_number").as("express_number")).cache()
  println(terminalExpressBillTotalCountDF)

  //各终端最大快递单数
  val maxTerminalExpressBillTotalCount: Row = terminalExpressBillTotalCountDF.agg(max("express_number")).first()

  //各终端最小快递单数
  val minTerminalExpressBillTotalCount: Row = terminalExpressBillTotalCountDF.agg(min("express_number")).first()

  //各终端平均快递单数
  val avgTerminalExpressBillTotalCount: Row = terminalExpressBillTotalCountDF.agg(avg("express_number")).first()
  println(avgTerminalExpressBillTotalCount)

  //将每条记录写入到Row对象中
  val rowInfo = Row(
    day,
    totalExpressBillCount, //总快递单数
    if(maxTypeExpressBillTotalCount.isNullAt(0)) 0L else maxTypeExpressBillTotalCount.get(0).asInstanceOf[Number].longValue(),
    if(minTypeExpressBillTotalCount.isNullAt(0)) 0L else minTypeExpressBillTotalCount.get(0).asInstanceOf[Number].longValue(),
    if(avgTypeExpressBillTotalCount.isNullAt(0)) 0L else avgTypeExpressBillTotalCount.get(0).asInstanceOf[Number].longValue(),
    if(maxDotExpressBillTotalCount.isNullAt(0)) 0L else maxDotExpressBillTotalCount.get(0).asInstanceOf[Number].longValue(),
    if(minDotExpressBillTotalCount.isNullAt(0)) 0L else minDotExpressBillTotalCount.get(0).asInstanceOf[Number].longValue(),
    if(avgDotExpressBillTotalCount.isNullAt(0)) 0L else avgDotExpressBillTotalCount.get(0).asInstanceOf[Number].longValue(),
    if(maxChannelExpressBillTotalCount.isNullAt(0)) 0L else maxChannelExpressBillTotalCount.get(0).asInstanceOf[Number].longValue(),
    if(minChannelExpressBillTotalCount.isNullAt(0)) 0L else minChannelExpressBillTotalCount.get(0).asInstanceOf[Number].longValue(),
    if(avgChannelExpressBillTotalCount.isNullAt(0)) 0L else avgChannelExpressBillTotalCount.get(0).asInstanceOf[Number].longValue(),
    if(maxTerminalExpressBillTotalCount.isNullAt(0)) 0L else maxTerminalExpressBillTotalCount.get(0).asInstanceOf[Number].longValue(),
    if(minTerminalExpressBillTotalCount.isNullAt(0)) 0L else minTerminalExpressBillTotalCount.get(0).asInstanceOf[Number].longValue(),
    if(avgTerminalExpressBillTotalCount.isNullAt(0)) 0L else avgTerminalExpressBillTotalCount.get(0).asInstanceOf[Number].longValue()
  )

  println(rowInfo)

  //将计算好的结果数据写入到结果对象中
  rows.append(rowInfo)

  //释放资源
  expressBillDetailByDayDF.unpersist()
  customerTypeExpressBillTotalCountDF.unpersist()
  dotExpressBillTotalCountDF.unpersist()
  channelExpressBillTotalCountDF.unpersist()
  terminalExpressBillTotalCountDF.unpersist()
})

​​​​​​​通过StructType构建指定Schema

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
//创建DataFrame:schema+rdd(数据)
//定义指标结果的schema信息
val schema: StructType = StructType(Array(
  StructField("id", StringType, false, Metadata.empty),
  StructField("totalExpressBillCount", LongType, false, Metadata.empty),
  StructField("maxTypeExpressBillTotalCount", LongType, false, Metadata.empty),
  StructField("minTypeExpressBillTotalCount", LongType, false, Metadata.empty),
  StructField("avgTypeExpressBillTotalCount", LongType, false, Metadata.empty),
  StructField("maxDotExpressBillTotalCount", LongType, false, Metadata.empty),
  StructField("minDotExpressBillTotalCount", LongType, false, Metadata.empty),
  StructField("avgDotExpressBillTotalCount", LongType, false, Metadata.empty),
  StructField("maxChannelExpressBillTotalCount", LongType, false, Metadata.empty),
  StructField("minChannelExpressBillTotalCount", LongType, false, Metadata.empty),
  StructField("avgChannelExpressBillTotalCount", LongType, false, Metadata.empty),
  StructField("maxTerminalExpressBillTotalCount", LongType, false, Metadata.empty),
  StructField("minTerminalExpressBillTotalCount", LongType, false, Metadata.empty),
  StructField("avgTerminalExpressBillTotalCount", LongType, false, Metadata.empty)
))

 ​​​​​​​持久化指标数据到kudu表​​​​​​​

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
//将数据转换成rdd对象
val data: RDD[Row] = sparkSession.sparkContext.makeRDD(rows)

//根据表结构和数据创建DataFrame对象
val quotaDF: DataFrame = sparkSession.createDataFrame(data, schema)

//TODO 5)将计算好的指标数据写入到kudu数据库中
//将dataframe数据写入到kudu数据库
save(quotaDF, OfflineTableDefine.expressBillSummary)

​​​​​​​删除缓存数据

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
//TODO 6)删除缓存数据
expressBillDetailGroupByDayDF.unpersist()
expressBillDetailDF.unpersist()

​​​​​​​完整代码

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
package cn.it.logistics.offline.dws

import cn.it.logistics.common.{Configuration, DateHelper, OfflineTableDefine, SparkUtils}
import cn.it.logistics.offline.OfflineApp
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{DoubleType, LongType, Metadata, StringType, StructField, StructType}
import org.apache.spark.sql.functions._
import org.apache.spark.storage.StorageLevel

import scala.collection.mutable.ArrayBuffer

/**
 * 快递单主题指标开发
 */
object ExpressBillDWS extends OfflineApp{
  //定义应用程序的名称
  val appName = this.getClass.getSimpleName

  /**
   * 入口函数
   * @param args
   */
  def main(args: Array[String]): Unit = {
    /**
     * 实现步骤:
     * 1)创建SparkConf对象
     * 2)创建SparkSession对象
     * 3)读取快递明细宽表的数据
     * 4)对快递明细宽表的数据进行指标的计算
     * 5)将计算好的指标数据写入到kudu数据库中
     * 5.1:定义指标结果表的schema信息
     * 5.2:组织需要写入到kudu表的数据
     * 5.3:判断指标结果表是否存在,如果不存在则创建
     * 5.4:将数据写入到kudu表中
     * 6)删除缓存数据
     * 7)停止任务,退出sparksession
     */

    //TODO 1)创建SparkConf对象
    val sparkConf: SparkConf = SparkUtils.autoSettingEnv(
      SparkUtils.sparkConf(appName)
    )

    //TODO 2)创建SparkSession对象
    val sparkSession: SparkSession = SparkUtils.getSparkSession(sparkConf)
    sparkSession.sparkContext.setLogLevel(Configuration.LOG_OFF)

    //处理数据
    execute(sparkSession)
  }

  /**
   * 数据处理
   *
   * @param sparkSession
   */
  override def execute(sparkSession: SparkSession): Unit = {
    //TODO 3)读取快递单明细宽表的数据
    val expressBillDetailDF: DataFrame = getKuduSource(sparkSession, OfflineTableDefine.expressBillDetail, Configuration.isFirstRunnable)

    //读取出来的明细宽表数据可能是增量的数据,也可能是全量的数据
    //全量的数据是包含多个日期的数据,增量数据是前一天的数据
    //需要计算的指标是以日为单位,每天的最大快递单数、最小快递单数、平均快递单数据
    //因此需要对读取出来的快递单明细宽表数据按照日为单位进行分组,然后统计每日的指标数据
    val expressBillDetailGroupByDayDF: DataFrame = expressBillDetailDF.select("day").groupBy("day").count().cache()

    //导入隐式转换
    import sparkSession.implicits._

    //定义计算好的指标结果集合对象
    val rows: ArrayBuffer[Row] = ArrayBuffer[Row]()

    //循环遍历所有日期的数据
    expressBillDetailGroupByDayDF.collect().foreach(row=>{
      //获取到需要处理的数据所在的日期
      val day: String = row.getAs[String](0)

      //根据日期查询该日期内的快递单明细数据,然后将查询到的结果进行指标计算(指定日期的指标)
      val expressBillDetailByDayDF: DataFrame = expressBillDetailDF.where(col("day") === day).toDF().persist(StorageLevel.DISK_ONLY_2)

      //TODO 4)对快递明细宽表的数据进行指标的计算
      //总快递单数
      val totalExpressBillCount: Long = expressBillDetailByDayDF.agg(count("express_number")).first().getLong(0)
      //各类客户的快递单数
      val customerTypeExpressBillTotalCountDF: DataFrame = expressBillDetailByDayDF.groupBy($"type").agg(count("express_number").as("express_number")).cache()

      //各类客户最大快递单数
      val maxTypeExpressBillTotalCount: Row = customerTypeExpressBillTotalCountDF.agg(max("express_number")).first()

      //各类客户最小快递单数
      val minTypeExpressBillTotalCount: Row = customerTypeExpressBillTotalCountDF.agg(min("express_number")).first()

      //各类客户平均快递单数
      val avgTypeExpressBillTotalCount: Row = customerTypeExpressBillTotalCountDF.agg(avg("express_number")).first()

      //各网点的快递单数
      val dotExpressBillTotalCountDF: DataFrame = expressBillDetailByDayDF.groupBy($"dot_id").agg(count("express_number").as("express_number")).cache()

      //各网点最大快递单数
      val maxDotExpressBillTotalCount: Row = dotExpressBillTotalCountDF.agg(max("express_number")).first()

      //各网点最小快递单数
      val minDotExpressBillTotalCount: Row = dotExpressBillTotalCountDF.agg(min("express_number")).first()

      //各网点平均快递单数
      val avgDotExpressBillTotalCount: Row = dotExpressBillTotalCountDF.agg(avg("express_number")).first()

      //各渠道的快递单数
      val channelExpressBillTotalCountDF: DataFrame = expressBillDetailByDayDF.groupBy($"order_channel_id").agg(count("express_number").as("express_number")).cache()

      //各渠道最大快递单数
      val maxChannelExpressBillTotalCount: Row = channelExpressBillTotalCountDF.agg(max("express_number")).first()

      //各渠道最小快递单数
      val minChannelExpressBillTotalCount: Row = channelExpressBillTotalCountDF.agg(min("express_number")).first()

      //各渠道平均快递单数
      val avgChannelExpressBillTotalCount: Row = channelExpressBillTotalCountDF.agg(avg("express_number")).first()

      val terminalExpressBillTotalCountDF: DataFrame = expressBillDetailByDayDF.groupBy($"order_terminal_type").agg(count("express_number").as("express_number")).cache()
      println(terminalExpressBillTotalCountDF)

      //各终端最大快递单数
      val maxTerminalExpressBillTotalCount: Row = terminalExpressBillTotalCountDF.agg(max("express_number")).first()

      //各终端最小快递单数
      val minTerminalExpressBillTotalCount: Row = terminalExpressBillTotalCountDF.agg(min("express_number")).first()

      //各终端平均快递单数
      val avgTerminalExpressBillTotalCount: Row = terminalExpressBillTotalCountDF.agg(avg("express_number")).first()
      println(avgTerminalExpressBillTotalCount)

      //将每条记录写入到Row对象中
      val rowInfo = Row(
        day,
        totalExpressBillCount, //总快递单数
        if(maxTypeExpressBillTotalCount.isNullAt(0)) 0L else maxTypeExpressBillTotalCount.get(0).asInstanceOf[Number].longValue(),
        if(minTypeExpressBillTotalCount.isNullAt(0)) 0L else minTypeExpressBillTotalCount.get(0).asInstanceOf[Number].longValue(),
        if(avgTypeExpressBillTotalCount.isNullAt(0)) 0L else avgTypeExpressBillTotalCount.get(0).asInstanceOf[Number].longValue(),
        if(maxDotExpressBillTotalCount.isNullAt(0)) 0L else maxDotExpressBillTotalCount.get(0).asInstanceOf[Number].longValue(),
        if(minDotExpressBillTotalCount.isNullAt(0)) 0L else minDotExpressBillTotalCount.get(0).asInstanceOf[Number].longValue(),
        if(avgDotExpressBillTotalCount.isNullAt(0)) 0L else avgDotExpressBillTotalCount.get(0).asInstanceOf[Number].longValue(),
        if(maxChannelExpressBillTotalCount.isNullAt(0)) 0L else maxChannelExpressBillTotalCount.get(0).asInstanceOf[Number].longValue(),
        if(minChannelExpressBillTotalCount.isNullAt(0)) 0L else minChannelExpressBillTotalCount.get(0).asInstanceOf[Number].longValue(),
        if(avgChannelExpressBillTotalCount.isNullAt(0)) 0L else avgChannelExpressBillTotalCount.get(0).asInstanceOf[Number].longValue(),
        if(maxTerminalExpressBillTotalCount.isNullAt(0)) 0L else maxTerminalExpressBillTotalCount.get(0).asInstanceOf[Number].longValue(),
        if(minTerminalExpressBillTotalCount.isNullAt(0)) 0L else minTerminalExpressBillTotalCount.get(0).asInstanceOf[Number].longValue(),
        if(avgTerminalExpressBillTotalCount.isNullAt(0)) 0L else avgTerminalExpressBillTotalCount.get(0).asInstanceOf[Number].longValue()
      )

      println(rowInfo)

      //将计算好的结果数据写入到结果对象中
      rows.append(rowInfo)

      //释放资源
      expressBillDetailByDayDF.unpersist()
      customerTypeExpressBillTotalCountDF.unpersist()
      dotExpressBillTotalCountDF.unpersist()
      channelExpressBillTotalCountDF.unpersist()
      terminalExpressBillTotalCountDF.unpersist()
    })

    //创建DataFrame:schema+rdd(数据)
    //定义指标结果的schema信息
    val schema: StructType = StructType(Array(
      StructField("id", StringType, false, Metadata.empty),
      StructField("totalExpressBillCount", LongType, false, Metadata.empty),
      StructField("maxTypeExpressBillTotalCount", LongType, false, Metadata.empty),
      StructField("minTypeExpressBillTotalCount", LongType, false, Metadata.empty),
      StructField("avgTypeExpressBillTotalCount", LongType, false, Metadata.empty),
      StructField("maxDotExpressBillTotalCount", LongType, false, Metadata.empty),
      StructField("minDotExpressBillTotalCount", LongType, false, Metadata.empty),
      StructField("avgDotExpressBillTotalCount", LongType, false, Metadata.empty),
      StructField("maxChannelExpressBillTotalCount", LongType, false, Metadata.empty),
      StructField("minChannelExpressBillTotalCount", LongType, false, Metadata.empty),
      StructField("avgChannelExpressBillTotalCount", LongType, false, Metadata.empty),
      StructField("maxTerminalExpressBillTotalCount", LongType, false, Metadata.empty),
      StructField("minTerminalExpressBillTotalCount", LongType, false, Metadata.empty),
      StructField("avgTerminalExpressBillTotalCount", LongType, false, Metadata.empty)
    ))

    //将数据转换成rdd对象
    val data: RDD[Row] = sparkSession.sparkContext.makeRDD(rows)

    //根据表结构和数据创建DataFrame对象
    val quotaDF: DataFrame = sparkSession.createDataFrame(data, schema)

    //TODO 5)将计算好的指标数据写入到kudu数据库中
    //将dataframe数据写入到kudu数据库
    save(quotaDF, OfflineTableDefine.expressBillSummary)

    //TODO 6)删除缓存数据
    expressBillDetailGroupByDayDF.unpersist()
    expressBillDetailDF.unpersist()

    //TODO 7)停止任务,退出sparksession
    sparkSession.stop()
  }
}

​​​​​​​3、测试验证

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2022/03/13 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
客快物流大数据项目(六十四):运单主题
“运单是运输合同的证明,是承运人已经接收货物的收据。一份运单,填写一个托运人、收货人、起运港、到达港。如同一托运人的货物分别属到达港的两个或两个以上收货人,则应分别填制运单。”
Lansonli
2022/03/13
8810
客快物流大数据项目(六十四):运单主题
客快物流大数据项目(六十五):仓库主题
从2005年开始,网购快递每年以倍增的速度增长。重大节日前是快递爆仓发生的时段。如五一节前夕、国庆节前夕、圣诞节前夕、元旦前夕、春节前夕。新兴的光棍节,网购日,2010年“光棍节、圣诞节、元旦”和春节前夕,淘宝网、京东商城等网商集中促销造成部分民营快递企业多次发生爆仓现象。如2011年11月11日世纪光棍节,淘宝网当天交易额33亿,包裹堆积成山,快递公司原有的交通工具和人员,远远无法满足运送这么多包裹的要求,因此造成包裹被堆积在仓库长达十几天。广州市甚至出现同城快件10天不到的情况。
Lansonli
2022/05/12
7830
客快物流大数据项目(六十五):仓库主题
客快物流大数据项目(六十七):客户主题
客户明细宽表数据需要保存到kudu中,因此在第一次执行客户明细拉宽操作时,客户明细宽表是不存在的,因此需要实现自动判断宽表是否存在,如果不存在则创建
Lansonli
2022/05/16
6950
客快物流大数据项目(六十七):客户主题
客快物流大数据项目(六十六):车辆主题
车辆主题主要是统计各个网点、区域、公司的发车情况,反映了区域或者公司的吞吐量及运营状况。
Lansonli
2022/05/16
6700
客快物流大数据项目(六十六):车辆主题
客快物流大数据项目(六十二):主题及指标开发
每个主题都需要拉宽操作将拉宽后的数据存储到kudu表中,同时指标计算的数据最终也需要落地到kudu表,因此提前将各个主题相关表名定义出来
Lansonli
2022/03/12
8160
客快物流大数据项目(六十二):主题及指标开发
客快物流大数据项目(一百零二):业务和指数开发
业务和指数开发一、​​​​​​​业务开发实现步骤:在logistics-etl模块cn.it.logistics.etl.realtime程序包下创建CKStreamApp单例对象,继承自StreamApp编写main入口函数,初始化spark的运行环境实现StreamApp基类的两个方法 Execute(消费kafka数据,并对消费到的数据转换成对象,过滤每张表的数据写入到CK)Save(实现数据写入到ClickHouse中)实现方法:在logistics-etl模块cn.it.logistics.
Lansonli
2022/12/29
6490
客快物流大数据项目(一百零二):业务和指数开发
客快物流大数据项目(六十):将消费的kafka数据转换成bean对象
目录 将消费的kafka数据转换成bean对象 一、将OGG数据转换成bean对象 二、​​​​​​​将Canal数据转换成bean对象 三、完整代码 将消费的kafka数据转换成bean对象 一、​​​​​​​将OGG数据转换成bean对象 实现步骤: 消费kafka的 logistics Topic数据 将消费到的数据转换成OggMessageBean对象 递交作业启动运行 实现过程: 消费kafka的 logistics Topic数据 //2.1:获取物流系统相关的数据 val logistics
Lansonli
2022/03/09
5070
客快物流大数据项目(六十一):将消费的kafka数据同步到Kudu中
已经将kafka消费的数据转换成了OggMessageBean或者CanalMessageBean对象,每条数据对应着某张表的操作记录。
Lansonli
2022/03/10
2.5K0
客快物流大数据项目(六十一):将消费的kafka数据同步到Kudu中
客快物流大数据项目(一百):ClickHouse的使用
​ClickHouse的使用一、使用Java操作ClickHouse1、构建maven工程2、​​​​​​​导入依赖<!-- Clickhouse --><dependency> <groupId>ru.yandex.clickhouse</groupId> <artifactId>clickhouse-jdbc</artifactId> <version>0.2.2</version></dependency>3、​​​​​​​​​​​​​​创建包结构在java程序包目录创建包名说明c
Lansonli
2022/12/29
1.3K0
客快物流大数据项目(一百):ClickHouse的使用
客快物流大数据项目(四十五):Spark操作Kudu DML操作
Kudu支持许多DML类型的操作,其中一些操作包含在Spark on Kudu集成. 包括:
Lansonli
2022/02/24
7040
客快物流大数据项目(四十五):Spark操作Kudu DML操作
客快物流大数据项目(一百零一):实时OLAP开发
使用ClickHouse分析物流指标数据,必须将数据存储到ClickHouse中。
Lansonli
2022/12/29
1.3K0
客快物流大数据项目(一百零一):实时OLAP开发
客快物流大数据项目(四十四):Spark操作Kudu创建表
定义表时要注意的是Kudu表选项值。你会注意到在指定组成范围分区列的列名列表时我们调用“asJava”方 法。这是因为在这里,我们调用了Kudu Java客户端本身,它需要Java对象(即java.util.List)而不是Scala的List对 象;(要使“asJava”方法可用,请记住导入JavaConverters库。) 创建表后,通过将浏览器指向http//master主机名:8051/tables
Lansonli
2022/02/24
5840
客快物流大数据项目(四十四):Spark操作Kudu创建表
客快物流大数据项目(五十一):数据库表分析
​​​​​​​7、客户寄件信息表(tbl_consumer_sender_info)
Lansonli
2022/02/28
1.2K0
客快物流大数据项目(五十一):数据库表分析
客快物流大数据项目(五十七):创建Kudu-ETL流式计算程序
创建Kudu-ETL流式计算程序 实现步骤: 在realtime目录创建 KuduStreamApp 单例对象,继承自 StreamApp 特质 重写特质内的方法 编写代码接入kafka集群消费其数据 package cn.it.logistics.etl.realtime import cn.itcast.logistics.common.{Configuration, SparkUtils} import org.apache.spark.SparkConf import org.apache.spa
Lansonli
2022/03/07
4100
对于一般大数据物流项目的面试题(问题+答案)
1、数据采集如何完成 OGG 不要涉及,Oracle DBA完成 Canal数据采集,一定知道高可用HA集群模式 2、数据量大小 Kafka topic 数据存储生命周期(多久) 7天
Maynor
2021/12/07
3740
对于一般大数据物流项目的面试题(问题+答案)
大数据Kudu(九):Spark操作Kudu
使用SparkSQL操作Kudu,这里需要导入Kudu与SparkSQL整合的包和SparkSQL的包,在Maven中导入如下依赖:
Lansonli
2022/12/21
1.2K0
大数据Kudu(九):Spark操作Kudu
Spark SQL | Spark,从入门到精通
欢迎阅读美图数据技术团队的「Spark,从入门到精通」系列文章,本系列文章将由浅入深为大家介绍 Spark,从框架入门到底层架构的实现,相信总有一种姿势适合你。
美图数据技术团队
2019/04/19
2.1K0
Spark SQL | Spark,从入门到精通
Spark SQL 数据统计 Scala 开发小结
本文介绍了如何在 Spark 中使用 DataFrame 和 Dataset 进行数据操作,包括数据读取、数据转换、数据聚合、数据排序和数据分组等操作。同时,还介绍了如何使用 Spark Streaming 进行实时数据处理,以及如何使用 Spark SQL 进行 SQL 查询。
李德鑫
2017/08/15
9.7K3
Spark SQL从入门到精通
熟悉spark sql的都知道,spark sql是从shark发展而来。Shark为了实现Hive兼容,在HQL方面重用了Hive中HQL的解析、逻辑执行计划翻译、执行计划优化等逻辑,可以近似认为仅将物理执行计划从MR作业替换成了Spark作业(辅以内存列式存储等各种和Hive关系不大的优化);
Spark学习技巧
2019/05/09
1.2K0
Spark SQL从入门到精通
客快物流大数据项目(五十五):封装公共接口(根据存储介质抽取特质)
Structured Streaming 流处理程序消费kafka数据以后,会将数据分别存储到Kudu、ES、ClickHouse中,因此可以根据存储介质不同,封装其公共接口,每个流处理程序继承自该接口
Lansonli
2022/03/04
2760
推荐阅读
相关推荐
客快物流大数据项目(六十四):运单主题
更多 >
加入讨论
的问答专区 >
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
    本文部分代码块支持一键运行,欢迎体验
    本文部分代码块支持一键运行,欢迎体验