前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >大数据Kudu(九):Spark操作Kudu

大数据Kudu(九):Spark操作Kudu

原创
作者头像
Lansonli
发布2022-12-21 08:51:54
1.1K0
发布2022-12-21 08:51:54
举报
文章被收录于专栏:Lansonli技术博客

Spark操作Kudu

一、​​​​​​​​​​​​​​添加Maven依赖

使用SparkSQL操作Kudu,这里需要导入Kudu与SparkSQL整合的包和SparkSQL的包,在Maven中导入如下依赖:

代码语言:javascript
复制
<!--添加kudu-spark 依赖-->
<dependency>
    <groupId>org.apache.kudu</groupId>
    <artifactId>kudu-spark2_2.11</artifactId>
    <version>1.10.0-cdh6.3.2</version>
</dependency>

<!-- Spark SQL -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.3.1</version>
</dependency>

​​​​​​​​​​​​​​二、KuduContext创建表

KuduContext创建Kudu表,与Java api 操作Kudu类似,经过以下步骤:

  • 创建SparkSession对象
  • 创建SparkContext对象
  • 创建KuduContext对象
  • 创建Kudu表

代码如下:

代码语言:javascript
复制
val session: SparkSession = SparkSession
  .builder()
  .master("local")
  .appName("create_kudu_table")
  .getOrCreate()

//获取SparkContext
val sc: SparkContext = session.sparkContext

//构建KuduContext对象
val kuduContext = new  KuduContext("cm1:7051,cm2:7051",sc)

/**
  * 创建Kudu表
  */

//设置表名
val KUDU_TABLE_NAME="t_spark_kudu"

//设置表Schema信息
val schema = StructType(Array[StructField](
  StructField("id",IntegerType,false),
  StructField("name",StringType,false),
  StructField("age",IntegerType,false),
  StructField("score",DoubleType,false)
))

//指定Kudu表的Option信息 ,设置分区信息
val options = new CreateTableOptions()
options.addHashPartitions(util.Arrays.asList("id"),10)

//创建Kudu表 ,参数:表名,表Schema信息,指定主键,设置表分区选项
if(!kuduContext.tableExists(KUDU_TABLE_NAME)){
  kuduContext.createTable(KUDU_TABLE_NAME,schema,Seq[String]("id"),options)
}

经过以上操作,可以在Kudu WebUI中查看到对应的表:

三、​​​​​​​​​​​​​​KuduContext CRUD-增删改查数据

代码语言:javascript
复制
case class PersonInfo(id:Int,name:String,age:Int,score:Double)

object SparkSQLCRUDToKudu {
  def main(args: Array[String]): Unit = {
    val session: SparkSession = SparkSession
      .builder()
      .master("local")
      .appName("create_kudu_table")
      .getOrCreate()

    //获取SparkContext
    val sc: SparkContext = session.sparkContext
    sc.setLogLevel("Error")

    //构建KuduContext对象
    val kuduContext = new  KuduContext("cm1:7051,cm2:7051",sc)

    //Kudu表
    val KUDU_TABLE = "t_spark_kudu"

    /**
      *  向表中插入数据
      */
//    insertData(session,kuduContext,KUDU_TABLE)

    /**
      * 查询Kudu表数据
      */
//    queryData(kuduContext,sc,KUDU_TABLE)

    /**
      * 向Kudu表更新数据
      */
//    updateData(session,kuduContext,KUDU_TABLE)

    /**
      * 删除Kudu表中的数据
      */
    deleteData(session,kuduContext,KUDU_TABLE)
    queryData(kuduContext,sc,KUDU_TABLE)

    session.stop()

  }

  def insertData(session:SparkSession,kuduContext:KuduContext,tbl:String) = {
    //准备DataFrame
    val personList = List[PersonInfo](
      PersonInfo(1,"zhangsan",18,100),
      PersonInfo(2,"zhangsan",19,200),
      PersonInfo(3,"zhangsan",20,300)
    )
    import session.implicits._
    val df = personList.toDF()
    //向Kudu表 t_spark_kudu中插入数据
    kuduContext.insertRows(df,tbl)
  }


  def queryData(kuduContext: KuduContext,sc:SparkContext,tbl:String)={
    //查询kudu表中的数据,加载RDD
    val rdd: RDD[Row] = kuduContext.kuduRDD(sc,tbl,Seq[String]("id","name","age","score"))
     rdd.foreach(println)
  }

  def updateData(session:SparkSession,kuduContext: KuduContext,tbl:String)={
    val list = List[PersonInfo](
      PersonInfo(100,"tianqi",30,400)
    )
    import session.implicits._
    val updateDF = list.toDF()

    //更新数据,主键不存在就报错,主键存在就更新
//    kuduContext.updateRows(updateDF,tbl)

    //更新数据,主键不存就直接插入,主键存在就更新
    kuduContext.upsertRows(updateDF,tbl)
  }


  def deleteData(session:SparkSession,kuduContext: KuduContext,tbl:String)={
    val list = List[PersonInfo](
      PersonInfo(1,"zhangsan",18,100),
      PersonInfo(2,"zhangsan",19,200),
      PersonInfo(3,"zhangsan",20,300)
    )
    import session.implicits._
    //删除Kudu表中的数据,需要只传入主键列
    val deleteKeysDF = list.toDF().select("id")
    kuduContext.deleteRows(deleteKeysDF,tbl)
  }
}

四、​​​​​​​​​​​​​​SparkSQL 操作Kudu表

代码语言:javascript
复制
val session: SparkSession = SparkSession
  .builder()
  .master("local")
  .appName("create_kudu_table")
  .getOrCreate()

//SparkSQL 读取Kudu表中的数据
val kuduOptionMap = Map[String,String](
  "kudu.master" -> "cm1:7051,cm2:7051",
  "kudu.table" ->"t_spark_kudu"
)

//frame注册表操作
frame.createTempView("tmp")
session.sql(
  """
    |select count(*) from tmp
  """.stripMargin).show()

//加载数据
val frame: DataFrame = session.read.options(kuduOptionMap).format("kudu").load()
frame.show()

//准备插入到Kudu表的 DataFrame 数据,如果主键存在,在Kudu中就会被替换
val list = List[PersonInfo](
  PersonInfo(10,"a",20,100),
  PersonInfo(11,"a",21,101),
  PersonInfo(12,"a",22,102)
)
import session.implicits._
val resultDF: DataFrame = list.toDF()

//将DataFrame结果保存到Kudu表中,目前仅支持Append模式
resultDF.write.options(kuduOptionMap).mode(SaveMode.Append).format("kudu").save()

//再次查询Kudu表 t_spark_kudu 数据
session.read.options(kuduOptionMap).format("kudu").load().show()

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • ​Spark操作Kudu
    • 一、​​​​​​​​​​​​​​添加Maven依赖
      • ​​​​​​​​​​​​​​二、KuduContext创建表
        • 三、​​​​​​​​​​​​​​KuduContext CRUD-增删改查数据
          • 四、​​​​​​​​​​​​​​SparkSQL 操作Kudu表
          相关产品与服务
          数据库
          云数据库为企业提供了完善的关系型数据库、非关系型数据库、分析型数据库和数据库生态工具。您可以通过产品选择和组合搭建,轻松实现高可靠、高可用性、高性能等数据库需求。云数据库服务也可大幅减少您的运维工作量,更专注于业务发展,让企业一站式享受数据上云及分布式架构的技术红利!
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档