An Open-Source Online Education Project Based on Big Data Technology (Part 2)

Author: soundhearer · Published 2020-12-18 · From the 数据湖 (Data Lake) column

In the previous article we covered the user-registration module of the offline data warehouse; this article covers the quiz (question-answering) module.

The simulated log data formats are shown below; for full details, see my open-source project: https://github.com/SoundHearer/kuaiban

1. QzWebsite.log: quiz website log data
{
    "createtime": "2019-07-22 11:47:18",  // creation time
    "creator": "admin",   // creator
    "dn": "webA",   // site partition
    "domain": "-",
    "dt": "20190722",  // date partition
    "multicastgateway": "-",
    "multicastport": "-",
    "multicastserver": "-",
    "sequence": "-",
    "siteid": 0,   // site id
    "sitename": "sitename0",  // site name
    "status": "-",
    "templateserver": "-"
}

2. QzSiteCourse.log: site course log data
{
    "boardid": 64,  // board (course template) id
    "coursechapter": "-",
    "courseid": 66,  // course id
    "createtime": "2019-07-22 11:43:32",  // creation time
    "creator": "admin",   // creator
    "dn": "webA",   // site partition
    "dt": "20190722",  // date partition
    "helpparperstatus": "-",
    "sequence": "-",
    "servertype": "-",
    "showstatus": "-",
    "sitecourseid": 2,  // site course id
    "sitecoursename": "sitecoursename2",  // site course name
    "siteid": 77,  // site id
    "status": "-"
}

3. QzQuestionType.log: question type data
{
    "createtime": "2019-07-22 10:42:47",   // creation time
    "creator": "admin",    // creator
    "description": "-",
    "dn": "webA",   // site partition
    "dt": "20190722",  // date partition
    "papertypename": "-",
    "questypeid": 0,  // question type id
    "quesviewtype": 0,
    "remark": "-",
    "sequence": "-",
    "splitscoretype": "-",
    "status": "-",
    "viewtypename": "viewtypename0"
}

4. QzQuestion.log: question log data
{
    "analysis": "-",
    "answer": "-",
    "attanswer": "-",
    "content": "-",
    "createtime": "2019-07-22 11:33:46",  // creation time
    "creator": "admin",  // creator
    "difficulty": "-",
    "dn": "webA",   // site partition
    "dt": "20190722",  // date partition
    "lecture": "-",
    "limitminute": "-",
    "modifystatus": "-",
    "optnum": 8,
    "parentid": 57,
    "quesskill": "-",
    "questag": "-",
    "questionid": 0,  // question id
    "questypeid": 57, // question type id
    "quesviewtype": 44,
    "score": 24.124501582742543, // question score
    "splitscore": 0.0,
    "status": "-",
    "vanalysisaddr": "-",
    "vdeoaddr": "-"
}

5. QzPointQuestion.log: question-to-knowledge-point mapping data
{
    "createtime": "2019-07-22 09:16:46",   // creation time
    "creator": "admin",  // creator
    "dn": "webA",  // site partition
    "dt": "20190722", // date partition
    "pointid": 0,  // knowledge point id
    "questionid": 0, // question id
    "questype": 0
}

Simulating data collection and uploading the data
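
The mock logs are generated locally (see the repo) and then pushed to the ODS directory on HDFS that the ETL jobs read, e.g. hdfs://cdh1.macro.com:8020/user/catelf/ods/. Below is a minimal sketch of that upload step using the Hadoop FileSystem API; the local directory /tmp/mocklogs and the exact file list are illustrative assumptions, not part of the original code.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object OdsUploader {
  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "hdfs")
    val conf = new Configuration()
    conf.set("fs.defaultFS", "hdfs://cdh1.macro.com:8020")
    val fs = FileSystem.get(conf)
    val odsDir = new Path("/user/catelf/ods")
    if (!fs.exists(odsDir)) fs.mkdirs(odsDir)
    // Copy each mock log file into the ODS directory read by EtlDataService
    Seq("QzWebsite.log", "QzSiteCourse.log", "QzQuestionType.log", "QzQuestion.log",
      "QzPointQuestion.log", "QzChapter.log", "QzChapterList.log", "QzPoint.log")
      .foreach { name =>
        fs.copyFromLocalFile(new Path(s"/tmp/mocklogs/$name"), new Path(odsDir, name))
      }
    fs.close()
  }
}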

Creating the tables

The full DDL is lengthy; see the open-source project for the complete set. Two representative tables:

create external  table `dwd`.`dwd_qz_chapter`(
chapterid int ,
chapterlistid int ,
chaptername string ,
sequence string ,
showstatus string  ,
creator string  ,
createtime timestamp,
courseid int  ,
chapternum int,
outchapterid int)
partitioned by(
dt string,
dn string)
 ROW FORMAT DELIMITED
 FIELDS TERMINATED BY '\t'
 STORED AS PARQUET TBLPROPERTIES('parquet.compression'='SNAPPY');

create external table `dwd`.`dwd_qz_chapter_list`(
chapterlistid int ,
chapterlistname string ,
courseid int ,
chapterallnum int ,
sequence string,
status string,
creator string ,
createtime timestamp 
)
partitioned by(
dt string,
dn string)
 ROW FORMAT DELIMITED
 FIELDS TERMINATED BY '\t'
 STORED AS PARQUET TBLPROPERTIES('parquet.compression'='SNAPPY');

Parsing the data

Requirement 1: Use Spark to parse the ODS-layer data and load it into the corresponding Hive tables; every score field must be rounded half-up to one decimal place.
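
The ETL code below relies on a small ParseJsonData helper (imported from com.catelf.util) to turn each log line into a fastjson JSONObject, returning null for malformed lines so the filter can drop them. It is not listed in this article; here is a minimal sketch consistent with how it is used.

import com.alibaba.fastjson.{JSON, JSONObject}

object ParseJsonData {
  // Parse one log line into a JSONObject; return null if the line is not valid JSON,
  // so callers can filter records with obj.isInstanceOf[JSONObject]
  def getJsonData(data: String): JSONObject = {
    try {
      JSON.parseObject(data)
    } catch {
      case _: Exception => null
    }
  }
}

Every parser in EtlDataService below follows the same pattern: read the ODS log from HDFS, drop lines that are not valid JSON, extract the fields, and append the result into the corresponding dwd table.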

import com.alibaba.fastjson.JSONObject
import com.catelf.qz.bean.{DwdQzPaperView, DwdQzPoint, DwdQzQuestion}
import com.catelf.util.ParseJsonData
import org.apache.spark.SparkContext
import org.apache.spark.sql.{SaveMode, SparkSession}

/**
 * ETL for user quiz (question-answering) data
 */
object EtlDataService {

  /**
   * Parse chapter data
   *
   * @param ssc
   * @param sparkSession
   * @return
   */
  def etlQzChapter(ssc: SparkContext, sparkSession: SparkSession) = {
    import sparkSession.implicits._ // implicit conversions for toDF
    ssc.textFile("hdfs://cdh1.macro.com:8020/user/catelf/ods/QzChapter.log").filter(item => {
      val obj = ParseJsonData.getJsonData(item)
      obj.isInstanceOf[JSONObject]
    }).mapPartitions(partitions => {
      partitions.map(item => {
        val jsonObject = ParseJsonData.getJsonData(item)
        val chapterid = jsonObject.getIntValue("chapterid")
        val chapterlistid = jsonObject.getIntValue("chapterlistid")
        val chaptername = jsonObject.getString("chaptername")
        val sequence = jsonObject.getString("sequence")
        val showstatus = jsonObject.getString("showstatus")
//        val status = jsonObject.getString("status")
        val creator = jsonObject.getString("creator")
        val createtime = jsonObject.getString("createtime")
        val courseid = jsonObject.getIntValue("courseid")
        val chapternum = jsonObject.getIntValue("chapternum")
        val outchapterid = jsonObject.getIntValue("outchapterid")
        val dt = jsonObject.getString("dt")
        val dn = jsonObject.getString("dn")
        (chapterid, chapterlistid, chaptername, sequence, showstatus, creator, createtime,
          courseid, chapternum, outchapterid, dt, dn)
      })
    }).toDF().coalesce(1).write.mode(SaveMode.Append).insertInto("dwd.dwd_qz_chapter")
  }

  /**
   * Parse chapter list data
   *
   * @param ssc
   * @param sparkSession
   */
  def etlQzChapterList(ssc: SparkContext, sparkSession: SparkSession) = {
    import sparkSession.implicits._
    ssc.textFile("hdfs://cdh1.macro.com:8020/user/catelf/ods/QzChapterList.log").filter(item => {
      val obj = ParseJsonData.getJsonData(item)
      obj.isInstanceOf[JSONObject]
    }).mapPartitions(partitions => {
      partitions.map(item => {
        val jsonObject = ParseJsonData.getJsonData(item)
        val chapterlistid = jsonObject.getIntValue("chapterlistid")
        val chapterlistname = jsonObject.getString("chapterlistname")
        val courseid = jsonObject.getIntValue("courseid")
        val chapterallnum = jsonObject.getIntValue("chapterallnum")
        val sequence = jsonObject.getString("sequence")
        val status = jsonObject.getString("status")
        val creator = jsonObject.getString("creator")
        val createtime = jsonObject.getString("createtime")
        val dt = jsonObject.getString("dt")
        val dn = jsonObject.getString("dn")
        (chapterlistid, chapterlistname, courseid, chapterallnum, sequence, status, creator, createtime, dt, dn)
      })
    }).toDF().coalesce(1).write.mode(SaveMode.Append).insertInto("dwd.dwd_qz_chapter_list")
  }

  /**
   * Parse knowledge point data
   *
   * @param ssc
   * @param sparkSession
   */
  def etlQzPoint(ssc: SparkContext, sparkSession: SparkSession) = {
    import sparkSession.implicits._
    ssc.textFile("hdfs://cdh1.macro.com:8020/user/catelf/ods/QzPoint.log").filter(item => {
      val obj = ParseJsonData.getJsonData(item)
      obj.isInstanceOf[JSONObject]
    }).mapPartitions(partitions => {
      partitions.map(item => {
        val jsonObject = ParseJsonData.getJsonData(item)
        val pointid = jsonObject.getIntValue("pointid")
        val courseid = jsonObject.getIntValue("courseid")
        val pointname = jsonObject.getString("pointname")
        val pointyear = jsonObject.getString("pointyear")
        val chapter = jsonObject.getString("chapter")
        val creator = jsonObject.getString("creator")
        val createtime = jsonObject.getString("createtime")
        val status = jsonObject.getString("status")
        val modifystatus = jsonObject.getString("modifystatus")
        val excisenum = jsonObject.getIntValue("excisenum")
        val pointlistid = jsonObject.getIntValue("pointlistid")
        val chapterid = jsonObject.getIntValue("chapterid")
        val sequence = jsonObject.getString("sequence")
        val pointdescribe = jsonObject.getString("pointdescribe")
        val pointlevel = jsonObject.getString("pointlevel")
        val typeslist = jsonObject.getString("typelist")
        val score = BigDecimal(jsonObject.getDouble("score")).setScale(1, BigDecimal.RoundingMode.HALF_UP) // keep 1 decimal place, rounding half-up
        val thought = jsonObject.getString("thought")
        val remid = jsonObject.getString("remid")
        val pointnamelist = jsonObject.getString("pointnamelist")
        val typelistids = jsonObject.getString("typelistids")
        val pointlist = jsonObject.getString("pointlist")
        val dt = jsonObject.getString("dt")
        val dn = jsonObject.getString("dn")
        DwdQzPoint(pointid, courseid, pointname, pointyear, chapter, creator, createtime, status, modifystatus, excisenum, pointlistid,
          chapterid, sequence, pointdescribe, pointlevel, typeslist, score, thought, remid, pointnamelist, typelistids,
          pointlist, dt, dn)
      })
    }).toDF().coalesce(1).write.mode(SaveMode.Append).insertInto("dwd.dwd_qz_point")
  }

  /**
   * Parse question-to-knowledge-point mapping data
   *
   * @param ssc
   * @param sparkSession
   * @return
   */
  def etlQzPointQuestion(ssc: SparkContext, sparkSession: SparkSession) = {
    import sparkSession.implicits._
    ssc.textFile("hdfs://cdh1.macro.com:8020/user/catelf/ods/QzPointQuestion.log").filter(item => {
      val obj = ParseJsonData.getJsonData(item)
      obj.isInstanceOf[JSONObject]
    }).mapPartitions(partitions => {
      partitions.map(item => {
        val jsonObject = ParseJsonData.getJsonData(item)
        val pointid = jsonObject.getIntValue("pointid")
        val questionid = jsonObject.getIntValue("questionid")
        val questtype = jsonObject.getIntValue("questtype")
        val creator = jsonObject.getString("creator")
        val createtime = jsonObject.getString("createtime")
        val dt = jsonObject.getString("dt")
        val dn = jsonObject.getString("dn")
        (pointid, questionid, questtype, creator, createtime, dt, dn)
      })
    }).toDF().write.mode(SaveMode.Append).insertInto("dwd.dwd_qz_point_question")
  }

  /**
   * Parse site course data
   *
   * @param ssc
   * @param sparkSession
   */
  def etlQzSiteCourse(ssc: SparkContext, sparkSession: SparkSession) = {
    import sparkSession.implicits._
    ssc.textFile("hdfs://cdh1.macro.com:8020/user/catelf/ods/QzSiteCourse.log").filter(item => {
      val obj = ParseJsonData.getJsonData(item)
      obj.isInstanceOf[JSONObject]
    }).mapPartitions(partitions => {
      partitions.map(item => {
        val jsonObject = ParseJsonData.getJsonData(item)
        val sitecourseid = jsonObject.getIntValue("sitecourseid")
        val siteid = jsonObject.getIntValue("siteid")
        val courseid = jsonObject.getIntValue("courseid")
        val sitecoursename = jsonObject.getString("sitecoursename")
        val coursechapter = jsonObject.getString("coursechapter")
        val sequence = jsonObject.getString("sequence")
        val status = jsonObject.getString("status")
        val creator = jsonObject.getString("creator")
        val createtime = jsonObject.getString("createtime")
        val helppaperstatus = jsonObject.getString("helppaperstatus")
        val servertype = jsonObject.getString("servertype")
        val boardid = jsonObject.getIntValue("boardid")
        val showstatus = jsonObject.getString("showstatus")
        val dt = jsonObject.getString("dt")
        val dn = jsonObject.getString("dn")
        (sitecourseid, siteid, courseid, sitecoursename, coursechapter, sequence, status, creator
          , createtime, helppaperstatus, servertype, boardid, showstatus, dt, dn)
      })
    }).toDF().coalesce(1).write.mode(SaveMode.Append).insertInto("dwd.dwd_qz_site_course")
  }

  /**
   * Parse course data
   *
   * @param ssc
   * @param sparkSession
   */
  def etlQzCourse(ssc: SparkContext, sparkSession: SparkSession) = {
    import sparkSession.implicits._
    ssc.textFile("hdfs://cdh1.macro.com:8020/user/catelf/ods/QzCourse.log").filter(item => {
      val obj = ParseJsonData.getJsonData(item)
      obj.isInstanceOf[JSONObject]
    }).mapPartitions(partitions => {
      partitions.map(item => {
        val jsonObject = ParseJsonData.getJsonData(item)
        val courseid = jsonObject.getIntValue("courseid")
        val majorid = jsonObject.getIntValue("majorid")
        val coursename = jsonObject.getString("coursename")
        val coursechapter = jsonObject.getString("coursechapter")
        val sequence = jsonObject.getString("sequnece")
        val isadvc = jsonObject.getString("isadvc")
        val creator = jsonObject.getString("creator")
        val createtime = jsonObject.getString("createtime")
        val status = jsonObject.getString("status")
        val chapterlistid = jsonObject.getIntValue("chapterlistid")
        val pointlistid = jsonObject.getIntValue("pointlistid")
        val dt = jsonObject.getString("dt")
        val dn = jsonObject.getString("dn")
        (courseid, majorid, coursename, coursechapter, sequence, isadvc, creator, createtime, status
          , chapterlistid, pointlistid, dt, dn)
      })
    }).toDF().coalesce(1).write.mode(SaveMode.Append).insertInto("dwd.dwd_qz_course")
  }

  /**
   * Parse course education-subject data
   *
   * @param ssc
   * @param sparkSession
   */
  def etlQzCourseEdusubject(ssc: SparkContext, sparkSession: SparkSession) = {
    import sparkSession.implicits._
    ssc.textFile("hdfs://cdh1.macro.com:8020/user/catelf/ods/QzCourseEduSubject.log").filter(item => {
      val obj = ParseJsonData.getJsonData(item)
      obj.isInstanceOf[JSONObject]
    }).mapPartitions(partitions => {
      partitions.map(item => {
        val jsonObject = ParseJsonData.getJsonData(item)
        val courseeduid = jsonObject.getIntValue("courseeduid")
        val edusubjectid = jsonObject.getIntValue("edusubjectid")
        val courseid = jsonObject.getIntValue("courseid")
        val creator = jsonObject.getString("creator")
        val createtime = jsonObject.getString("createtime")
        val majorid = jsonObject.getIntValue("majorid")
        val dt = jsonObject.getString("dt")
        val dn = jsonObject.getString("dn")
        (courseeduid, edusubjectid, courseid, creator, createtime, majorid, dt, dn)
      })
    }).toDF().coalesce(1).write.mode(SaveMode.Append).insertInto("dwd.dwd_qz_course_edusubject")
  }

  /**
   * Parse website data
   *
   * @param ssc
   * @param sparkSession
   */
  def etlQzWebsite(ssc: SparkContext, sparkSession: SparkSession) = {
    import sparkSession.implicits._
    ssc.textFile("hdfs://cdh1.macro.com:8020/user/catelf/ods/QzWebsite.log").filter(item => {
      val obj = ParseJsonData.getJsonData(item)
      obj.isInstanceOf[JSONObject]
    }).mapPartitions(partitions => {
      partitions.map(item => {
        val jsonObject = ParseJsonData.getJsonData(item)
        val siteid = jsonObject.getIntValue("siteid")
        val sitename = jsonObject.getString("sitename")
        val domain = jsonObject.getString("domain")
        val sequence = jsonObject.getString("sequence")
        val multicastserver = jsonObject.getString("multicastserver")
        val templateserver = jsonObject.getString("templateserver")
        val status = jsonObject.getString("status")
        val creator = jsonObject.getString("creator")
        val createtime = jsonObject.getString("createtime")
        val multicastgateway = jsonObject.getString("multicastgateway")
        val multicastport = jsonObject.getString("multicastport")
        val dt = jsonObject.getString("dt")
        val dn = jsonObject.getString("dn")
        (siteid, sitename, domain, sequence, multicastserver, templateserver, status, creator, createtime,
          multicastgateway, multicastport, dt, dn)
      })
    }).toDF().coalesce(1).write.mode(SaveMode.Append).insertInto("dwd.dwd_qz_website")
  }

  /**
   * Parse major data
   *
   * @param ssc
   * @param sparkSession
   */
  def etlQzMajor(ssc: SparkContext, sparkSession: SparkSession) = {
    import sparkSession.implicits._
    ssc.textFile("hdfs://cdh1.macro.com:8020/user/catelf/ods/QzMajor.log").filter(item => {
      val obj = ParseJsonData.getJsonData(item)
      obj.isInstanceOf[JSONObject]
    }).mapPartitions(partitions => {
      partitions.map(item => {
        val jsonObject = ParseJsonData.getJsonData(item)
        val majorid = jsonObject.getIntValue("majorid")
        val businessid = jsonObject.getIntValue("businessid")
        val siteid = jsonObject.getIntValue("siteid")
        val majorname = jsonObject.getString("majorname")
        val shortname = jsonObject.getString("shortname")
        val status = jsonObject.getString("status")
        val sequence = jsonObject.getString("sequence")
        val creator = jsonObject.getString("creator")
        val createtime = jsonObject.getString("createtime")
        val columm_sitetype = jsonObject.getString("columm_sitetype")
        val dt = jsonObject.getString("dt")
        val dn = jsonObject.getString("dn")
        (majorid, businessid, siteid, majorname, shortname, status, sequence, creator, createtime, columm_sitetype, dt, dn)
      })
    }).toDF().coalesce(1).write.mode(SaveMode.Append).insertInto("dwd.dwd_qz_major")
  }

  /**
   * Parse business data
   *
   * @param ssc
   * @param sparkSession
   */
  def etlQzBusiness(ssc: SparkContext, sparkSession: SparkSession) = {
    import sparkSession.implicits._
    ssc.textFile("hdfs://cdh1.macro.com:8020/user/catelf/ods/QzBusiness.log").filter(item => {
      val obj = ParseJsonData.getJsonData(item)
      obj.isInstanceOf[JSONObject]
    }).mapPartitions(partitions => {
      partitions.map(item => {
        val jsonObject = ParseJsonData.getJsonData(item);
        val businessid = jsonObject.getIntValue("businessid")
        val businessname = jsonObject.getString("businessname")
        val sequence = jsonObject.getString("sequence")
        val status = jsonObject.getString("status")
        val creator = jsonObject.getString("creator")
        val createtime = jsonObject.getString("createtime")
        val siteid = jsonObject.getIntValue("siteid")
        val dt = jsonObject.getString("dt")
        val dn = jsonObject.getString("dn")
        (businessid, businessname, sequence, status, creator, createtime, siteid, dt, dn)
      })
    }).toDF().coalesce(1).write.mode(SaveMode.Append).insertInto("dwd.dwd_qz_business")
  }

  def etlQzPaperView(ssc: SparkContext, sparkSession: SparkSession) = {
    import sparkSession.implicits._
    ssc.textFile("hdfs://cdh1.macro.com:8020/user/catelf/ods/QzPaperView.log").filter(item => {
      val obj = ParseJsonData.getJsonData(item)
      obj.isInstanceOf[JSONObject]
    }).mapPartitions(partitions => {
      partitions.map(item => {
        val jsonObject = ParseJsonData.getJsonData(item)
        val paperviewid = jsonObject.getIntValue("paperviewid")
        val paperid = jsonObject.getIntValue("paperid")
        val paperviewname = jsonObject.getString("paperviewname")
        val paperparam = jsonObject.getString("paperparam")
        val openstatus = jsonObject.getString("openstatus")
        val explainurl = jsonObject.getString("explainurl")
        val iscontest = jsonObject.getString("iscontest")
        val contesttime = jsonObject.getString("contesttime")
        val conteststarttime = jsonObject.getString("conteststarttime")
        val contestendtime = jsonObject.getString("contestendtime")
        val contesttimelimit = jsonObject.getString("contesttimelimit")
        val dayiid = jsonObject.getIntValue("dayiid")
        val status = jsonObject.getString("status")
        val creator = jsonObject.getString("creator")
        val createtime = jsonObject.getString("createtime")
        val paperviewcatid = jsonObject.getIntValue("paperviewcatid")
        val modifystatus = jsonObject.getString("modifystatus")
        val description = jsonObject.getString("description")
        val papertype = jsonObject.getString("papertype")
        val downurl = jsonObject.getString("downurl")
        val paperuse = jsonObject.getString("paperuse")
        val paperdifficult = jsonObject.getString("paperdifficult")
        val testreport = jsonObject.getString("testreport")
        val paperuseshow = jsonObject.getString("paperuseshow")
        val dt = jsonObject.getString("dt")
        val dn = jsonObject.getString("dn")
        DwdQzPaperView(paperviewid, paperid, paperviewname, paperparam, openstatus, explainurl, iscontest, contesttime,
          conteststarttime, contestendtime, contesttimelimit, dayiid, status, creator, createtime, paperviewcatid, modifystatus,
          description, papertype, downurl, paperuse, paperdifficult, testreport, paperuseshow, dt, dn)
      })
    }).toDF().coalesce(1).write.mode(SaveMode.Append).insertInto("dwd.dwd_qz_paper_view")
  }

  def etlQzCenterPaper(ssc: SparkContext, sparkSession: SparkSession) = {
    import sparkSession.implicits._
    ssc.textFile("hdfs://cdh1.macro.com:8020/user/catelf/ods/QzCenterPaper.log").filter(item => {
      val obj = ParseJsonData.getJsonData(item)
      obj.isInstanceOf[JSONObject]
    }).mapPartitions(partitions => {
      partitions.map(item => {
        val jsonObject = ParseJsonData.getJsonData(item)
        val paperviewid = jsonObject.getIntValue("paperviewid")
        val centerid = jsonObject.getIntValue("centerid")
        val openstatus = jsonObject.getString("openstatus")
        val sequence = jsonObject.getString("sequence")
        val creator = jsonObject.getString("creator")
        val createtime = jsonObject.getString("createtime")
        val dt = jsonObject.getString("dt")
        val dn = jsonObject.getString("dn")
        (paperviewid, centerid, openstatus, sequence, creator, createtime, dt, dn)
      })
    }).toDF().coalesce(1).write.mode(SaveMode.Append).insertInto("dwd.dwd_qz_center_paper")
  }

  def etlQzPaper(ssc: SparkContext, sparkSession: SparkSession) = {
    import sparkSession.implicits._
    ssc.textFile("hdfs://cdh1.macro.com:8020/user/catelf/ods/QzPaper.log").filter(item => {
      val obj = ParseJsonData.getJsonData(item)
      obj.isInstanceOf[JSONObject]
    }).mapPartitions(partitions => {
      partitions.map(item => {
        val jsonObject = ParseJsonData.getJsonData(item)
        val paperid = jsonObject.getIntValue("paperid")
        val papercatid = jsonObject.getIntValue("papercatid")
        val courseid = jsonObject.getIntValue("courseid")
        val paperyear = jsonObject.getString("paperyear")
        val chapter = jsonObject.getString("chapter")
        val suitnum = jsonObject.getString("suitnum")
        val papername = jsonObject.getString("papername")
        val status = jsonObject.getString("status")
        val creator = jsonObject.getString("creator")
        val createtime = jsonObject.getString("createtime")
        val totalscore = BigDecimal.apply(jsonObject.getString("totalscore")).setScale(1, BigDecimal.RoundingMode.HALF_UP)
        val chapterid = jsonObject.getIntValue("chapterid")
        val chapterlistid = jsonObject.getIntValue("chapterlistid")
        val dt = jsonObject.getString("dt")
        val dn = jsonObject.getString("dn")
        (paperid, papercatid, courseid, paperyear, chapter, suitnum, papername, status, creator, createtime, totalscore, chapterid,
          chapterlistid, dt, dn)
      })
    }).toDF().coalesce(1).write.mode(SaveMode.Append).insertInto("dwd.dwd_qz_paper")
  }

  def etlQzCenter(ssc: SparkContext, sparkSession: SparkSession) = {
    import sparkSession.implicits._
    ssc.textFile("hdfs://cdh1.macro.com:8020/user/catelf/ods/QzCenter.log").filter(item => {
      val obj = ParseJsonData.getJsonData(item)
      obj.isInstanceOf[JSONObject]
    }).mapPartitions(partitions => {
      partitions.map(item => {
        val jsonObject = ParseJsonData.getJsonData(item)
        val centerid = jsonObject.getIntValue("centerid")
        val centername = jsonObject.getString("centername")
        val centeryear = jsonObject.getString("centeryear")
        val centertype = jsonObject.getString("centertype")
        val openstatus = jsonObject.getString("openstatus")
        val centerparam = jsonObject.getString("centerparam")
        val description = jsonObject.getString("description")
        val creator = jsonObject.getString("creator")
        val createtime = jsonObject.getString("createtime")
        val sequence = jsonObject.getString("sequence")
        val provideuser = jsonObject.getString("provideuser")
        val centerviewtype = jsonObject.getString("centerviewtype")
        val stage = jsonObject.getString("stage")
        val dt = jsonObject.getString("dt")
        val dn = jsonObject.getString("dn")
        (centerid, centername, centeryear, centertype, openstatus, centerparam, description, creator, createtime,
          sequence, provideuser, centerviewtype, stage, dt, dn)
      })
    }).toDF().coalesce(1).write.mode(SaveMode.Append).insertInto("dwd.dwd_qz_center")
  }

  def etlQzQuestion(ssc: SparkContext, sparkSession: SparkSession) = {
    import sparkSession.implicits._
    ssc.textFile("hdfs://cdh1.macro.com:8020/user/catelf/ods/QzQuestion.log").filter(item => {
      val obj = ParseJsonData.getJsonData(item)
      obj.isInstanceOf[JSONObject]
    }).mapPartitions(partitions => {
      partitions.map(item => {
        val jsonObject = ParseJsonData.getJsonData(item)
        val questionid = jsonObject.getIntValue("questionid")
        val parentid = jsonObject.getIntValue("parentid")
        val questypeid = jsonObject.getIntValue("questypeid")
        val quesviewtype = jsonObject.getIntValue("quesviewtype")
        val content = jsonObject.getString("content")
        val answer = jsonObject.getString("answer")
        val analysis = jsonObject.getString("analysis")
        val limitminute = jsonObject.getString("limitminute")
        val score = BigDecimal.apply(jsonObject.getDoubleValue("score")).setScale(1, BigDecimal.RoundingMode.HALF_UP)
        val splitscore = BigDecimal.apply(jsonObject.getDoubleValue("splitscore")).setScale(1, BigDecimal.RoundingMode.HALF_UP)
        val status = jsonObject.getString("status")
        val optnum = jsonObject.getIntValue("optnum")
        val lecture = jsonObject.getString("lecture")
        val creator = jsonObject.getString("creator")
        val createtime = jsonObject.getString("createtime")
        val modifystatus = jsonObject.getString("modifystatus")
        val attanswer = jsonObject.getString("attanswer")
        val questag = jsonObject.getString("questag")
        val vanalysisaddr = jsonObject.getString("vanalysisaddr")
        val difficulty = jsonObject.getString("difficulty")
        val quesskill = jsonObject.getString("quesskill")
        val vdeoaddr = jsonObject.getString("vdeoaddr")
        val dt = jsonObject.getString("dt")
        val dn = jsonObject.getString("dn")
        DwdQzQuestion(questionid, parentid, questypeid, quesviewtype, content, answer, analysis, limitminute, score, splitscore,
          status, optnum, lecture, creator, createtime, modifystatus, attanswer, questag, vanalysisaddr, difficulty, quesskill,
          vdeoaddr, dt, dn)
      })
    }).toDF().coalesce(1).write.mode(SaveMode.Append).insertInto("dwd.dwd_qz_question")
  }

  def etlQzQuestionType(ssc: SparkContext, sparkSession: SparkSession) = {
    import sparkSession.implicits._
    ssc.textFile("hdfs://cdh1.macro.com:8020/user/catelf/ods/QzQuestionType.log").filter(item => {
      val obj = ParseJsonData.getJsonData(item)
      obj.isInstanceOf[JSONObject]
    }).mapPartitions(partitions => {
      partitions.map(item => {
        val jsonObject = ParseJsonData.getJsonData(item)
        val quesviewtype = jsonObject.getIntValue("quesviewtype")
        val viewtypename = jsonObject.getString("viewtypename")
        val questiontypeid = jsonObject.getIntValue("questypeid")
        val description = jsonObject.getString("description")
        val status = jsonObject.getString("status")
        val creator = jsonObject.getString("creator")
        val createtime = jsonObject.getString("createtime")
        val papertypename = jsonObject.getString("papertypename")
        val sequence = jsonObject.getString("sequence")
        val remark = jsonObject.getString("remark")
        val splitscoretype = jsonObject.getString("splitscoretype")
        val dt = jsonObject.getString("dt")
        val dn = jsonObject.getString("dn")
        (quesviewtype, viewtypename, questiontypeid, description, status, creator, createtime, papertypename, sequence,
          remark, splitscoretype, dt, dn)
      })
    }).toDF().coalesce(1).write.mode(SaveMode.Append).insertInto("dwd.dwd_qz_question_type")
  }


  /**
   * Parse user answer-record data
   *
   * @param ssc
   * @param sparkSession
   */
  def etlQzMemberPaperQuestion(ssc: SparkContext, sparkSession: SparkSession) = {
    import sparkSession.implicits._
    ssc.textFile("hdfs://cdh1.macro.com:8020/user/catelf/ods/QzMemberPaperQuestion.log").filter(item => {
      val obj = ParseJsonData.getJsonData(item)
      obj.isInstanceOf[JSONObject]
    }).mapPartitions(partitions => {
      partitions.map(item => {
        val jsonObject = ParseJsonData.getJsonData(item)
        val userid = jsonObject.getIntValue("userid")
        val paperviewid = jsonObject.getIntValue("paperviewid")
        val chapterid = jsonObject.getIntValue("chapterid")
        val sitecourseid = jsonObject.getIntValue("sitecourseid")
        val questionid = jsonObject.getIntValue("questionid")
        val majorid = jsonObject.getIntValue("majorid")
        val useranswer = jsonObject.getString("useranswer")
        val istrue = jsonObject.getString("istrue")
        val lasttime = jsonObject.getString("lasttime")
        val opertype = jsonObject.getString("opertype")
        val paperid = jsonObject.getIntValue("paperid")
        val spendtime = jsonObject.getIntValue("spendtime")
        val score = BigDecimal.apply(jsonObject.getString("score")).setScale(1, BigDecimal.RoundingMode.HALF_UP)
        val question_answer = jsonObject.getIntValue("question_answer")
        val dt = jsonObject.getString("dt")
        val dn = jsonObject.getString("dn")
        (userid, paperviewid, chapterid, sitecourseid, questionid, majorid, useranswer, istrue, lasttime, opertype, paperid, spendtime, score,question_answer, dt, dn)
      })
    }).toDF().coalesce(1).write.mode(SaveMode.Append).insertInto("dwd.dwd_qz_member_paper_question")
  }
}
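
The DwdQzPaperView, DwdQzPoint and DwdQzQuestion case classes imported at the top of EtlDataService live in the repo's bean package and are not reproduced here. For reference, a sketch of DwdQzPoint with field names and types inferred from how etlQzPoint constructs it (the other two follow the same pattern):

case class DwdQzPoint(pointid: Int, courseid: Int, pointname: String, pointyear: String,
                      chapter: String, creator: String, createtime: String, status: String,
                      modifystatus: String, excisenum: Int, pointlistid: Int, chapterid: Int,
                      sequence: String, pointdescribe: String, pointlevel: String,
                      typelist: String, score: BigDecimal, thought: String, remid: String,
                      pointnamelist: String, typelistids: String, pointlist: String,
                      dt: String, dn: String)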

Create the DwdController

import com.catelf.qz.service.EtlDataService
import com.catelf.util.HiveUtil
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

/**
 * Parse the quiz data and load it into the dwd layer
 */
object DwdController {
  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "hdfs")
    val sparkConf = new SparkConf().setAppName("dwd_qz_controller").setMaster("local[*]")
    val sparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
    val ssc = sparkSession.sparkContext
    HiveUtil.openDynamicPartition(sparkSession) // enable dynamic partitioning
    HiveUtil.openCompression(sparkSession) // enable compression
    HiveUtil.useSnappyCompression(sparkSession) // use Snappy compression
    EtlDataService.etlQzChapter(ssc, sparkSession)
    EtlDataService.etlQzChapterList(ssc, sparkSession)
    EtlDataService.etlQzPoint(ssc, sparkSession)
    EtlDataService.etlQzPointQuestion(ssc, sparkSession)
    EtlDataService.etlQzSiteCourse(ssc, sparkSession)
    EtlDataService.etlQzCourse(ssc, sparkSession)
    EtlDataService.etlQzCourseEdusubject(ssc, sparkSession)
    EtlDataService.etlQzWebsite(ssc, sparkSession)
    EtlDataService.etlQzMajor(ssc, sparkSession)
    EtlDataService.etlQzBusiness(ssc, sparkSession)
    EtlDataService.etlQzPaperView(ssc, sparkSession)
    EtlDataService.etlQzCenterPaper(ssc, sparkSession)
    EtlDataService.etlQzPaper(ssc, sparkSession)
    EtlDataService.etlQzCenter(ssc, sparkSession)
    EtlDataService.etlQzQuestion(ssc, sparkSession)
    EtlDataService.etlQzQuestionType(ssc, sparkSession)
    EtlDataService.etlQzMemberPaperQuestion(ssc, sparkSession)
  }
}

Running this main class produces the parsed dwd tables in Hive.
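
HiveUtil is another small helper from com.catelf.util that is not listed in this article. Below is a minimal sketch, assuming it simply sets the Hive session parameters that the comments in DwdController refer to.

import org.apache.spark.sql.SparkSession

object HiveUtil {
  // Allow insertInto to create dt/dn partitions dynamically
  def openDynamicPartition(sparkSession: SparkSession): Unit = {
    sparkSession.sql("set hive.exec.dynamic.partition=true")
    sparkSession.sql("set hive.exec.dynamic.partition.mode=nonstrict")
  }

  // Turn on compression for Hive outputs
  def openCompression(sparkSession: SparkSession): Unit = {
    sparkSession.sql("set hive.exec.compress.output=true")
    sparkSession.sql("set mapred.output.compress=true")
  }

  // Use the Snappy codec for the compressed output
  def useSnappyCompression(sparkSession: SparkSession): Unit = {
    sparkSession.sql("set mapred.output.compression.codec=org.apache.hadoop.io.compress.SnappyCodec")
    sparkSession.sql("set mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.SnappyCodec")
  }
}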

Create QzChapterDao, the DAO class for the chapter tables

import org.apache.spark.sql.SparkSession

object QzChapterDao {

  /**
   * Query base qz_chapter data
   *
   * @param sparkSession
   * @return
   */
  def getDwdQzChapter(sparkSession: SparkSession, dt: String) = {
    sparkSession.sql("select chapterid,chapterlistid,chaptername,sequence,showstatus,creator as " +
      "chapter_creator,createtime as chapter_createtime,courseid as chapter_courseid,chapternum,outchapterid,dt,dn from dwd.dwd_qz_chapter where " +
      s"dt='$dt'")

  }

  /**
   * Query base qz_chapter_list data
   *
   * @param sparkSession
   * @param dt
   */
  def getDwdQzChapterList(sparkSession: SparkSession, dt: String) = {
    sparkSession.sql("select chapterlistid,chapterlistname,chapterallnum,dn from dwd.dwd_qz_chapter_list " +
      s"where dt='$dt'")
  }

  /**
   * Query base qz_point data
   *
   * @param sparkSession
   * @param dt
   */
  def getDwdQzPoint(sparkSession: SparkSession, dt: String) = {
    sparkSession.sql("select pointid,pointname,pointyear,chapter,excisenum,pointlistid,chapterid," +
      "pointdescribe,pointlevel,typelist,score as point_score,thought,remid,pointnamelist,typelistids,pointlist,dn from " +
      s"dwd.dwd_qz_point where dt='$dt'")
  }

  /**
   * Query base qz_point_question data
   *
   * @param sparkSession
   * @param dt
   */
  def getDwdQzPointQuestion(sparkSession: SparkSession, dt: String) = {
    sparkSession.sql(s"select pointid,questionid,questype,dn from dwd.dwd_qz_point_question where dt='$dt'")
  }
}

Create QzCourseDao, the DAO class for the course tables

import org.apache.spark.sql.SparkSession

object QzCourseDao {

  def getDwdQzSiteCourse(sparkSession: SparkSession, dt: String) = {
    sparkSession.sql("select sitecourseid,siteid,courseid,sitecoursename,coursechapter,sequence,status," +
      "creator as sitecourse_creator,createtime as sitecourse_createtime,helppaperstatus,servertype,boardid,showstatus,dt,dn " +
      s"from dwd.dwd_qz_site_course where dt='${dt}'")
  }

  def getDwdQzCourse(sparkSession: SparkSession, dt: String) = {
    sparkSession.sql("select courseid,majorid,coursename,isadvc,chapterlistid,pointlistid,dn from " +
      s"dwd.dwd_qz_course where dt='${dt}'")
  }

  def getDwdQzCourseEduSubject(sparkSession: SparkSession, dt: String) = {
    sparkSession.sql("select courseeduid,edusubjectid,courseid,dn from dwd.dwd_qz_course_edusubject " +
      s"where dt='${dt}'")
  }
}

Create QzMajorDao, the DAO class for the major tables

import org.apache.spark.sql.SparkSession

object QzMajorDao {
  def getQzMajor(sparkSession: SparkSession, dt: String) = {
    sparkSession.sql("select majorid,businessid,siteid,majorname,shortname,status,sequence,creator as major_creator," +
      s"createtime as major_createtime,dt,dn from dwd.dwd_qz_major where dt='$dt'")
  }

  def getQzWebsite(sparkSession: SparkSession, dt: String) = {
    sparkSession.sql("select siteid,sitename,domain,multicastserver,templateserver,creator," +
      s"createtime,multicastgateway,multicastport,dn from dwd.dwd_qz_website where dt='$dt'")
  }

  def getQzBusiness(sparkSession: SparkSession, dt: String) = {
    sparkSession.sql(s"select businessid,businessname,dn from dwd.dwd_qz_business where dt='$dt'")
  }
}

Create QzPaperDao, the DAO class for the paper (exam) tables

import org.apache.spark.sql.SparkSession

object QzPaperDao {

  def getDwdQzPaperView(sparkSession: SparkSession, dt: String) = {
    sparkSession.sql("select paperviewid,paperid,paperviewname,paperparam,openstatus,explainurl,iscontest," +
      "contesttime,conteststarttime,contestendtime,contesttimelimit,dayiid,status,creator as paper_view_creator," +
      "createtime as paper_view_createtime,paperviewcatid,modifystatus,description,papertype,downurl,paperuse," +
      s"paperdifficult,testreport,paperuseshow,dt,dn from dwd.dwd_qz_paper_view where dt='$dt'")
  }

  def getDwdQzCenterPaper(sparkSession: SparkSession, dt: String) = {
    sparkSession.sql(s"select paperviewid,sequence,centerid,dn from dwd.dwd_qz_center_paper where dt='$dt'")
  }

  def getDwdQzPaper(sparkSession: SparkSession, dt: String) = {
    sparkSession.sql("select paperid,papercatid,courseid,paperyear,chapter,suitnum,papername,totalscore,chapterid," +
      s"chapterlistid,dn from dwd.dwd_qz_paper where dt='$dt'")
  }

  def getDwdQzCenter(sparkSession: SparkSession, dt: String) = {
    sparkSession.sql("select centerid,centername,centeryear,centertype,centerparam,provideuser," +
      s"centerviewtype,stage,dn from dwd.dwd_qz_center where dt='$dt'")
  }
}

Create QzQuestionDao, the DAO class for the question tables

import org.apache.spark.sql.SparkSession

object QzQuestionDao {

  def getQzQuestion(sparkSession: SparkSession, dt: String) = {
    sparkSession.sql("select questionid,parentid,questypeid,quesviewtype,content,answer,analysis,limitminute," +
      "score,splitscore,status,optnum,lecture,creator,createtime,modifystatus,attanswer,questag,vanalysisaddr,difficulty," +
      s"quesskill,vdeoaddr,dt,dn from  dwd.dwd_qz_question where dt='$dt'")
  }

  def getQzQuestionType(sparkSession: SparkSession, dt: String) = {
    sparkSession.sql("select questypeid,viewtypename,description,papertypename,remark,splitscoretype,dn from " +
      s"dwd.dwd_qz_question_type where dt='$dt'")
  }
}

Create UserPaperDetailDao, the DAO class for the wide table

import org.apache.spark.sql.SparkSession

object UserPaperDetailDao {

  def getDwdQzMemberPaperQuestion(sparkSession: SparkSession, dt: String) = {
    sparkSession.sql("select userid,paperviewid,chapterid,sitecourseid,questionid,majorid,useranswer,istrue,lasttime,opertype," +
      s"paperid,spendtime,score,question_answer,dt,dn from dwd.dwd_qz_member_paper_question where dt='$dt'")
  }

  def getDwsQzChapter(sparkSession: SparkSession, dt: String) = {
    sparkSession.sql("select chapterid,chapterlistid,chaptername,sequence as chapter_sequence,status as chapter_status," +
      "chapter_courseid,chapternum,chapterallnum,outchapterid,chapterlistname,pointid,questype,pointname,pointyear" +
      ",chapter,excisenum,pointlistid,pointdescribe,pointlevel,typelist,point_score,thought,remid,pointnamelist," +
      s"typelistids,pointlist,dn from dws.dws_qz_chapter where dt='$dt'")
  }

  def getDwsQzCourse(sparkSession: SparkSession, dt: String) = {
    sparkSession.sql("select sitecourseid,siteid,courseid,sitecoursename,coursechapter,sequence as course_sequence," +
      "status as course_status,sitecourse_creator,sitecourse_createtime,helppaperstatus,servertype,boardid,showstatus,majorid," +
      s"coursename,isadvc,chapterlistid,pointlistid,courseeduid,edusubjectid,dn from dws.dws_qz_course where dt='$dt'")
  }

  def getDwsQzMajor(sparkSession: SparkSession, dt: String) = {
    sparkSession.sql("select majorid,businessid,majorname,shortname,status as major_status,sequence  as major_sequence," +
      "major_creator,major_createtime,businessname,sitename,domain,multicastserver,templateserver,multicastgateway,multicastport," +
      s"dn from dws.dws_qz_major where dt=$dt")
  }

  def getDwsQzPaper(sparkSession: SparkSession, dt: String) = {
    sparkSession.sql("select paperviewid,paperid,paperviewname,paperparam,openstatus,explainurl,iscontest,contesttime," +
      "conteststarttime,contestendtime,contesttimelimit,dayiid,status as paper_status,paper_view_creator,paper_view_createtime," +
      "paperviewcatid,modifystatus,description,paperuse,testreport,centerid,sequence as paper_sequence,centername,centeryear," +
      "centertype,provideuser,centerviewtype,stage as paper_stage,papercatid,courseid,paperyear,suitnum,papername,totalscore,dn" +
      s" from dws.dws_qz_paper where dt=$dt")
  }

  def getDwsQzQuestion(sparkSession: SparkSession, dt: String) = {
    sparkSession.sql("select questionid,parentid as question_parentid,questypeid,quesviewtype,content as question_content," +
      "answer as question_answer,analysis as question_analysis,limitminute as question_limitminute,score as question_score," +
      "splitscore,lecture,creator as question_creator,createtime as question_createtime,modifystatus as question_modifystatus," +
      "attanswer as question_attanswer,questag as question_questag,vanalysisaddr as question_vanalysisaddr,difficulty as question_difficulty," +
      "quesskill,vdeoaddr,description as question_description,splitscoretype as question_splitscoretype,dn " +
      s" from dws.dws_qz_question where dt=$dt")
  }

}

Dimension degeneration and wide-table assembly: the service class

Requirement 2: Based on the dwd-layer base tables, perform dimension degeneration and aggregate them into dws.dws_qz_chapter (chapter dimension table), dws.dws_qz_course (course dimension table), dws.dws_qz_major (major dimension table), dws.dws_qz_paper (paper dimension table) and dws.dws_qz_question (question dimension table), using Spark SQL and the DataFrame API.

dws.dws_qz_chapter: a 4-table join. dwd.dwd_qz_chapter inner join dwd.dwd_qz_chapter_list on chapterlistid and dn, inner join dwd.dwd_qz_point on chapterid and dn, inner join dwd.dwd_qz_point_question on pointid and dn.

dws.dws_qz_course: a 3-table join. dwd.dwd_qz_site_course inner join dwd.dwd_qz_course on courseid and dn, inner join dwd.dwd_qz_course_edusubject on courseid and dn.

dws.dws_qz_major: a 3-table join. dwd.dwd_qz_major inner join dwd.dwd_qz_website on siteid and dn, inner join dwd.dwd_qz_business on businessid and dn.

dws.dws_qz_paper: a 4-table join. dwd.dwd_qz_paper_view left join dwd.dwd_qz_center_paper on paperviewid and dn, left join dwd.dwd_qz_center on centerid and dn, inner join dwd.dwd_qz_paper on paperid and dn.

dws.dws_qz_question: a 2-table join. dwd.dwd_qz_question inner join dwd.dwd_qz_question_type on questypeid and dn.

Requirement 3: From dws.dws_qz_chapter, dws.dws_qz_course, dws.dws_qz_major, dws.dws_qz_paper, dws.dws_qz_question and dwd.dwd_qz_member_paper_question, assemble the wide table dws.dws_user_paper_detail, using Spark SQL and the DataFrame API.

dws.dws_user_paper_detail: dwd_qz_member_paper_question inner join dws_qz_chapter on chapterid and dn, inner join dws_qz_course on sitecourseid and dn, inner join dws_qz_major on majorid and dn, inner join dws_qz_paper on paperviewid and dn, inner join dws_qz_question on questionid and dn.

import com.catelf.qz.dao.{QzChapterDao, QzCourseDao, QzMajorDao, QzPaperDao, QzQuestionDao, UserPaperDetailDao}
import org.apache.spark.sql.{SaveMode, SparkSession}

object DwsQzService {

  def saveDwsQzChapter(sparkSession: SparkSession, dt: String) = {
    val dwdQzChapter = QzChapterDao.getDwdQzChapter(sparkSession, dt)
    val dwdQzChapterlist = QzChapterDao.getDwdQzChapterList(sparkSession, dt)
    val dwdQzPoint = QzChapterDao.getDwdQzPoint(sparkSession, dt)
    val dwdQzPointQuestion = QzChapterDao.getDwdQzPointQuestion(sparkSession, dt)
    val result = dwdQzChapter.join(dwdQzChapterlist, Seq("chapterlistid", "dn"))
      .join(dwdQzPoint, Seq("chapterid", "dn"))
      .join(dwdQzPointQuestion, Seq("pointid", "dn"))
    result.select("chapterid", "chapterlistid", "chaptername", "sequence", "showstatus", "status",
      "chapter_creator", "chapter_createtime", "chapter_courseid", "chapternum", "chapterallnum", "outchapterid", "chapterlistname",
      "pointid", "questionid", "questype", "pointname", "pointyear", "chapter", "excisenum", "pointlistid", "pointdescribe",
      "pointlevel", "typelist", "point_score", "thought", "remid", "pointnamelist", "typelistids", "pointlist", "dt", "dn")
      .coalesce(1).write.mode(SaveMode.Append).insertInto("dws.dws_qz_chapter")
  }


  def saveDwsQzCourse(sparkSession: SparkSession, dt: String) = {
    val dwdQzSiteCourse = QzCourseDao.getDwdQzSiteCourse(sparkSession, dt)
    val dwdQzCourse = QzCourseDao.getDwdQzCourse(sparkSession, dt)
    val dwdQzCourseEdusubject = QzCourseDao.getDwdQzCourseEduSubject(sparkSession, dt)
    val result = dwdQzSiteCourse.join(dwdQzCourse, Seq("courseid", "dn"))
      .join(dwdQzCourseEdusubject, Seq("courseid", "dn"))
      .select("sitecourseid", "siteid", "courseid", "sitecoursename", "coursechapter",
        "sequence", "status", "sitecourse_creator", "sitecourse_createtime", "helppaperstatus", "servertype", "boardid",
        "showstatus", "majorid", "coursename", "isadvc", "chapterlistid", "pointlistid", "courseeduid", "edusubjectid"
        , "dt", "dn")
    result.coalesce(1).write.mode(SaveMode.Append).insertInto("dws.dws_qz_course")
  }

  def saveDwsQzMajor(sparkSession: SparkSession, dt: String) = {
    val dwdQzMajor = QzMajorDao.getQzMajor(sparkSession, dt)
    val dwdQzWebsite = QzMajorDao.getQzWebsite(sparkSession, dt)
    val dwdQzBusiness = QzMajorDao.getQzBusiness(sparkSession, dt)
    val result = dwdQzMajor.join(dwdQzWebsite, Seq("siteid", "dn"))
      .join(dwdQzBusiness, Seq("businessid", "dn"))
      .select("majorid", "businessid", "siteid", "majorname", "shortname", "status", "sequence",
        "major_creator", "major_createtime", "businessname", "sitename", "domain", "multicastserver", "templateserver",
        "multicastgateway", "multicastport", "dt", "dn")
    result.coalesce(1).write.mode(SaveMode.Append).insertInto("dws.dws_qz_major")
  }

  def saveDwsQzPaper(sparkSession: SparkSession, dt: String) = {
    val dwdQzPaperView = QzPaperDao.getDwdQzPaperView(sparkSession, dt)
    val dwdQzCenterPaper = QzPaperDao.getDwdQzCenterPaper(sparkSession, dt)
    val dwdQzCenter = QzPaperDao.getDwdQzCenter(sparkSession, dt)
    val dwdQzPaper = QzPaperDao.getDwdQzPaper(sparkSession, dt)
    val result = dwdQzPaperView.join(dwdQzCenterPaper, Seq("paperviewid", "dn"), "left")
      .join(dwdQzCenter, Seq("centerid", "dn"), "left")
      .join(dwdQzPaper, Seq("paperid", "dn"))
      .select("paperviewid", "paperid", "paperviewname", "paperparam", "openstatus", "explainurl", "iscontest"
        , "contesttime", "conteststarttime", "contestendtime", "contesttimelimit", "dayiid", "status", "paper_view_creator",
        "paper_view_createtime", "paperviewcatid", "modifystatus", "description", "paperuse", "paperdifficult", "testreport",
        "paperuseshow", "centerid", "sequence", "centername", "centeryear", "centertype", "provideuser", "centerviewtype",
        "stage", "papercatid", "courseid", "paperyear", "suitnum", "papername", "totalscore", "chapterid", "chapterlistid",
        "dt", "dn")

    result.coalesce(1).write.mode(SaveMode.Append).insertInto("dws.dws_qz_paper")
  }

  def saveDwsQzQuestionTpe(sparkSession: SparkSession, dt: String) = {
    val dwdQzQuestion = QzQuestionDao.getQzQuestion(sparkSession, dt)
    val dwdQzQuestionType = QzQuestionDao.getQzQuestionType(sparkSession, dt)
    val result = dwdQzQuestion.join(dwdQzQuestionType, Seq("questypeid", "dn"))
      .select("questionid", "parentid", "questypeid", "quesviewtype", "content", "answer", "analysis"
        , "limitminute", "score", "splitscore", "status", "optnum", "lecture", "creator", "createtime", "modifystatus"
        , "attanswer", "questag", "vanalysisaddr", "difficulty", "quesskill", "vdeoaddr", "viewtypename", "papertypename",
        "remark", "splitscoretype", "dt", "dn")
    result.coalesce(1).write.mode(SaveMode.Append).insertInto("dws.dws_qz_question")
  }

  def saveDwsUserPaperDetail(sparkSession: SparkSession, dt: String) = {
    val dwdQzMemberPaperQuestion = UserPaperDetailDao.getDwdQzMemberPaperQuestion(sparkSession, dt).drop("paperid")
      .withColumnRenamed("question_answer", "user_question_answer")
    val dwsQzChapter = UserPaperDetailDao.getDwsQzChapter(sparkSession, dt).drop("courseid")
    val dwsQzCourse = UserPaperDetailDao.getDwsQzCourse(sparkSession, dt).withColumnRenamed("sitecourse_creator", "course_creator")
      .withColumnRenamed("sitecourse_createtime", "course_createtime").drop("majorid")
      .drop("chapterlistid").drop("pointlistid")
    val dwsQzMajor = UserPaperDetailDao.getDwsQzMajor(sparkSession, dt)
    val dwsQzPaper = UserPaperDetailDao.getDwsQzPaper(sparkSession, dt).drop("courseid")
    val dwsQzQuestion = UserPaperDetailDao.getDwsQzQuestion(sparkSession, dt)
    dwdQzMemberPaperQuestion.join(dwsQzCourse, Seq("sitecourseid", "dn")).
      join(dwsQzChapter, Seq("chapterid", "dn")).join(dwsQzMajor, Seq("majorid", "dn"))
      .join(dwsQzPaper, Seq("paperviewid", "dn")).join(dwsQzQuestion, Seq("questionid", "dn"))
      .select("userid", "courseid", "questionid", "useranswer", "istrue", "lasttime", "opertype",
        "paperid", "spendtime", "chapterid", "chaptername", "chapternum",
        "chapterallnum", "outchapterid", "chapterlistname", "pointid", "questype", "pointyear", "chapter", "pointname"
        , "excisenum", "pointdescribe", "pointlevel", "typelist", "point_score", "thought", "remid", "pointnamelist",
        "typelistids", "pointlist", "sitecourseid", "siteid", "sitecoursename", "coursechapter", "course_sequence", "course_status"
        , "course_creator", "course_createtime", "servertype", "helppaperstatus", "boardid", "showstatus", "majorid", "coursename",
        "isadvc", "chapterlistid", "pointlistid", "courseeduid", "edusubjectid", "businessid", "majorname", "shortname",
        "major_status", "major_sequence", "major_creator", "major_createtime", "businessname", "sitename",
        "domain", "multicastserver", "templateserver", "multicastgateway", "multicastport", "paperviewid", "paperviewname", "paperparam",
        "openstatus", "explainurl", "iscontest", "contesttime", "conteststarttime", "contestendtime", "contesttimelimit",
        "dayiid", "paper_status", "paper_view_creator", "paper_view_createtime", "paperviewcatid", "modifystatus", "description", "paperuse",
        "testreport", "centerid", "paper_sequence", "centername", "centeryear", "centertype", "provideuser", "centerviewtype",
        "paper_stage", "papercatid", "paperyear", "suitnum", "papername", "totalscore", "question_parentid", "questypeid",
        "quesviewtype", "question_content", "question_answer", "question_analysis", "question_limitminute", "score",
        "splitscore", "lecture", "question_creator", "question_createtime", "question_modifystatus", "question_attanswer",
        "question_questag", "question_vanalysisaddr", "question_difficulty", "quesskill", "vdeoaddr", "question_description",
        "question_splitscoretype", "user_question_answer", "dt", "dn").coalesce(1)
      .write.mode(SaveMode.Append).insertInto("dws.dws_user_paper_detail")
  }
}
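
Requirement 2 asks for both Spark SQL and the DataFrame API, while the service above uses the DataFrame API throughout. As an illustration only (not part of the original code), here is a sketch of how the dws_qz_major assembly could be written with Spark SQL inside the same object, assuming the dwd schemas shown earlier; the column order matches dws.dws_qz_major.

  def saveDwsQzMajorSql(sparkSession: SparkSession, dt: String) = {
    sparkSession.sql(
      s"""
         |select m.majorid, m.businessid, m.siteid, m.majorname, m.shortname, m.status, m.sequence,
         |       m.creator as major_creator, m.createtime as major_createtime,
         |       b.businessname, w.sitename, w.domain, w.multicastserver, w.templateserver,
         |       w.multicastgateway, w.multicastport, m.dt, m.dn
         |from dwd.dwd_qz_major m
         |join dwd.dwd_qz_website w on m.siteid = w.siteid and m.dn = w.dn and w.dt = '$dt'
         |join dwd.dwd_qz_business b on m.businessid = b.businessid and m.dn = b.dn and b.dt = '$dt'
         |where m.dt = '$dt'
       """.stripMargin)
      .coalesce(1).write.mode(SaveMode.Append).insertInto("dws.dws_qz_major")
  }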

Create the DwsController

import com.catelf.qz.service.DwsQzService
import com.catelf.util.HiveUtil
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object DwsController {
  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "hdfs")
    val sparkConf = new SparkConf().setAppName("dws_qz_controller").setMaster("local[*]")
    val sparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
    val ssc = sparkSession.sparkContext
    HiveUtil.openDynamicPartition(sparkSession) // enable dynamic partitioning
    HiveUtil.openCompression(sparkSession) // enable compression
    HiveUtil.useSnappyCompression(sparkSession) // use Snappy compression
    val dt = "20190722"
    DwsQzService.saveDwsQzChapter(sparkSession, dt)
    DwsQzService.saveDwsQzCourse(sparkSession, dt)
    DwsQzService.saveDwsQzMajor(sparkSession, dt)
    DwsQzService.saveDwsQzPaper(sparkSession, dt)
    DwsQzService.saveDwsQzQuestionTpe(sparkSession, dt)
    DwsQzService.saveDwsUserPaperDetail(sparkSession, dt)
  }
}

You can see that the dws-layer table data has now been generated.
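
A quick way to sanity-check the output is to count rows per partition in the wide table, for example:

sparkSession.sql("select dt, dn, count(*) as cnt from dws.dws_user_paper_detail group by dt, dn").show()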

Reporting-layer (ADS) metric statistics

Requirement 4: Based on the wide table, compute the average time spent and the average score per paper, first with Spark SQL and then with the Spark DataFrame API.

Requirement 5: Compute the highest and lowest score per paper, first with Spark SQL and then with the Spark DataFrame API.

Requirement 6: Group by paper and list the top-three users per paper, first with Spark SQL and then with the Spark DataFrame API.

Requirement 7: Group by paper and list the bottom-three users per paper, first with Spark SQL and then with the Spark DataFrame API.

Requirement 8: For each paper, collect the user ids in each score band: 0-20, 20-40, 40-60, 60-80, 80-100.

Requirement 9: For each paper, count the users who failed, the users who passed, and the pass rate (the passing score is 60).

Requirement 10: For each question, count the wrong answers, the correct answers, and the error rate.
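
The ads.* target tables that these metrics are written into (ads.ads_paper_avgtimeandscore, ads.ads_paper_maxdetail, ads.ads_top3_userdetail, and so on) are created in the repo; their DDL is not repeated here. For reference only, a sketch of what the first one might look like, with column types matching the casts applied in AdsQzService below:

sparkSession.sql(
  """create external table if not exists `ads`.`ads_paper_avgtimeandscore`(
    |paperviewid int,
    |paperviewname string,
    |avgscore decimal(4,1),
    |avgspendtime decimal(10,1))
    |partitioned by (dt string, dn string)
    |stored as parquet tblproperties('parquet.compression'='SNAPPY')""".stripMargin)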

Create AdsQzDao

import org.apache.spark.sql.SparkSession

object AdsQzDao {

  /**
   * Average time spent and average score per paper
   *
   * @param sparkSession
   * @param dt
   * @return
   */
  def getAvgSPendTimeAndScore(sparkSession: SparkSession, dt: String) = {
    sparkSession.sql(s"select paperviewid,paperviewname,cast(avg(score) as decimal(4,1)) score,cast(avg(spendtime) as decimal(10,2))" +
      s" spendtime,dt,dn from dws.dws_user_paper_detail where dt='$dt' group by " +
      "paperviewid,paperviewname,dt,dn order by score desc,spendtime desc");
  }

  /**
   * Highest and lowest score per paper
   *
   * @param sparkSession
   * @param dt
   */
  def getTopScore(sparkSession: SparkSession, dt: String) = {
    sparkSession.sql("select paperviewid,paperviewname,cast(max(score) as decimal(4,1)),cast(min(score) as decimal(4,1)) " +
      s",dt,dn from dws.dws_user_paper_detail where dt=$dt group by paperviewid,paperviewname,dt,dn ")
  }

  /**
   * Top-three users by score for each paper
   *
   * @param sparkSession
   * @param dt
   */
  def getTop3UserDetail(sparkSession: SparkSession, dt: String) = {
    sparkSession.sql("select * from (select userid,paperviewname,chaptername,pointname,sitecoursename,coursename,majorname,shortname," +
      s"sitename,papername,score,dense_rank() over (partition by paperviewid order by score desc) as rk,dt,dn from dws.dws_user_paper_detail where dt='$dt') " +
      "where rk<4")
  }

  /**
   * Bottom-three users by score for each paper
   *
   * @param sparkSession
   * @param dt
   * @return
   */
  def getLow3UserDetail(sparkSession: SparkSession, dt: String) = {
    sparkSession.sql("select * from (select userid,paperviewname,chaptername,pointname,sitecoursename,coursename,majorname,shortname," +
      s"sitename,papername,score,dense_rank() over (partition by paperviewid order by score asc) as rk,dt,dn from dws.dws_user_paper_detail where dt='$dt') where rk<4")
  }


  /**
   * User ids in each score band, per paper
   */
  def getPaperScoreSegmentUser(sparkSession: SparkSession, dt: String) = {
    sparkSession.sql("select paperviewid,paperviewname,score_segment,concat_ws(',',collect_list(cast(userid as string))),dt,dn" +
      " from (select paperviewid,paperviewname,userid," +
      " case  when score >=0  and score <=20 then '0-20'" +
      "       when score >20 and score <=40 then '20-40' " +
      "       when score >40 and score <=60 then '40-60' " +
      "       when score >60 and score <=80 then '60-80' " +
      "       when score >80 and score <=100 then '80-100' end  as score_segment" +
      s",dt,dn from  dws.dws_user_paper_detail where dt='$dt') group by paperviewid,paperviewname,score_segment,dt,dn order by paperviewid,score_segment")
  }

  /**
   * Fail count, pass count and pass rate per paper
   *
   * @param sparkSession
   * @param dt
   */
  def getPaperPassDetail(sparkSession: SparkSession, dt: String) = {
    sparkSession.sql("select t.*,cast(t.passcount/(t.passcount+t.countdetail) as decimal(4,2)) as rate,dt,dn" +
      "   from(select a.paperviewid,a.paperviewname,a.countdetail,a.dt,a.dn,b.passcount from " +
      s"(select paperviewid,paperviewname,count(*) countdetail,dt,dn from dws.dws_user_paper_detail where dt='$dt' and score between 0 and 60 group by" +
      s" paperviewid,paperviewname,dt,dn) a join (select paperviewid,count(*) passcount,dn from  dws.dws_user_paper_detail  where dt='$dt' and score >60  " +
      "group by paperviewid,dn) b on a.paperviewid=b.paperviewid and a.dn=b.dn)t")

  }

  /**
   * Correct count, wrong count and error rate per question, ordered so the most-missed questionids come first
   *
   * @param sparkSession
   * @param dt
   */
  def getQuestionDetail(sparkSession: SparkSession, dt: String) = {
    sparkSession.sql(s"select t.*,cast(t.errcount/(t.errcount+t.rightcount) as decimal(4,2))as rate" +
      s" from((select questionid,count(*) errcount,dt,dn from dws.dws_user_paper_detail where dt='$dt' and user_question_answer='0' " +
      s"group by questionid,dt,dn) a join(select questionid,count(*) rightcount,dt,dn from dws.dws_user_paper_detail where dt='$dt' and user_question_answer='1' " +
      s"group by questionid,dt,dn) b on a.questionid=b.questionid and a.dn=b.dn)t order by errcount desc")
  }

}

Create AdsQzService

import com.catelf.qz.dao.AdsQzDao
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.{SaveMode, SparkSession}

object AdsQzService {

  def getTarget(sparkSession: SparkSession, dt: String) = {
    val avgDetail = AdsQzDao.getAvgSPendTimeAndScore(sparkSession, dt)
    val topscore = AdsQzDao.getTopScore(sparkSession, dt)
    val top3UserDetail = AdsQzDao.getTop3UserDetail(sparkSession, dt)
    val low3UserDetail = AdsQzDao.getLow3UserDetail(sparkSession, dt)
    val paperScore = AdsQzDao.getPaperScoreSegmentUser(sparkSession, dt)
    val paperPassDetail = AdsQzDao.getPaperPassDetail(sparkSession, dt)
    val questionDetail = AdsQzDao.getQuestionDetail(sparkSession, dt)
  }


  def getTargetApi(sparkSession: SparkSession, dt: String) = {
    import org.apache.spark.sql.functions._
    val avgDetail = sparkSession.sql("select paperviewid,paperviewname,score,spendtime,dt,dn from dws.dws_user_paper_detail ")
      .where(s"dt=${dt}").groupBy("paperviewid", "paperviewname", "dt", "dn").
      agg(avg("score").cast("decimal(4,1)").as("avgscore"),
        avg("spendtime").cast("decimal(10,1)").as("avgspendtime"))
      .select("paperviewid", "paperviewname", "avgscore", "avgspendtime", "dt", "dn")
      .coalesce(1).write.mode(SaveMode.Append).insertInto("ads.ads_paper_avgtimeandscore")

    val topscore = sparkSession.sql("select paperviewid,paperviewname,score,dt,dn from dws.dws_user_paper_detail")
      .where(s"dt=$dt").groupBy("paperviewid", "paperviewname", "dt", "dn")
      .agg(max("score").as("maxscore"), min("score").as("minscore"))
      .select("paperviewid", "paperviewname", "maxscore", "minscore", "dt", "dn")
      .coalesce(1).write.mode(SaveMode.Append).insertInto("ads.ads_paper_maxdetail")

    val top3UserDetail = sparkSession.sql("select *from dws.dws_user_paper_detail")
      .where(s"dt=$dt").select("userid", "paperviewid", "paperviewname", "chaptername", "pointname"
      , "sitecoursename", "coursename", "majorname", "shortname", "papername", "score", "dt", "dn")
      .withColumn("rk", dense_rank().over(Window.partitionBy("paperviewid").orderBy(desc("score"))))
      .where("rk<4")
      .select("userid", "paperviewid", "paperviewname", "chaptername", "pointname", "sitecoursename"
        , "coursename", "majorname", "shortname", "papername", "score", "rk", "dt", "dn")
      .coalesce(1).write.mode(SaveMode.Append).insertInto("ads.ads_top3_userdetail")

    val low3UserDetail = sparkSession.sql("select *from dws.dws_user_paper_detail")
      .where(s"dt=$dt").select("userid", "paperviewid", "paperviewname", "chaptername", "pointname"
      , "sitecoursename", "coursename", "majorname", "shortname", "papername", "score", "dt", "dn")
      .withColumn("rk", dense_rank().over(Window.partitionBy("paperviewid").orderBy("score")))
      .where("rk<4")
      .select("userid", "paperviewid", "paperviewname", "chaptername", "pointname", "sitecoursename"
        , "coursename", "majorname", "shortname", "papername", "score", "rk", "dt", "dn")
      .coalesce(1).write.mode(SaveMode.Append).insertInto("ads.ads_low3_userdetail")

    val paperScore = sparkSession.sql("select *from dws.dws_user_paper_detail")
      .where(s"dt=$dt")
      .select("paperviewid", "paperviewname", "userid", "score", "dt", "dn")
      .withColumn("score_segment",
        when(col("score").between(0, 20), "0-20")
          .when(col("score") > 20 && col("score") <= 40, "20-40")
          .when(col("score") > 40 && col("score") <= 60, "40-60")
          .when(col("score") > 60 && col("score") <= 80, "60-80")
          .when(col("score") > 80 && col("score") <= 100, "80-100"))
      .drop("score").groupBy("paperviewid", "paperviewname", "score_segment", "dt", "dn")
      .agg(concat_ws(",", collect_list(col("userid").cast("string").as("userids"))).as("userids"))
      .select("paperviewid", "paperviewname", "score_segment", "userids", "dt", "dn")
      .orderBy("paperviewid", "score_segment")
      .coalesce(1).write.mode(SaveMode.Append).insertInto("ads.ads_paper_scoresegment_user")

    val paperPassDetail = sparkSession.sql("select * from dws.dws_user_paper_detail").cache()
    val unPassDetail = paperPassDetail.select("paperviewid", "paperviewname", "dn", "dt")
      .where(s"dt='$dt'").where("score between 0 and 60")
      .groupBy("paperviewid", "paperviewname", "dn", "dt")
      .agg(count("paperviewid").as("unpasscount"))

    val passDetail = paperPassDetail.select("paperviewid", "dn")
      .where(s"dt='$dt'").where("score >60")
      .groupBy("paperviewid", "dn")
      .agg(count("paperviewid").as("passcount"))

    unPassDetail.join(passDetail, Seq("paperviewid", "dn")).
      withColumn("rate", (col("passcount")./(col("passcount") + col("unpasscount")))
        .cast("decimal(4,2)"))
      .select("paperviewid", "paperviewname", "unpasscount", "passcount", "rate", "dt", "dn")
      .coalesce(1).write.mode(SaveMode.Append).insertInto("ads.ads_user_paper_detail")
    paperPassDetail.unpersist()

    val userQuestionDetail = sparkSession.sql("select * from dws.dws_user_paper_detail").cache()
    val userQuestionError = userQuestionDetail.select("questionid", "dt", "dn", "user_question_answer")
      .where(s"dt='$dt'").where("user_question_answer='0'").drop("user_question_answer")
      .groupBy("questionid", "dt", "dn")
      .agg(count("questionid").as("errcount"))

    val userQuestionRight = userQuestionDetail.select("questionid", "dn", "user_question_answer")
      .where(s"dt='$dt'").where("user_question_answer='1'").drop("user_question_answer")
      .groupBy("questionid", "dn")
      .agg(count("questionid").as("rightcount"))

    userQuestionError.join(userQuestionRight, Seq("questionid", "dn"))
      .withColumn("rate", (col("errcount") / (col("errcount") + col("rightcount"))).cast("decimal(4,2)"))
      .orderBy(desc("errcount")).coalesce(1)
      .select("questionid", "errcount", "rightcount", "rate", "dt", "dn")
      .write.mode(SaveMode.Append).insertInto("ads.ads_user_question_detail")
  }

}
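
Note that getTarget only builds the DataFrames returned by AdsQzDao and does not persist them. If you want the Spark SQL path to write its results as well, the same pattern as the DataFrame path applies, for example (illustrative, inside getTarget):

    avgDetail.coalesce(1).write.mode(SaveMode.Append).insertInto("ads.ads_paper_avgtimeandscore")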

Create the AdsController

import com.catelf.qz.service.AdsQzService
import com.catelf.util.HiveUtil
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object AdsController {
  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "hdfs")
    val sparkConf = new SparkConf().setAppName("ads_controller").setMaster("local[*]")
    val sparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
    val ssc = sparkSession.sparkContext
    HiveUtil.openDynamicPartition(sparkSession) // enable dynamic partitioning
    AdsQzService.getTargetApi(sparkSession, "20190722")
//    AdsQzService.getTarget(sparkSession, dt="20190722")
  }
}

Importing the data into MySQL

Finally, the computed metrics are exported into MySQL with DataX.
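
The DataX job definitions live in the repo. If you just want to spot-check the ADS results in MySQL without setting up DataX, Spark's built-in JDBC writer can perform the same export; below is a minimal sketch in which the JDBC URL, credentials and target table name are placeholders.

import java.util.Properties
import org.apache.spark.sql.{SaveMode, SparkSession}

object AdsToMysql {
  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "hdfs")
    val sparkSession = SparkSession.builder().appName("ads_to_mysql")
      .master("local[*]").enableHiveSupport().getOrCreate()
    val props = new Properties()
    props.setProperty("user", "root")        // placeholder credentials
    props.setProperty("password", "123456")
    props.setProperty("driver", "com.mysql.jdbc.Driver")
    // Export one ADS table; repeat for the other ads.* tables as needed
    sparkSession.sql("select * from ads.ads_paper_avgtimeandscore where dt='20190722'")
      .write.mode(SaveMode.Append)
      .jdbc("jdbc:mysql://localhost:3306/education?useSSL=false", "ads_paper_avgtimeandscore", props)
  }
}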
