前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >0856-7.1.4-如何使用spark-shell操作Kudu表

0856-7.1.4-如何使用spark-shell操作Kudu表

作者头像
Fayson
发布2021-07-28 15:30:19
1.3K0
发布2021-07-28 15:30:19
举报
文章被收录于专栏:Hadoop实操

1.文档编写目的

Kudu从 1.0.0 版本开始通过Data Source API与Spark 集成。kudu-spark使用--packages选项包含依赖项。如果将Spark与Scala 2.10 一起使用,需要使用 kudu-spark_2.10 。从 Kudu1.6.0开始不再支持Spark 1,如果要使用Spark1与Kudu集成,最高只能到Kudu1.5.0。

如果将 Spark 1 与 Scala 2.10 一起使用,请使用 kudu-spark_2.10:1.5.0 依赖包。

代码语言:javascript
复制
spark-shell --packages org.apache.kudu:kudu-spark_2.10:1.5.0-cdh5.13.91 --repositories https://repository.cloudera.com/artifactory/cloudera-repos/

如果将 Spark 2 与 Scala 2.11 一起使用,请使用 kudu-spark2_2.11 依赖包(当前CDP版本中可用)。

代码语言:javascript
复制
spark2-shell --packages org.apache.kudu:kudu-spark2_2.11:1.9.0-cdh6.2.0 --repositories https://repository.cloudera.com/artifactory/cloudera-repos/

本文主要讲述在CDP7.1.4中如何通过spark-shell对kudu表的进行操作。

  • 测试环境

1.CDP7.1.4 、启用Kerberos、Kudu 1.13.0、Spark 2.4.0

2.操作步骤

2.1 建kudu表

代码语言:javascript
复制
CREATE EXTERNAL TABLE test002 ( name STRING NOT NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION, age INT, PRIMARY KEY (name) ) STORED AS KUDU TBLPROPERTIES ('external.table.purge'='TRUE', 'kudu.master_addresses'='cdp03:7051', 'numFiles'='0', 'numFilesErasureCoded'='0', 'totalSize'='0');

2.2 添加依赖jar包

通过本地的方式添加依赖,首先到下面地址中

代码语言:javascript
复制
https://repository.cloudera.com/artifactory/cloudera-repos/org/apache/kudu/kudu-spark2_2.11/1.9.0-cdh6.2.0/

下载jar包,然后放在集群的一个节点上的/var/www/html/下面,通过本地的http方式加载

进行验证

2.3 进入spark-shell操作kudu

作为 CML 中现有引擎的替代品,ML Runtimes 比当前的单体引

代码语言:javascript
复制
spark-shell --packages org.apache.kudu:kudu-spark2_2.11:1.9.0-cdh6.2.0 --repositories https://repository.cloudera.com/artifactory/cloudera-repos/

2.3.1 单行写

在spark-shell中执行如下代码

代码语言:javascript
复制
import org.apache.hadoop.security.UserGroupInformation
import java.security.PrivilegedExceptionAction
import org.apache.kudu.client.KuduClient

val kuduClient = UserGroupInformation.getLoginUser.doAs(new PrivilegedExceptionAction[KuduClient]() {
  @throws[Exception]
  override def run: KuduClient = new KuduClient.KuduClientBuilder("cdp01:7051,cdp02:7051,cdp03:7051").build;
})

val tableList=kuduClient.getTablesList()
val kuduTable = kuduClient.openTable("impala::default.test002")
val session = kuduClient.newSession()
session.setTimeoutMillis(60000)
val insert = kuduTable.newInsert()
val row = insert.getRow()
row.addString(0,"zhangsan")
row.addInt(1,30)
print(row)
print(kuduTable.getName())
session.apply(insert)

在impala-shell中去查询test001

可见插入单条数据插入成功

2.3.2 单行读

在spark-shell中执行如下代码

代码语言:javascript
复制
import org.apache.hadoop.security.UserGroupInformation
import java.security.PrivilegedExceptionAction
import org.apache.kudu.client.KuduClient

val kuduClient = UserGroupInformation.getLoginUser.doAs(new PrivilegedExceptionAction[KuduClient]() {
  @throws[Exception]
  override def run: KuduClient = new KuduClient.KuduClientBuilder("cdp01:7051,cdp02:7051,cdp03:7051").build;
})

val kuduTable = kuduClient.openTable("impala::default.test002")
val scanner = kuduClient.newScannerBuilder(kuduTable).build()
scanner.hasMoreRows
val rows = scanner.nextRows
rows.hasNext
val row = rows.next
println(row.getString(0))
println(row.getInt(1))

可看到读出一条数据

2.3.3 批量操作

先插入一条数据

代码语言:javascript
复制
val kuduTable = kuduClient.openTable("impala::default.test002")
val session = kuduClient.newSession()
session.setTimeoutMillis(60000)
val insert = kuduTable.newInsert()
val row = insert.getRow()
row.addString(0,"lisi")
row.addInt(1,18)
session.apply(insert)

2.3.3.1 批量读

在spark-shell下执行下面代码

代码语言:javascript
复制
import org.apache.kudu.spark.kudu._
val df = spark.read.format("kudu").options(Map("kudu.master" -> "cdp03:7051", "kudu.table" -> "impala::default.test002")).load
df.createOrReplaceTempView("tmp_table")
spark.sql("select * from tmp_table ").show()

可看到整张表查询成功

2.3.3.2 批量写

根据一个DataFrameschema创建一个kudu表,并查看是否存在

代码语言:javascript
复制
import org.apache.kudu.client._
import org.apache.kudu.spark.kudu.KuduContext
import collection.JavaConverters._

val df = spark.read.format("kudu").options(Map("kudu.master" -> "cdp03:7051", "kudu.table" -> "impala::default.test002")).load
val kuduContext = new KuduContext("cdp03:7051", spark.sparkContext)
kuduContext.createTable(
    "like_test002", df.schema, Seq("name"),
    new CreateTableOptions()
        .setNumReplicas(1)
.addHashPartitions(List("name").asJava, 3))
kuduContext.tableExists("like_test002")

根据一个DataFrame schema创建一个kudu表,并查看是否存在可以看到创建成功

可以先查询一下这张表的数据

代码语言:javascript
复制
val dftmp = spark.read.format("kudu").options(Map("kudu.master" -> "cdp03:7051", "kudu.table" -> "like_test002")).load
dftmp.createOrReplaceTempView("tmp_table")
spark.sql("select * from tmp_table").show()

可看到没有任何数据

然后将从test002表生成的df插入到表like_test002,执行下面代码

代码语言:javascript
复制
kuduContext.insertRows(df, "like_test002")

并且再次查询发现数据已经插入成功

2.3.3.3 批量更改数据

代码语言:javascript
复制
val updateDF = df.select($"name", ($"age" + 100).as("age"))
kuduContext.updateRows(updateDF, "like_test002")

发现数据全部被更改

2.3.3.4 批量修改和新增数据

1.再test002插入一条数据

代码语言:javascript
复制
val kuduTable = kuduClient.openTable("impala::default.test002")
val session = kuduClient.newSession()
session.setTimeoutMillis(60000)
val insert = kuduTable.newInsert()
val row = insert.getRow()
row.addString(0,"wangwu")
row.addInt(1,40)
session.apply(insert)

2.查看数据

代码语言:javascript
复制
val df = spark.read.format("kudu").options(Map("kudu.master" -> "cdp03:7051", "kudu.table" -> "impala::default.test002")).load
df.createOrReplaceTempView("tmp_table")
spark.sql("select * from tmp_table ").show()

也可以看到新增的一条数据插入成功

3.在spark-shell中执行下面代码

代码语言:javascript
复制
kuduContext.upsertRows(df, "like_test002")

可以看到新增了一条,也更改了数据

2.3.3.5 追加数据

1.先增加一条数据

代码语言:javascript
复制
val kuduTable = kuduClient.openTable("impala::default.test002")
val session = kuduClient.newSession()
session.setTimeoutMillis(60000)
val insert = kuduTable.newInsert()
val row = insert.getRow()
row.addString(0,"xiaoming")
row.addInt(1,50)
session.apply(insert)

2.在spark-shell中执行下面代码

代码语言:javascript
复制
df.write.options(Map("kudu.master"-> "cdp03:7051", "kudu.table"-> "like_test002")).mode("append").format("kudu").save

查询like_test002多了一条数据

2.3.3.6 删除全表数据

代码语言:javascript
复制
val df = spark.read.format("kudu").options(Map("kudu.master" -> "cdp03:7051", "kudu.table" -> "impala::default.test002")).load
df.createOrReplaceTempView("tmp_table")
val nameDF = spark.sql("select name from tmp_table ")
kuduContext.deleteRows(nameDF, "like_test002")

可以看到数据全部被删除

注意:直接用kuduContext.deleteRows(df, "like_test002") 会报下面的错

2.3.3.7 删表

代码语言:javascript
复制
kuduContext.deleteTable("like_test002")
kuduContext.tableExists("like_test002")

表已经不存在了

3.常见问题和优化

使用Spark程序访问Kudu 时应考虑如下问题:

  • 尽管 Kudu Spark 2.x 集成与 Java 7 兼容,但 Spark 2.2(及更高版本)在运行时需要 Java 8。Spark 2.2 是 Kudu 1.5.0 的默认依赖版本。
  • 名称包含大写或非 ASCII 字符的 Kudu 表在注册为临时表时必须指定一个备用名称。
  • 列名包含大写或非 ASCII 字符的 Kudu 表不得与 SparkSQL 一起使用。可以在 Kudu 中重命名列以解决此问题。
  • 部分查询语法支持问题,如 <>符号和OR谓词不会推送到 Kudu,而是由Spark任务评估,只有LIKE 带有后缀通配符的谓词才会被推送到 Kudu。例如 LIKE "FOO%"可以查询,但LIKE "FOO%BAR" 则不能。
  • Kudu 并不支持 Spark SQL 支持的所有类型。例如,不支持Date类型。
  • Kudu 表只能在 SparkSQL 中注册为临时表。
  • 无法使用HiveContext查询Kudu表。
  • 常见的Kudu-Spark 程序错误是实例化多余的KuduClient对象,在Kudu-Spark程序中, KuduClient归KuduContext所有。Spark应用程序代码不应创建另一个KuduClient连接到同一集群。应用程序代码应用KuduContext来访问 KuduClient 来使用KuduContext#syncClient。
  • 通常,Spark作业用最少的调整和配置运行。可以使用Spark 的配置选项调整执行程序和资源的数量,以提高并行度和性能。如果表非常宽并且默认内存分配相当低,可能导致作业失败。要解决此问题,需要增加Spark程序内存。通常的做法是每50列1GiB。如果Spark资源远超过 Kudu 集群,在kudu 集群进行数据恢复时需要限制并发发任务数,避免Kudu 集群压力过大。

参考文档:

代码语言:javascript
复制
https://docs.cloudera.com/cdp-private-cloud-base/7.1.6/kudu-development/topics/kudu-integration-with-spark.html
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2021-07-18,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Hadoop实操 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1.文档编写目的
  • 2.操作步骤
  • 3.常见问题和优化
相关产品与服务
数据库
云数据库为企业提供了完善的关系型数据库、非关系型数据库、分析型数据库和数据库生态工具。您可以通过产品选择和组合搭建,轻松实现高可靠、高可用性、高性能等数据库需求。云数据库服务也可大幅减少您的运维工作量,更专注于业务发展,让企业一站式享受数据上云及分布式架构的技术红利!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档