前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >客快物流大数据项目(六十三):快递单主题

客快物流大数据项目(六十三):快递单主题

作者头像
Lansonli
发布于 2022-03-13 04:01:33
发布于 2022-03-13 04:01:33
79500
代码可运行
举报
文章被收录于专栏:Lansonli技术博客Lansonli技术博客
运行总次数:0
代码可运行

目录

快递单

一、背景介绍

二、指标明细

三、表关联关系

1、事实表

2、 维度表

3、关联关系

四、快递单数据拉宽开发 

1、拉宽后的字段

2、SQL语句

3、Spark实现

4、​​​​​​​测试验证

五、​​​​​​​快递单数据指标计算开发

1、​​​​​​​计算的字段

2、Spark实现

​​​​​​​3、测试验证

快递单主题

一、背景介绍

快递单量的统计主要是从多个不同的维度计算快递单量,从而监测快递公司业务运营情况。

二、指标明细

指标列表

维度

快递单数

总快递单数

最大快递单数

各类客户最大快递单数

各渠道最大快递单数

各网点最大快递单数

各终端最大快递单数

最小快递单数

各类客户最小快递单数

各渠道最小快递单数

各网点最小快递单数

各终端最小快递单数

平均快递单数

各类客户平均快递单数

各渠道平均快递单数

各网点平均快递单数

各终端平均快递单数

三、表关联关系

1、事实表

表名

描述

tbl_express_bill

快递单据表

2、 维度表

表名

描述

tbl_consumer

客户表

tbl_courier

快递员表

tbl_pkg

包裹表

tbl_areas

区域表

tbl_dot

网点表

tbl_company_dot_map

公司网点关联表

tbl_company

公司表

tbl_consumer_address_map

客户地址关联表

tbl_address

客户地址表

tbl_codes

字典表

3、关联关系

快递单表与维度表的关联关系如下:

四、快递单数据拉宽开发 

1、拉宽后的字段

字段名

别名

字段描述

tbl_express_bill

id

id

快递单id

tbl_express_bill

expressNumber

express_number

快递单编号

tbl_express_bill

cid

cid

客户ID

tbl_customer

name

cname

客户名字

tbl_address

detailAddr

caddress

详细地址

tbl_express_bill

eid

eid

员工ID

tbl_courier

name

ename

快递员姓名

tbl_dot

id

dot_id

网点ID

tbl_dot

dotName

dot_name

网点名称

tbl_company

companyName

company_name

公司名字

tbl_express_bill

orderChannelId

order_channel_id

下单渠道ID

tbl_codes

channelTypeName

order_channel_name

下单渠道名称

tbl_express_bill

orderDt

order_dt

下单时间

tbl_express_bill

orderTerminalType

order_terminal_type

下单设备类型ID

tbl_codes

orderTypeName

order_terminal_type_name

下单设备类型名称

tbl_express_bill

orderTerminalOsType

order_terminal_os_type

下单设备操作系统ID

tbl_express_bill

reserveDt

reserve_dt

预约取件时间

tbl_express_bill

isCollectPackageTimeout

is_collect_package_timeout

是否取件超时

tbl_express_bill

timeoutDt

timeout_dt

超时时间

tbl_customer

type

ctype

客户类别id

tbl_codes

code_desc

ctype_name

客户类别名称

tbl_express_bill

cdt

cdt

创建时间

tbl_express_bill

udt

udt

修改时间

tbl_express_bill

remark

remark

备注

tbl_express_bill

yyyyMMdd(cdt)

day

创建时间 年月日格式

2、SQL语句

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
SELECT EBILL."id" ,
EBILL ."express_number" ,
EBILL ."cid" ,
CUSTOMER."name" AS cname,
ADDRESS ."detail_addr" AS caddress,
EBILL."eid" ,
COURIER ."name" AS ename,
DOT ."id" AS dot_id,
DOT ."dot_name" AS dot_name,
COMPANY ."company_name" AS company_name,
EBILL."order_channel_id" ,
code1."name" AS "order_channel_name",
ebill."order_dt",
ebill."order_terminal_type",
code2."name" AS order_terminal_type_name,
ebill."order_terminal_os_type" ,
ebill."reserve_dt" ,
ebill."is_collect_package_timeout" ,
ebill."timeout_dt" ,
CUSTOMER."type" ,
ebill."cdt" ,
ebill."udt" ,
ebill."remark" 
FROM "tbl_express_bill" EBILL
LEFT JOIN "tbl_courier" courier ON EBILL."eid" = courier."id" 
LEFT JOIN "tbl_customer" customer ON ebill."cid" = CUSTOMER ."id" 
LEFT JOIN "tbl_codes" code1 ON code1."type" =18 AND ebill."order_channel_id" =code1."code"  
LEFT JOIN "tbl_codes" code2 ON code2."type" =17 AND ebill."order_terminal_type" =code2."code"  
LEFT JOIN "tbl_consumer_address_map" address_map ON CUSTOMER."id" = address_map."consumer_id"
LEFT JOIN "tbl_address" address ON address_map."address_id" = ADDRESS."id"
LEFT JOIN "tbl_pkg" pkg ON EBILL."express_number" = pkg."pw_bill"
LEFT JOIN "tbl_dot" dot ON PKG ."pw_dot_id"=dot."id" 
LEFT JOIN "tbl_company_dot_map" companydot ON dot."id" =COMPANYDOT ."dot_id" 
LEFT JOIN "tbl_company" company ON COMPANY ."id"=companydot."company_id" 

3、​​​​​​​Spark实现

实现步骤:

  • dwd目录下创建 ExpressBillDWD 单例对象,继承自OfflineApp特质
  • 初始化环境的参数,创建SparkSession对象
  • 获取快递单表(tbl_express_bill)数据,并缓存数据(缓存两份数据,生产环境中肯定是多台服务器,为了避免数据丢失缓存2份数据,测试环境只有一台服务器,按照生产环境进行开发)
  • 判断是否是首次运行,如果是首次运行的话,则全量装载数据(含历史数据)
  • 获取客户表(tbl_customer)数据,并缓存数据
  • 获取快递员表(tbl_courier)数据,并缓存数据
  • 获取包裹表(tbl_pkg)数据,并缓存数据
  • 获取网点表(tbl_dot)数据,并缓存数据
  • 获取区域表(tbl_areas)数据,并缓存数据
  • 获取公司网点关联表(tbl_company_dot_map)数据,并缓存数据
  • 获取公司表(tbl_company)数据,并缓存数据
  • 获取客户地址关联表(tbl_consumer_address_map)数据,并缓存数据
  • 获取客户地址表(tbl_address)数据,并缓存数据
  • 获取字典表(tbl_codes)数据,并缓存数据
  • 根据以下方式拉宽快递单明细数据
    • 根据客户id,在客户表中获取客户数据
    • 根据快递员id,在快递员表中获取快递员数据
    • 根据客户id,在客户地址表中获取客户地址数据
    • 根据快递单号,在包裹表中获取包裹数据
    • 根据包裹的发货网点id,获取到网点数据
    • 根据网点id, 获取到公司数据
    • 创建快递单明细宽表(若存在则不创建)
  • 将快递单明细宽表数据写入到kudu数据表中
  • 删除缓存数据

​​​​​​​​​​​​​​初始化环境变量

  • 初始化快递单明细拉宽作业的环境变量
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
package cn.it.logistics.offline.dwd

import cn.it.logistics.common.{CodeTypeMapping, Configuration, OfflineTableDefine, SparkUtils}
import cn.it.logistics.offline.OfflineApp
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.IntegerType

/**
 * 快递单主题开发
 * 将快递单事实表的数据与相关维度表的数据进行关联,然后将拉宽后的数据写入到快递单宽表中
 * 采用DSL语义实现离线计算程序
 *
 * 最终离线程序需要部署到服务器,每天定时执行(azkaban定时调度)
 */
object ExpressBillDWD extends OfflineApp {
  //定义应用的名称
  val appName = this.getClass.getSimpleName

/**
   * 入口函数
   * @param args
   */
  def main(args: Array[String]): Unit = {
    /**
     * 实现步骤:
     * 1)初始化sparkConf对象
     * 2)创建sparkSession对象
     * 3)加载kudu中的事实表和维度表的数据(将加载后的数据进行缓存)
     * 4)定义维度表与事实表的关联
     * 5)将拉宽后的数据再次写回到kudu数据库中(DWD明细层)
     * 5.1:创建快递单明细宽表的schema表结构
     * 5.2:创建快递单宽表(判断宽表是否存在,如果不存在则创建)
     * 5.3:将数据写入到kudu中
     * 6):将缓存的数据删除掉
     */

    //1)初始化sparkConf对象
    val sparkConf: SparkConf = SparkUtils.autoSettingEnv(
      SparkUtils.sparkConf(appName)
    )

    //2)创建sparkSession对象
    val sparkSession: SparkSession = SparkUtils.getSparkSession(sparkConf)
    sparkSession.sparkContext.setLogLevel(Configuration.LOG_OFF)

    //数据处理
    execute(sparkSession)
  }

  /**
   * 数据处理
   *
   * @param sparkSession
   */
  override def execute(sparkSession: SparkSession): Unit = {
    sparkSession.stop()
  }
}

​​​​​​​加载快递单相关的表数据并缓存

  • 加载快递单表的时候,需要指定日期条件,因为快递单主题最终需要Azkaban定时调度执行,每天执行一次增量数据,因此需要指定日期。
  • 判断是否是首次运行,如果是首次运行的话,则全量装载数据(含历史数据)
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
//3.1:加载快递单事实表的数据
val expressBillDF: DataFrame = getKuduSource(sparkSession, TableMapping.expressBill, Configuration.isFirstRunnable)
  .persist(StorageLevel.DISK_ONLY_2) //将数据缓存两个节点的磁盘目录,避免单机故障导致的缓存数据丢失

//3.2:加载快递员维度表的数据
val courierDF: DataFrame = getKuduSource(sparkSession, TableMapping.courier, true).persist(StorageLevel.DISK_ONLY_2)

//3.2:加载客户维度表的数据
val customerDF: DataFrame = getKuduSource(sparkSession, TableMapping.customer, true).persist(StorageLevel.DISK_ONLY_2)

//3.4:加载物流码表的数据
val codesDF: DataFrame = getKuduSource(sparkSession, TableMapping.codes, true).persist(StorageLevel.DISK_ONLY_2)

//3.5:客户地址关联表的数据
val addressMapDF: DataFrame = getKuduSource(sparkSession, TableMapping.consumerAddressMap, true).persist(StorageLevel.DISK_ONLY_2)

//3.6:加载地址表的数据
val addressDF: DataFrame = getKuduSource(sparkSession, TableMapping.address, true).persist(StorageLevel.DISK_ONLY_2)

//3.7:加载包裹表的数据
val pkgDF: DataFrame = getKuduSource(sparkSession, TableMapping.pkg, true).persist(StorageLevel.DISK_ONLY_2)

//3.8:加载网点表的数据
val dotDF: DataFrame = getKuduSource(sparkSession, TableMapping.dot, true).persist(StorageLevel.DISK_ONLY_2)

//3.9:加载公司网点表的数据
val companyDotMapDF: DataFrame = getKuduSource(sparkSession, TableMapping.companyDotMap, true).persist(StorageLevel.DISK_ONLY_2)

//3.10:加载公司表的数据
val companyDF: DataFrame = getKuduSource(sparkSession, TableMapping.company, true).persist(StorageLevel.DISK_ONLY_2)

//导入隐式转换
import  sparkSession.implicits._

//获取终端类型码表数据
val orderTerminalTypeDF: DataFrame = codesDF.where($"type" === CodeTypeMapping.OrderTerminalType)
  .select($"code".as("OrderTerminalTypeCode"), $"codeDesc".as("OrderTerminalTypeName"))

//获取下单终端类型码表数据
val orderChannelTypeDF: DataFrame = codesDF.where($"type" === CodeTypeMapping.OrderChannelType)
  .select($"code".as("OrderChannelTypeCode"), $"codeDesc".as("OrderChannelTypeName"))

​​​​​​​定义表的关联关系

  • 为了在DWS层任务中方便的获取每日增量快递单数据(根据日期),因此在DataFrame基础上动态增加列(day),指定日期格式为yyyyMMdd

代码如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
//TODO 4)定义维度表与事实表的关联
val joinType = "left_outer"
val expressBillDetailDF: DataFrame = expressBillDF
  .join(courierDF, expressBillDF("eid") === courierDF("id") ,joinType) //快递单表与快递员表进行关联
  .join(customerDF, expressBillDF("cid")  === customerDF("id"), joinType) //快递单表与客户表进行关联
  .join(orderChannelTypeDF, orderChannelTypeDF("OrderChannelTypeCode") === expressBillDF("orderChannelId"), joinType) //下单渠道表与快递单表关联
  .join(orderTerminalTypeDF, orderTerminalTypeDF("OrderTerminalTypeCode") === expressBillDF("orderTerminalType"), joinType) //终端类型表与快递单表关联
  .join(addressMapDF, addressMapDF("consumerId") === customerDF("id"), joinType) //客户地址关联表与客户表关联
  .join(addressDF, addressDF("id") === addressMapDF("addressId"), joinType)  //地址表与客户地址关联表关联
  .join(pkgDF, pkgDF("pwBill") === expressBillDF("expressNumber"), joinType) //包裹表与快递单表关联
  .join(dotDF, dotDF("id") === pkgDF("pwDotId"), joinType)  //网点表与包裹表关联
  .join(companyDotMapDF, companyDotMapDF("dotId") === dotDF("id"), joinType) //公司网点关联表与网点表关联
  .join(companyDF, companyDF("id") === companyDotMapDF("companyId"), joinType) //公司网点关联表与公司表关联
  .withColumn("day", date_format(expressBillDF("cdt"), "yyyyMMdd"))//虚拟列,可以根据这个日期列作为分区字段,可以保证同一天的数据保存在同一个分区中
  .sort(expressBillDF.col("cdt").asc) //根据快递单的创建时间顺序排序
  .select(
    expressBillDF("id"),        //快递单id
    expressBillDF("expressNumber").as("express_number"),  //快递单编号
    expressBillDF("cid"),     //客户id
    customerDF("name").as("cname"), //客户名称
    addressDF("detailAddr").as("caddress"),      //客户地址
    expressBillDF("eid"), //员工id
    courierDF("name").as("ename"), //员工名称
    dotDF("id").as("dot_id"), //网点id
    dotDF("dotName").as("dot_name"), //网点名称
    companyDF("companyName").as("company_name"),//公司名称
    expressBillDF("orderChannelId").as("order_channel_id"), //下单渠道id
    orderChannelTypeDF("OrderChannelTypeName").as("order_channel_name"), //下单渠道id
    expressBillDF("orderDt").as("order_dt"),//下单时间
    orderTerminalTypeDF("OrderTerminalTypeCode").as("order_terminal_type"), //下单设备类型id
    orderTerminalTypeDF("OrderTerminalTypeName").as("order_terminal_type_name"), //下单设备类型id
    expressBillDF("orderTerminalOsType").as("order_terminal_os_type"),//下单设备操作系统
    expressBillDF("reserveDt").as("reserve_dt"),//预约取件时间
    expressBillDF("isCollectPackageTimeout").as("is_collect_package_timeout"),//是否取件超时
    expressBillDF("timeoutDt").as("timeout_dt"),//超时时间
    customerDF("type"),//客户类型
    expressBillDF("cdt"),//创建时间
    expressBillDF("udt"),//修改时间
    expressBillDF("remark"),//备注
    $"day"
  )

​​​​​​​创建快递单明细宽表并将快递单明细数据写入到宽表中

快递单宽表数据需要保存到kudu中,因此在第一次执行快递单明细拉宽操作时,快递单明细宽表是不存在的,因此需要实现自动判断宽表是否存在,如果不存在则创建

实现步骤:

  • ExpressBillDWD 单例对象中调用父类save方法
    • 判断宽表是否存在,如果不存在则创建宽表
    • 将明细数据写入到宽表中

参考代码:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
//TODO 5)将拉宽后的数据再次写回到kudu数据库中(DWD明细层)
save(expressBillDetailDF, OfflineTableDefine.expressBillDetail)

​​​​​​​删除缓存数据

为了释放资源,快递单明细宽表数据计算完成以后,需要将缓存的源表数据删除。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
//TODO 6) 将缓存的数据删除掉
expressBillDF.unpersist()
courierDF.unpersist()
customerDF.unpersist()
orderChannelTypeDF.unpersist()
orderTerminalTypeDF.unpersist()
addressMapDF.unpersist()
addressDF.unpersist()
pkgDF.unpersist()
dotDF.unpersist()
companyDotMapDF.unpersist()
companyDF.unpersist()

​​​​​​​完整代码

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
package cn.it.logistics.offline.dwd

import cn.it.logistics.common.{CodeTypeMapping, Configuration, OfflineTableDefine, SparkUtils}
import cn.it.logistics.offline.OfflineApp
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.IntegerType

/**
 * 快递单主题开发
 * 将快递单事实表的数据与相关维度表的数据进行关联,然后将拉宽后的数据写入到快递单宽表中
 * 采用DSL语义实现离线计算程序
 *
 * 最终离线程序需要部署到服务器,每天定时执行(azkaban定时调度)
 */
object ExpressBillDWD extends OfflineApp {
  //定义应用的名称
  val appName = this.getClass.getSimpleName

/**
   * 入口函数
   * @param args
   */
  def main(args: Array[String]): Unit = {
    /**
     * 实现步骤:
     * 1)初始化sparkConf对象
     * 2)创建sparkSession对象
     * 3)加载kudu中的事实表和维度表的数据(将加载后的数据进行缓存)
     * 4)定义维度表与事实表的关联
     * 5)将拉宽后的数据再次写回到kudu数据库中(DWD明细层)
     * 5.1:创建快递单明细宽表的schema表结构
     * 5.2:创建快递单宽表(判断宽表是否存在,如果不存在则创建)
     * 5.3:将数据写入到kudu中
     * 6):将缓存的数据删除掉
     */

    //1)初始化sparkConf对象
    val sparkConf: SparkConf = SparkUtils.autoSettingEnv(
      SparkUtils.sparkConf(appName)
    )

    //2)创建sparkSession对象
    val sparkSession: SparkSession = SparkUtils.getSparkSession(sparkConf)
    sparkSession.sparkContext.setLogLevel(Configuration.LOG_OFF)

    //数据处理
    execute(sparkSession)
  }

  /**
   * 数据处理
   *
   * @param sparkSession
   */
  override def execute(sparkSession: SparkSession): Unit = {
    //TODO 3)加载kudu中的事实表和维度表的数据(将加载后的数据进行缓存)
    //3.1:加载快递单事实表的数据
    val expressBillDF: DataFrame = getKuduSource(sparkSession, "tbl_express_bill", Configuration.isFirstRunnable)
      .persist(StorageLevel.DISK_ONLY_2) //将数据缓存两个节点的磁盘目录,避免单机故障导致的缓存数据丢失

    //3.2:加载快递员维度表的数据
    val courierDF: DataFrame = getKuduSource(sparkSession, "tbl_courier", true).persist(StorageLevel.DISK_ONLY_2)

    //3.2:加载客户维度表的数据
    val customerDF: DataFrame = getKuduSource(sparkSession, "tbl_customer", true).persist(StorageLevel.DISK_ONLY_2)

    //3.4:加载物流码表的数据
    val codesDF: DataFrame = getKuduSource(sparkSession, "tbl_codes", true).persist(StorageLevel.DISK_ONLY_2)

    //3.5:客户地址关联表的数据
    val addressMapDF: DataFrame = getKuduSource(sparkSession, "tbl_consumer_address_map", true).persist(StorageLevel.DISK_ONLY_2)

    //3.6:加载地址表的数据
    val addressDF: DataFrame = getKuduSource(sparkSession, "tbl_address", true).persist(StorageLevel.DISK_ONLY_2)

    //3.7:加载包裹表的数据
    val pkgDF: DataFrame = getKuduSource(sparkSession, "tbl_pkg", true).persist(StorageLevel.DISK_ONLY_2)

    //3.8:加载网点表的数据
    val dotDF: DataFrame = getKuduSource(sparkSession, "tbl_dot", true).persist(StorageLevel.DISK_ONLY_2)

    //3.9:加载公司网点表的数据
    val companyDotMapDF: DataFrame = getKuduSource(sparkSession, "tbl_company_dot_map", true).persist(StorageLevel.DISK_ONLY_2)

    //3.10:加载公司表的数据
    val companyDF: DataFrame = getKuduSource(sparkSession, "tbl_company", true).persist(StorageLevel.DISK_ONLY_2)

    //导入隐式转换
    import  sparkSession.implicits._

    //获取终端类型码表数据
    val orderTerminalTypeDF: DataFrame = codesDF.where($"type" === CodeTypeMapping.OrderTerminalType)
      .select($"code".as("OrderTerminalTypeCode"), $"codeDesc".as("OrderTerminalTypeName"))

    //获取下单终端类型码表数据
    val orderChannelTypeDF: DataFrame = codesDF.where($"type" === CodeTypeMapping.OrderChannelType)
      .select($"code".as("OrderChannelTypeCode"), $"codeDesc".as("OrderChannelTypeName"))

    //TODO 4)定义维度表与事实表的关联
    val joinType = "left_outer"
    val expressBillDetailDF: DataFrame = expressBillDF
      .join(courierDF, expressBillDF("eid") === courierDF("id") ,joinType) //快递单表与快递员表进行关联
      .join(customerDF, expressBillDF("cid")  === customerDF("id"), joinType) //快递单表与客户表进行关联
      .join(orderChannelTypeDF, orderChannelTypeDF("OrderChannelTypeCode") === expressBillDF("orderChannelId"), joinType) //下单渠道表与快递单表关联
      .join(orderTerminalTypeDF, orderTerminalTypeDF("OrderTerminalTypeCode") === expressBillDF("orderTerminalType"), joinType) //终端类型表与快递单表关联
      .join(addressMapDF, addressMapDF("consumerId") === customerDF("id"), joinType) //客户地址关联表与客户表关联
      .join(addressDF, addressDF("id") === addressMapDF("addressId"), joinType)  //地址表与客户地址关联表关联
      .join(pkgDF, pkgDF("pwBill") === expressBillDF("expressNumber"), joinType) //包裹表与快递单表关联
      .join(dotDF, dotDF("id") === pkgDF("pwDotId"), joinType)  //网点表与包裹表关联
      .join(companyDotMapDF, companyDotMapDF("dotId") === dotDF("id"), joinType) //公司网点关联表与网点表关联
      .join(companyDF, companyDF("id") === companyDotMapDF("companyId"), joinType) //公司网点关联表与公司表关联
      .withColumn("day", date_format(expressBillDF("cdt"), "yyyyMMdd"))//虚拟列,可以根据这个日期列作为分区字段,可以保证同一天的数据保存在同一个分区中
      .sort(expressBillDF.col("cdt").asc) //根据快递单的创建时间顺序排序
      .select(
        expressBillDF("id"),        //快递单id
        expressBillDF("expressNumber").as("express_number"),  //快递单编号
        expressBillDF("cid"),     //客户id
        customerDF("name").as("cname"), //客户名称
        addressDF("detailAddr").as("caddress"),      //客户地址
        expressBillDF("eid"), //员工id
        courierDF("name").as("ename"), //员工名称
        dotDF("id").as("dot_id"), //网点id
        dotDF("dotName").as("dot_name"), //网点名称
        companyDF("companyName").as("company_name"),//公司名称
        expressBillDF("orderChannelId").as("order_channel_id"), //下单渠道id
        orderChannelTypeDF("OrderChannelTypeName").as("order_channel_name"), //下单渠道id
        expressBillDF("orderDt").as("order_dt"),//下单时间
        orderTerminalTypeDF("OrderTerminalTypeCode").as("order_terminal_type"), //下单设备类型id
        orderTerminalTypeDF("OrderTerminalTypeName").as("order_terminal_type_name"), //下单设备类型id
        expressBillDF("orderTerminalOsType").as("order_terminal_os_type"),//下单设备操作系统
        expressBillDF("reserveDt").as("reserve_dt"),//预约取件时间
        expressBillDF("isCollectPackageTimeout").as("is_collect_package_timeout"),//是否取件超时
        expressBillDF("timeoutDt").as("timeout_dt"),//超时时间
        customerDF("type"),//客户类型
        expressBillDF("cdt"),//创建时间
        expressBillDF("udt"),//修改时间
        expressBillDF("remark"),//备注
        $"day"
      )

    //TODO 5)将拉宽后的数据再次写回到kudu数据库中(DWD明细层)
    save(expressBillDetailDF, OfflineTableDefine.expressBillDetail)

    //TODO 6) 将缓存的数据删除掉
    expressBillDF.unpersist()
    courierDF.unpersist()
    customerDF.unpersist()
    orderChannelTypeDF.unpersist()
    orderTerminalTypeDF.unpersist()
    addressMapDF.unpersist()
    addressDF.unpersist()
    pkgDF.unpersist()
    dotDF.unpersist()
    companyDotMapDF.unpersist()
    companyDF.unpersist()

    sparkSession.stop()
  }
}

4、​​​​​​​测试验证

实现步骤:

  • ExpressBillDWD 单例对象中读取快递单明细宽表的数据
  • 输出展示

实现过程:

  • ExpressBillDWD 单例对象中读取快递单明细宽表的数据
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
// 检查今日数据
spark.read
  .format(Configure.SPARK_KUDU_FORMAT)
  .options(Map("kudu.master" -> Configure.kuduRpcAddress, "kudu.table" -> table))
  .load
  .show
  • 输出展示

五、​​​​​​​快递单数据指标计算开发

1、​​​​​​​计算的字段

字段名

字段描述

id

数据产生时间

totalExpressBillCount

总快递单数

maxTypeExpressBillTotalCount

各类客户最大快递单数

minTypeExpressBillTotalCount

各类客户最小快递单数

avgTypeExpressBillTotalCount

各类客户平均快递单数

maxDotExpressBillTotalCount

各网点最大快递单数

minDotExpressBillTotalCount

各网点最小快递单数

avgDotExpressBillTotalCount

各网点平均快递单数

maxChannelExpressBillTotalCount

各渠道最大快递单数

minChannelExpressBillTotalCount

各渠道最小快递单数

avgChannelExpressBillTotalCount

各渠道平均快递单数

maxTerminalExpressBillTotalCount

各终端最大快递单数

minTerminalExpressBillTotalCount

各终端最小快递单数

avgTerminalExpressBillTotalCount

各终端平均快递单数

2、Spark实现

实现步骤:

  • 在dws目录下创建 ExpressBillDWS 单例对象,继承自OfflineApp特质
  • 初始化环境的参数,创建SparkSession对象
  • 根据指定的日期获取拉宽后的快递单宽表(tbl_express_bill_detail)增量数据,并缓存数据
  • 判断是否是首次运行,如果是首次运行的话,则全量装载数据(含历史数据)
  • 指标计算
    • 计算总快递单数
    • 各类客户快递单数
    • 各类客户最大快递单数
    • 各类客户最小快递单数
    • 各类客户平均快递单数
    • 各网点快递单数
    • 各网点最大快递单数
    • 各网点最小快递单数
    • 各网点平均快递单数
    • 各渠道快递单数
    • 各渠道最大快递单数
    • 各渠道最小快递单数
    • 各渠道平均快递单数
    • 各终端快递单数
    • 各终端最大快递单数
    • 各终端最小快递单数
    • 各终端平均快递单数
    • 获取当前时间yyyyMMddHH
  • 构建要持久化的指标数据(需要判断计算的指标是否有值,若没有需要赋值默认值
  • 通过StructType构建指定Schema
  • 创建快递单指标数据表(若存在则不创建)
  • 持久化指标数据到kudu表

​​​​​​​初始化环境变量

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
package cn.it.logistics.offline.dws

import cn.it.logistics.common.{Configuration, SparkUtils}
import cn.it.logistics.offline.OfflineApp
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object ExpressBillDWS  extends OfflineApp{
  //定义应用程序的名称
  val appName = this.getClass.getSimpleName

  /**
   * 入口函数
   * @param args
   */
  def main(args: Array[String]): Unit = {
    /**
     * 实现步骤:
     * 1)创建SparkConf对象
     * 2)创建SparkSession对象
     * 3)读取快递明细宽表的数据
     * 4)对快递明细宽表的数据进行指标的计算
     * 5)将计算好的指标数据写入到kudu数据库中
     * 5.1:定义指标结果表的schema信息
     * 5.2:组织需要写入到kudu表的数据
     * 5.3:判断指标结果表是否存在,如果不存在则创建
     * 5.4:将数据写入到kudu表中
     * 6)删除缓存数据
     * 7)停止任务,退出sparksession
     */

    //TODO 1)创建SparkConf对象
    val sparkConf: SparkConf = SparkUtils.autoSettingEnv(
      SparkUtils.sparkConf(appName)
    )

    //TODO 2)创建SparkSession对象
    val sparkSession: SparkSession = SparkUtils.getSparkSession(sparkConf)
    sparkSession.sparkContext.setLogLevel(Configuration.LOG_OFF)

    //处理数据
    execute(sparkSession)
  }

  /**
   * 数据处理
   *
   * @param sparkSession
   */
  override def execute(sparkSession: SparkSession): Unit = {
    // 退出sc
    sparkSession.stop
  }
}

​​​​​​​加载快递单宽表增量数据并缓存

加载快递单宽表的时候,需要指定日期条件,因为快递单主题最终需要Azkaban定时调度执行,每天执行一次增量数据,因此需要指定日期。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
//TODO 3)读取快递单明细宽表的数据
val expressBillDetailDF: DataFrame = getKuduSource(sparkSession, OfflineTableDefine.expressBillDetail, Configuration.isFirstRunnable)

 ​​​​​​​指标计算

程序首次运行需要全量装载历史的快递单数据,离线计算程序每天计算昨天增量数据,因此需要将历史的数据进行按照天进行分组,然后根据某一天来进行统计当前日期下的快递单相关指标数据

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
//读取出来的明细宽表数据可能是增量的数据,也可能是全量的数据
//全量的数据是包含多个日期的数据,增量数据是前一天的数据
//需要计算的指标是以日为单位,每天的最大快递单数、最小快递单数、平均快递单数据
//因此需要对读取出来的快递单明细宽表数据按照日为单位进行分组,然后统计每日的指标数据
val expressBillDetailGroupByDayDF: DataFrame = expressBillDetailDF.select("day").groupBy("day").count().cache()

//导入隐式转换
import sparkSession.implicits._

//定义计算好的指标结果集合对象
val rows: ArrayBuffer[Row] = ArrayBuffer[Row]()

//循环遍历所有日期的数据
expressBillDetailGroupByDayDF.collect().foreach(row=>{
  //获取到需要处理的数据所在的日期
  val day: String = row.getAs[String](0)

  //根据日期查询该日期内的快递单明细数据,然后将查询到的结果进行指标计算(指定日期的指标)
  val expressBillDetailByDayDF: DataFrame = expressBillDetailDF.where(col("day") === day).toDF().persist(StorageLevel.DISK_ONLY_2)

  //TODO 4)对快递明细宽表的数据进行指标的计算
  //总快递单数
  val totalExpressBillCount: Long = expressBillDetailByDayDF.agg(count("express_number")).first().getLong(0)
  //各类客户的快递单数
  val customerTypeExpressBillTotalCountDF: DataFrame = expressBillDetailByDayDF.groupBy($"type").agg(count("express_number").as("express_number")).cache()

  //各类客户最大快递单数
  val maxTypeExpressBillTotalCount: Row = customerTypeExpressBillTotalCountDF.agg(max("express_number")).first()

  //各类客户最小快递单数
  val minTypeExpressBillTotalCount: Row = customerTypeExpressBillTotalCountDF.agg(min("express_number")).first()

  //各类客户平均快递单数
  val avgTypeExpressBillTotalCount: Row = customerTypeExpressBillTotalCountDF.agg(avg("express_number")).first()

  //各网点的快递单数
  val dotExpressBillTotalCountDF: DataFrame = expressBillDetailByDayDF.groupBy($"dot_id").agg(count("express_number").as("express_number")).cache()

  //各网点最大快递单数
  val maxDotExpressBillTotalCount: Row = dotExpressBillTotalCountDF.agg(max("express_number")).first()

  //各网点最小快递单数
  val minDotExpressBillTotalCount: Row = dotExpressBillTotalCountDF.agg(min("express_number")).first()

  //各网点平均快递单数
  val avgDotExpressBillTotalCount: Row = dotExpressBillTotalCountDF.agg(avg("express_number")).first()

  //各渠道的快递单数
  val channelExpressBillTotalCountDF: DataFrame = expressBillDetailByDayDF.groupBy($"order_channel_id").agg(count("express_number").as("express_number")).cache()

  //各渠道最大快递单数
  val maxChannelExpressBillTotalCount: Row = channelExpressBillTotalCountDF.agg(max("express_number")).first()

  //各渠道最小快递单数
  val minChannelExpressBillTotalCount: Row = channelExpressBillTotalCountDF.agg(min("express_number")).first()

  //各渠道平均快递单数
  val avgChannelExpressBillTotalCount: Row = channelExpressBillTotalCountDF.agg(avg("express_number")).first()

  val terminalExpressBillTotalCountDF: DataFrame = expressBillDetailByDayDF.groupBy($"order_terminal_type").agg(count("express_number").as("express_number")).cache()
  println(terminalExpressBillTotalCountDF)

  //各终端最大快递单数
  val maxTerminalExpressBillTotalCount: Row = terminalExpressBillTotalCountDF.agg(max("express_number")).first()

  //各终端最小快递单数
  val minTerminalExpressBillTotalCount: Row = terminalExpressBillTotalCountDF.agg(min("express_number")).first()

  //各终端平均快递单数
  val avgTerminalExpressBillTotalCount: Row = terminalExpressBillTotalCountDF.agg(avg("express_number")).first()
  println(avgTerminalExpressBillTotalCount)

  //将每条记录写入到Row对象中
  val rowInfo = Row(
    day,
    totalExpressBillCount, //总快递单数
    if(maxTypeExpressBillTotalCount.isNullAt(0)) 0L else maxTypeExpressBillTotalCount.get(0).asInstanceOf[Number].longValue(),
    if(minTypeExpressBillTotalCount.isNullAt(0)) 0L else minTypeExpressBillTotalCount.get(0).asInstanceOf[Number].longValue(),
    if(avgTypeExpressBillTotalCount.isNullAt(0)) 0L else avgTypeExpressBillTotalCount.get(0).asInstanceOf[Number].longValue(),
    if(maxDotExpressBillTotalCount.isNullAt(0)) 0L else maxDotExpressBillTotalCount.get(0).asInstanceOf[Number].longValue(),
    if(minDotExpressBillTotalCount.isNullAt(0)) 0L else minDotExpressBillTotalCount.get(0).asInstanceOf[Number].longValue(),
    if(avgDotExpressBillTotalCount.isNullAt(0)) 0L else avgDotExpressBillTotalCount.get(0).asInstanceOf[Number].longValue(),
    if(maxChannelExpressBillTotalCount.isNullAt(0)) 0L else maxChannelExpressBillTotalCount.get(0).asInstanceOf[Number].longValue(),
    if(minChannelExpressBillTotalCount.isNullAt(0)) 0L else minChannelExpressBillTotalCount.get(0).asInstanceOf[Number].longValue(),
    if(avgChannelExpressBillTotalCount.isNullAt(0)) 0L else avgChannelExpressBillTotalCount.get(0).asInstanceOf[Number].longValue(),
    if(maxTerminalExpressBillTotalCount.isNullAt(0)) 0L else maxTerminalExpressBillTotalCount.get(0).asInstanceOf[Number].longValue(),
    if(minTerminalExpressBillTotalCount.isNullAt(0)) 0L else minTerminalExpressBillTotalCount.get(0).asInstanceOf[Number].longValue(),
    if(avgTerminalExpressBillTotalCount.isNullAt(0)) 0L else avgTerminalExpressBillTotalCount.get(0).asInstanceOf[Number].longValue()
  )

  println(rowInfo)

  //将计算好的结果数据写入到结果对象中
  rows.append(rowInfo)

  //释放资源
  expressBillDetailByDayDF.unpersist()
  customerTypeExpressBillTotalCountDF.unpersist()
  dotExpressBillTotalCountDF.unpersist()
  channelExpressBillTotalCountDF.unpersist()
  terminalExpressBillTotalCountDF.unpersist()
})

​​​​​​​通过StructType构建指定Schema

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
//创建DataFrame:schema+rdd(数据)
//定义指标结果的schema信息
val schema: StructType = StructType(Array(
  StructField("id", StringType, false, Metadata.empty),
  StructField("totalExpressBillCount", LongType, false, Metadata.empty),
  StructField("maxTypeExpressBillTotalCount", LongType, false, Metadata.empty),
  StructField("minTypeExpressBillTotalCount", LongType, false, Metadata.empty),
  StructField("avgTypeExpressBillTotalCount", LongType, false, Metadata.empty),
  StructField("maxDotExpressBillTotalCount", LongType, false, Metadata.empty),
  StructField("minDotExpressBillTotalCount", LongType, false, Metadata.empty),
  StructField("avgDotExpressBillTotalCount", LongType, false, Metadata.empty),
  StructField("maxChannelExpressBillTotalCount", LongType, false, Metadata.empty),
  StructField("minChannelExpressBillTotalCount", LongType, false, Metadata.empty),
  StructField("avgChannelExpressBillTotalCount", LongType, false, Metadata.empty),
  StructField("maxTerminalExpressBillTotalCount", LongType, false, Metadata.empty),
  StructField("minTerminalExpressBillTotalCount", LongType, false, Metadata.empty),
  StructField("avgTerminalExpressBillTotalCount", LongType, false, Metadata.empty)
))

 ​​​​​​​持久化指标数据到kudu表​​​​​​​

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
//将数据转换成rdd对象
val data: RDD[Row] = sparkSession.sparkContext.makeRDD(rows)

//根据表结构和数据创建DataFrame对象
val quotaDF: DataFrame = sparkSession.createDataFrame(data, schema)

//TODO 5)将计算好的指标数据写入到kudu数据库中
//将dataframe数据写入到kudu数据库
save(quotaDF, OfflineTableDefine.expressBillSummary)

​​​​​​​删除缓存数据

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
//TODO 6)删除缓存数据
expressBillDetailGroupByDayDF.unpersist()
expressBillDetailDF.unpersist()

​​​​​​​完整代码

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
package cn.it.logistics.offline.dws

import cn.it.logistics.common.{Configuration, DateHelper, OfflineTableDefine, SparkUtils}
import cn.it.logistics.offline.OfflineApp
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{DoubleType, LongType, Metadata, StringType, StructField, StructType}
import org.apache.spark.sql.functions._
import org.apache.spark.storage.StorageLevel

import scala.collection.mutable.ArrayBuffer

/**
 * 快递单主题指标开发
 */
object ExpressBillDWS extends OfflineApp{
  //定义应用程序的名称
  val appName = this.getClass.getSimpleName

  /**
   * 入口函数
   * @param args
   */
  def main(args: Array[String]): Unit = {
    /**
     * 实现步骤:
     * 1)创建SparkConf对象
     * 2)创建SparkSession对象
     * 3)读取快递明细宽表的数据
     * 4)对快递明细宽表的数据进行指标的计算
     * 5)将计算好的指标数据写入到kudu数据库中
     * 5.1:定义指标结果表的schema信息
     * 5.2:组织需要写入到kudu表的数据
     * 5.3:判断指标结果表是否存在,如果不存在则创建
     * 5.4:将数据写入到kudu表中
     * 6)删除缓存数据
     * 7)停止任务,退出sparksession
     */

    //TODO 1)创建SparkConf对象
    val sparkConf: SparkConf = SparkUtils.autoSettingEnv(
      SparkUtils.sparkConf(appName)
    )

    //TODO 2)创建SparkSession对象
    val sparkSession: SparkSession = SparkUtils.getSparkSession(sparkConf)
    sparkSession.sparkContext.setLogLevel(Configuration.LOG_OFF)

    //处理数据
    execute(sparkSession)
  }

  /**
   * 数据处理
   *
   * @param sparkSession
   */
  override def execute(sparkSession: SparkSession): Unit = {
    //TODO 3)读取快递单明细宽表的数据
    val expressBillDetailDF: DataFrame = getKuduSource(sparkSession, OfflineTableDefine.expressBillDetail, Configuration.isFirstRunnable)

    //读取出来的明细宽表数据可能是增量的数据,也可能是全量的数据
    //全量的数据是包含多个日期的数据,增量数据是前一天的数据
    //需要计算的指标是以日为单位,每天的最大快递单数、最小快递单数、平均快递单数据
    //因此需要对读取出来的快递单明细宽表数据按照日为单位进行分组,然后统计每日的指标数据
    val expressBillDetailGroupByDayDF: DataFrame = expressBillDetailDF.select("day").groupBy("day").count().cache()

    //导入隐式转换
    import sparkSession.implicits._

    //定义计算好的指标结果集合对象
    val rows: ArrayBuffer[Row] = ArrayBuffer[Row]()

    //循环遍历所有日期的数据
    expressBillDetailGroupByDayDF.collect().foreach(row=>{
      //获取到需要处理的数据所在的日期
      val day: String = row.getAs[String](0)

      //根据日期查询该日期内的快递单明细数据,然后将查询到的结果进行指标计算(指定日期的指标)
      val expressBillDetailByDayDF: DataFrame = expressBillDetailDF.where(col("day") === day).toDF().persist(StorageLevel.DISK_ONLY_2)

      //TODO 4)对快递明细宽表的数据进行指标的计算
      //总快递单数
      val totalExpressBillCount: Long = expressBillDetailByDayDF.agg(count("express_number")).first().getLong(0)
      //各类客户的快递单数
      val customerTypeExpressBillTotalCountDF: DataFrame = expressBillDetailByDayDF.groupBy($"type").agg(count("express_number").as("express_number")).cache()

      //各类客户最大快递单数
      val maxTypeExpressBillTotalCount: Row = customerTypeExpressBillTotalCountDF.agg(max("express_number")).first()

      //各类客户最小快递单数
      val minTypeExpressBillTotalCount: Row = customerTypeExpressBillTotalCountDF.agg(min("express_number")).first()

      //各类客户平均快递单数
      val avgTypeExpressBillTotalCount: Row = customerTypeExpressBillTotalCountDF.agg(avg("express_number")).first()

      //各网点的快递单数
      val dotExpressBillTotalCountDF: DataFrame = expressBillDetailByDayDF.groupBy($"dot_id").agg(count("express_number").as("express_number")).cache()

      //各网点最大快递单数
      val maxDotExpressBillTotalCount: Row = dotExpressBillTotalCountDF.agg(max("express_number")).first()

      //各网点最小快递单数
      val minDotExpressBillTotalCount: Row = dotExpressBillTotalCountDF.agg(min("express_number")).first()

      //各网点平均快递单数
      val avgDotExpressBillTotalCount: Row = dotExpressBillTotalCountDF.agg(avg("express_number")).first()

      //各渠道的快递单数
      val channelExpressBillTotalCountDF: DataFrame = expressBillDetailByDayDF.groupBy($"order_channel_id").agg(count("express_number").as("express_number")).cache()

      //各渠道最大快递单数
      val maxChannelExpressBillTotalCount: Row = channelExpressBillTotalCountDF.agg(max("express_number")).first()

      //各渠道最小快递单数
      val minChannelExpressBillTotalCount: Row = channelExpressBillTotalCountDF.agg(min("express_number")).first()

      //各渠道平均快递单数
      val avgChannelExpressBillTotalCount: Row = channelExpressBillTotalCountDF.agg(avg("express_number")).first()

      val terminalExpressBillTotalCountDF: DataFrame = expressBillDetailByDayDF.groupBy($"order_terminal_type").agg(count("express_number").as("express_number")).cache()
      println(terminalExpressBillTotalCountDF)

      //各终端最大快递单数
      val maxTerminalExpressBillTotalCount: Row = terminalExpressBillTotalCountDF.agg(max("express_number")).first()

      //各终端最小快递单数
      val minTerminalExpressBillTotalCount: Row = terminalExpressBillTotalCountDF.agg(min("express_number")).first()

      //各终端平均快递单数
      val avgTerminalExpressBillTotalCount: Row = terminalExpressBillTotalCountDF.agg(avg("express_number")).first()
      println(avgTerminalExpressBillTotalCount)

      //将每条记录写入到Row对象中
      val rowInfo = Row(
        day,
        totalExpressBillCount, //总快递单数
        if(maxTypeExpressBillTotalCount.isNullAt(0)) 0L else maxTypeExpressBillTotalCount.get(0).asInstanceOf[Number].longValue(),
        if(minTypeExpressBillTotalCount.isNullAt(0)) 0L else minTypeExpressBillTotalCount.get(0).asInstanceOf[Number].longValue(),
        if(avgTypeExpressBillTotalCount.isNullAt(0)) 0L else avgTypeExpressBillTotalCount.get(0).asInstanceOf[Number].longValue(),
        if(maxDotExpressBillTotalCount.isNullAt(0)) 0L else maxDotExpressBillTotalCount.get(0).asInstanceOf[Number].longValue(),
        if(minDotExpressBillTotalCount.isNullAt(0)) 0L else minDotExpressBillTotalCount.get(0).asInstanceOf[Number].longValue(),
        if(avgDotExpressBillTotalCount.isNullAt(0)) 0L else avgDotExpressBillTotalCount.get(0).asInstanceOf[Number].longValue(),
        if(maxChannelExpressBillTotalCount.isNullAt(0)) 0L else maxChannelExpressBillTotalCount.get(0).asInstanceOf[Number].longValue(),
        if(minChannelExpressBillTotalCount.isNullAt(0)) 0L else minChannelExpressBillTotalCount.get(0).asInstanceOf[Number].longValue(),
        if(avgChannelExpressBillTotalCount.isNullAt(0)) 0L else avgChannelExpressBillTotalCount.get(0).asInstanceOf[Number].longValue(),
        if(maxTerminalExpressBillTotalCount.isNullAt(0)) 0L else maxTerminalExpressBillTotalCount.get(0).asInstanceOf[Number].longValue(),
        if(minTerminalExpressBillTotalCount.isNullAt(0)) 0L else minTerminalExpressBillTotalCount.get(0).asInstanceOf[Number].longValue(),
        if(avgTerminalExpressBillTotalCount.isNullAt(0)) 0L else avgTerminalExpressBillTotalCount.get(0).asInstanceOf[Number].longValue()
      )

      println(rowInfo)

      //将计算好的结果数据写入到结果对象中
      rows.append(rowInfo)

      //释放资源
      expressBillDetailByDayDF.unpersist()
      customerTypeExpressBillTotalCountDF.unpersist()
      dotExpressBillTotalCountDF.unpersist()
      channelExpressBillTotalCountDF.unpersist()
      terminalExpressBillTotalCountDF.unpersist()
    })

    //创建DataFrame:schema+rdd(数据)
    //定义指标结果的schema信息
    val schema: StructType = StructType(Array(
      StructField("id", StringType, false, Metadata.empty),
      StructField("totalExpressBillCount", LongType, false, Metadata.empty),
      StructField("maxTypeExpressBillTotalCount", LongType, false, Metadata.empty),
      StructField("minTypeExpressBillTotalCount", LongType, false, Metadata.empty),
      StructField("avgTypeExpressBillTotalCount", LongType, false, Metadata.empty),
      StructField("maxDotExpressBillTotalCount", LongType, false, Metadata.empty),
      StructField("minDotExpressBillTotalCount", LongType, false, Metadata.empty),
      StructField("avgDotExpressBillTotalCount", LongType, false, Metadata.empty),
      StructField("maxChannelExpressBillTotalCount", LongType, false, Metadata.empty),
      StructField("minChannelExpressBillTotalCount", LongType, false, Metadata.empty),
      StructField("avgChannelExpressBillTotalCount", LongType, false, Metadata.empty),
      StructField("maxTerminalExpressBillTotalCount", LongType, false, Metadata.empty),
      StructField("minTerminalExpressBillTotalCount", LongType, false, Metadata.empty),
      StructField("avgTerminalExpressBillTotalCount", LongType, false, Metadata.empty)
    ))

    //将数据转换成rdd对象
    val data: RDD[Row] = sparkSession.sparkContext.makeRDD(rows)

    //根据表结构和数据创建DataFrame对象
    val quotaDF: DataFrame = sparkSession.createDataFrame(data, schema)

    //TODO 5)将计算好的指标数据写入到kudu数据库中
    //将dataframe数据写入到kudu数据库
    save(quotaDF, OfflineTableDefine.expressBillSummary)

    //TODO 6)删除缓存数据
    expressBillDetailGroupByDayDF.unpersist()
    expressBillDetailDF.unpersist()

    //TODO 7)停止任务,退出sparksession
    sparkSession.stop()
  }
}

​​​​​​​3、测试验证

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2022/03/13 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
什么是公链?公链开发优缺点分析
公链通常被认为是“完全去中心化”的,因为没有任何个人或者机构可以控制或篡改其中数据的读写。其无需注册、授权便可匿名访问网络,且具有去中心化、中立、开放、不可篡改等特点,适用于虚拟货币、互联网金融等领域。
用户I34I63353I9
2022/09/02
1.1K0
公链开发:全球十大公链
1、BTC 比特币市值1.02万亿美元,发行量1868.8万,24小时成交额311.6亿美元。 与大多数钱银不同,比特币不是由特定的钱银组织发行的。它是依据特定的算法经过很多的计算发生的。比特币经济运用整个p2p网络中很多节点组成的分布式数据库来承认和记录一切买卖,并运用密码学的规划来确保钱银流通各个环节的安全。 全球十大公链有哪些? 2、ETH 以太坊市值2970.91亿美元,总发行量1.15亿美元,24小时成交额203.34亿美元。 以太坊(Ethereum)是一个面向分布式运用的全球开源渠道。这是一个区块链体系,它的呈现是为了解决比特币网络中存在的问题。它为开发人员在区块链上构建和发布运用程序供给了一个渠道。以太坊能够用来编程、分发、担保和买卖任何东西,包括投票、域名、金融买卖所、众筹、公司管理、合同和大多数协议、知识产权、智能财物等。2014年7月24日上映。众筹时的初始发行总额约为7200万ETH。、
vx-longbatuiguang1
2022/06/24
3.8K0
公链开发:全球十大公链
区块链分为公有链、私有链和联盟链 它们的区别?
简单来说,区块链的本质是一个开源的分布式账本。是比特币等虚拟钱银的核心技术。它能够高效地记载买卖双方的买卖,并保证这些记载是可验证的和永久保存的。一起,区块链本身具有去中心化、非中介化、信息透明、不可篡改和安全的特点。区块链分为公链、私链和联盟链。那么,它们有什么区别呢? 咱们了解的比特币,便是公链。它指的是区块链一致,即世界上任何人都能够阅读、发送买卖并取得有效确认。公链的任何节点对任何人都是敞开的,每个人都能够在这个区块链中参加核算。任何人都能够下载并取得完好的区块链数据,也便是一切的书籍。
v-longbatuiguang2
2022/05/07
3.2K0
区块链分为公有链、私有链和联盟链 它们的区别?
DApp系统操作原理
DAPP的中心要素 您能够创建一个完好的区块链体系,并从头开始开发自己的运用程序。这就相当于开发一个新的区块链项目,既费力又对个人能力要求高。如果你只是一个普通的开发者,想根据区块链开发自己的运用,挑选一个老练的区块链运用开发渠道就能够了。这些渠道一般都供给完善的接口和开发工具,所以你只需求忧虑事务逻辑及其完成。关于区块链的底部,这些渠道将帮助你处理好。
v-longbatuiguang2
2022/06/24
5930
DApp系统操作原理
构建跨公链平台解决DApp开发问题
春节后的那段时刻,被“3点不眠区块链团”刷屏。区块链的评论风行全国,大佬们团体跑进商场。许多人说,区块链的春天来了,2018年将是区块链迸发的一年。 前两天,我和一个在区块链从事技能开发的朋友聊天。他说,大公司进入商场,让区块链的泡沫变得更大。现在真实能落地的使用项目很少。现在的区块链是烟花,不是桃花,所以很难开花结果。即使是桃花也不能结果,由于没有好的土壤。 作为一个技能小白,我问:为什么真实落地的项目这么少?为什么结不出果实?
vx-longbatuiguang1
2022/06/24
4490
构建跨公链平台解决DApp开发问题
区块链应用(Dapp)是什么?
自2009年中本聪创造比特币以来,区块链的技能进入了群众的视野。开展至今已有10年。区块链也从1.0可编程钱银升级到3.0可编程社会。金融、医疗、物流等社会职业纷纷引入区块链技能。在区块链蓬勃开展的一起,一个小分支也在成长。这便是区块链使用,也便是咱们常说的Dapp。 今日咱们就好好聊聊Dapp是什么,它的特色,它的开展前景。
vx-longbatuiguang1
2022/06/14
2.6K0
区块链应用(Dapp)是什么?
区块链及其对世界的影响
区块链是整个加密货币生态圈的底层技术和最根本的价值主张。它是比特币的安全保障,也是以太坊智能合约的价值来源。本文旨在深入探讨区块链的概念、价值及其如何重塑现代社会的信任机制。
用户7358413
2021/10/22
1K0
深度盘点 NFT 基础设施:公链与侧链
从比特币到新一代区块链,近年来,这一领域的技术不断发展,承载应用的基础设施也在不断更新迭代。我们先一起快速回顾一下技术发展的“弧度”是怎样的。
区块链大本营
2021/06/16
1.8K0
深度盘点 NFT 基础设施:公链与侧链
区块链DAPP系统设计分析
一、什么是DAPP(分布式运用)? DAPP是DecentralizedApplication的缩写,中文叫分布式运用/去中心化运用 DAPP将选用不相同的根底区块链开展渠道和一致机制,或从DAPP不同的底层区块链开发渠道就像手机的IOS体系和Android体系相同,是每个DAPP的底层。 DAPP是从底层区块链渠道生态衍生出来的各种分布式运用,也是区块链国际的根底。 DAPP在区块链,就像APP在IOS和Android相同。 依据DavidJohnston对DAPP的界说,一个真实的DAPP运用需求一起满意几个条件:
vx-longbatuiguang1
2022/05/28
1.4K0
区块链DAPP系统设计分析
区块链钱包解析与介绍
区块链的数据存储被称为“总账”(general ledger),这是一个符合其本质的姓名。区块链分类账的逻辑与传统分类账相似。比如,我或许错转了一笔钱给你。这项生意被区块链分类账承受并记载在其间。更正错漏的办法不是直接批改账簿,恢复到差错生意前的状态;这是一个新的更正生意,你把钱转回给我。当区块链分类账承受一笔新的生意时,差错和遗失就会得到纠正,一切的纠正进程都会记载在分类账中,有迹可循。欢迎探讨交流
v-longbatuiguang2
2022/06/24
7200
区块链钱包解析与介绍
区块链技术的安全价值与局限性解析
我们正站在新变革来临的边缘,互联网正在经历去中心化的阶段。经过了20年的科学研究,在密码学领域和去中心化计算网络上都产生了新的进展,带来诸如区块链技术(blockchain)之类的前沿技术,而这些技术可能潜含着从底层的改变社会运转方式的力量。 5月26日,美国国防部宣布与加密通讯开发商ITAMCO签署合同,共同开发用在美国军方的基于区块链的创新应用——一款“安全,不可侵入的消息传递、交易平台”。据称,这项合作计划能够为美国军方更安全地在总部和地面部队之间的通讯、专员及五角大楼之间的情报传递提供安全可靠的
FB客服
2018/02/26
1.7K0
区块链技术的安全价值与局限性解析
跨链技术与通证经济
这是我在小雨智媒上的一次关于跨链技术、通证经济以及我们的PalletOne的分享,当时是语音和图片的形式,被官方整理成了文字,我也就再转过来,希望能够为更多的人有所帮助。
深蓝studyzy
2022/06/16
3630
跨链技术与通证经济
蒋涛:DCO才是区块链的杀手级应用!
区块链并非要完全去中心化,以太坊DAO也并非完美 CSDN 董事长:蒋涛 整理 | 鸽子 3月15日,由亚太区块链协会、TOKENSKY组委会及CSDN等联合主办的“2018 TOKENSKY区块链大
区块链大本营
2018/05/10
9510
区块链分享
区块链为什么会有如此魅力,让全社会都在关注它?区块链技术是本质是不可篡改的、去中心化的公开账本,是记账方式的进步。记账是社会生产生活的基石,记账方式的进步,能推动社会进步。
一个淡定的打工菜鸟
2018/12/27
1.6K6
一文看懂区块链的3种类型:公有链、私有链和联盟链
在具体介绍公有链、私有链和联盟链之前,我们先从最简单的字面意思上,对这几个概念有个大致了解:
互链脉搏
2018/05/25
4.7K0
一文看懂区块链的3种类型:公有链、私有链和联盟链
区块链技术原理
随着互联网技术的不断发展和应用,数据的数量和价值也在不断增长,然而如何保护数据的安全和隐私,如何有效地管理和交换数据成为了一个日益重要的问题,区块链技术作为一种新兴的分布式数据库技术具有去中心化、安全和透明等特点,在数据管理和交易方面具有广泛的应用前景。本文将从区块链技术的基本概念、工作原理等方面进行介绍和概述,通过对区块链技术的深入了解,读者可以掌握其基本原理和应用方法为未来的区块链技术应用和创新提供帮助和指导
Al1ex
2023/08/10
6550
区块链技术原理
区块链中公有链,私有链和联盟链之间有何区别
公有链上的各个节点可以自由加入和退出网络,并参加链上数据的读写,读写时以扁平的拓扑结构互联互通,网络中不存在任何中心化的服务端节点。像大家所熟悉的比特币和以太坊,都是一种公有链。公有链的好处是没有限制,你可以自由参加。
用户2357564
2018/07/18
1.8K0
区块链中公有链,私有链和联盟链之间有何区别
让价值流动——区块链跨链技术
区块链共识机制形成了价值,价值互联网里价值流动的该如何实现? 跨链技术的产生 加密数字货币的区块链公网野蛮生长的同时,处于交易性能、容量规模、隐私保护、合规监管的考虑,联盟链和私链技术被商业机构特别是金融机构广泛采用。联盟链和私链的方式从一定程度违背了区块链去中心价值和信任体系,也让区块链里面的数字资产不能再不同的区块链间直接转移,主动或被动地导致了价值孤岛,联盟链和私链的局限性令各种连接不同区块链的跨链技术开始应运而生。 目前区块链应用越来越丰富,上面的资产越来越多,如何把每种数字代币比喻成一个价值孤岛,
rectinajh
2018/05/17
1.4K0
【易错概念】区块链的侧链/跨链,硬分叉链/软分叉链,主链/子链
侧链,是对于某个主链的一个相对概念。英文为sidechains。侧链协议是一种实现双向锚定(Two-way Peg)的协议,通过侧链协议实现资产在主链 和其它链之间互相转换,或是以独立的、隔离系统的形式,降低核心区块链上发生交易 的次数。侧链是以融合的方式实现加密货币金融生态的目标,而不是像其它加密货币一样排斥现有的系统。利用侧链,我们可以轻松的建立各种智能化的金融合约,股票、期货、衍生品等等。
辉哥
2018/09/26
2.6K0
区块链入门总结区块链
新交易创建 -> 交易广播网络 -> 交易验证 -> 验证结果通过网络广播 -> 交易写账本
若与
2018/09/29
53.3K1
区块链入门总结区块链
推荐阅读
相关推荐
什么是公链?公链开发优缺点分析
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验