Kudu has integrated with Spark through the Data Source API since version 1.0.0. The kudu-spark dependency is included with the --packages option. If you use Spark with Scala 2.10, use kudu-spark_2.10. Spark 1 is no longer supported as of Kudu 1.6.0; to integrate Spark 1 with Kudu, the newest usable Kudu release is 1.5.0.
If you use Spark 1 with Scala 2.10, use the kudu-spark_2.10:1.5.0 dependency:
spark-shell --packages org.apache.kudu:kudu-spark_2.10:1.5.0-cdh5.13.1 --repositories https://repository.cloudera.com/artifactory/cloudera-repos/
If you use Spark 2 with Scala 2.11, use the kudu-spark2_2.11 dependency (available in the current CDP release):
spark2-shell --packages org.apache.kudu:kudu-spark2_2.11:1.9.0-cdh6.2.0 --repositories https://repository.cloudera.com/artifactory/cloudera-repos/
This article describes how to operate on Kudu tables through spark-shell in CDP 7.1.4.
1. Test environment: CDP 7.1.4 with Kerberos enabled, Kudu 1.13.0, Spark 2.4.0
2.1 Create a Kudu table
CREATE EXTERNAL TABLE test002 (
  name STRING NOT NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,
  age INT,
  PRIMARY KEY (name)
)
STORED AS KUDU
TBLPROPERTIES (
  'external.table.purge'='TRUE',
  'kudu.master_addresses'='cdp03:7051',
  'numFiles'='0',
  'numFilesErasureCoded'='0',
  'totalSize'='0');
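Note: a table created through Impala is exposed to Kudu clients under the name impala::<database>.<table>, which is why the Spark code below opens impala::default.test002.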
2.2 Add the dependency jar
Add the dependency locally. First, download the jar from
https://repository.cloudera.com/artifactory/cloudera-repos/org/apache/kudu/kudu-spark2_2.11/1.9.0-cdh6.2.0/
then place it under /var/www/html/ on one of the cluster nodes so that it can be fetched over local HTTP, and verify that it is reachable.
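As a hedged example (cdp01 and the jar path are placeholders for whichever node serves the file in your environment), the locally hosted jar could then be passed straight to spark-shell:
spark-shell --jars http://cdp01/kudu-spark2_2.11-1.9.0-cdh6.2.0.jar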
2.3 Operate on Kudu from spark-shell
spark-shell --packages org.apache.kudu:kudu-spark2_2.11:1.9.0-cdh6.2.0 --repositories https://repository.cloudera.com/artifactory/cloudera-repos/
2.3.1 Single-row write
Run the following code in spark-shell:
import org.apache.hadoop.security.UserGroupInformation
import java.security.PrivilegedExceptionAction
import org.apache.kudu.client.KuduClient
// On a Kerberized cluster, build the Kudu client as the Kerberos login user.
val kuduClient = UserGroupInformation.getLoginUser.doAs(new PrivilegedExceptionAction[KuduClient]() {
  @throws[Exception]
  override def run: KuduClient = new KuduClient.KuduClientBuilder("cdp01:7051,cdp02:7051,cdp03:7051").build
})
val tableList = kuduClient.getTablesList()
val kuduTable = kuduClient.openTable("impala::default.test002")
val session = kuduClient.newSession()
session.setTimeoutMillis(60000)
// Build a single-row insert: column 0 is name (the key), column 1 is age.
val insert = kuduTable.newInsert()
val row = insert.getRow()
row.addString(0,"zhangsan")
row.addInt(1,30)
print(row)
print(kuduTable.getName())
session.apply(insert)
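In the default AUTO_FLUSH_SYNC mode, apply() blocks until the server responds and returns an OperationResponse that can be checked for row-level errors. A minimal sketch reusing the objects above (re-applying the same insert would fail as a duplicate key, so treat it as illustrative):
val response = session.apply(insert)
if (response.hasRowError) println("Insert failed: " + response.getRowError)
session.close() // close the session when finished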
Query test002 in impala-shell.
You can see the single row was inserted successfully.
2.3.2 Single-row read
Run the following code in spark-shell:
import org.apache.hadoop.security.UserGroupInformation
import java.security.PrivilegedExceptionAction
import org.apache.kudu.client.KuduClient
val kuduClient = UserGroupInformation.getLoginUser.doAs(new PrivilegedExceptionAction[KuduClient]() {
  @throws[Exception]
  override def run: KuduClient = new KuduClient.KuduClientBuilder("cdp01:7051,cdp02:7051,cdp03:7051").build
})
val kuduTable = kuduClient.openTable("impala::default.test002")
// Scan the table: nextRows() returns one batch (a RowResultIterator) at a time.
val scanner = kuduClient.newScannerBuilder(kuduTable).build()
scanner.hasMoreRows // true while the scanner has more batches
val rows = scanner.nextRows // fetch the next batch
rows.hasNext // true while the batch has more rows
val row = rows.next
println(row.getString(0)) // name
println(row.getInt(1)) // age
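The snippet above reads only the first row of the first batch. To scan the whole table, open a fresh scanner and loop over its batches; a minimal sketch using the same client and table:
val fullScanner = kuduClient.newScannerBuilder(kuduTable).build()
while (fullScanner.hasMoreRows) {
  val batch = fullScanner.nextRows() // one RowResultIterator per batch
  while (batch.hasNext) {
    val r = batch.next()
    println(s"${r.getString(0)} ${r.getInt(1)}")
  }
}
fullScanner.close()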
You can see one row read back.
2.3.3 Batch operations
First insert one row:
val kuduTable = kuduClient.openTable("impala::default.test002")
val session = kuduClient.newSession()
session.setTimeoutMillis(60000)
val insert = kuduTable.newInsert()
val row = insert.getRow()
row.addString(0,"lisi")
row.addInt(1,18)
session.apply(insert)
2.3.3.1 Batch read
Run the following code in spark-shell:
import org.apache.kudu.spark.kudu._
val df = spark.read.format("kudu").options(Map("kudu.master" -> "cdp03:7051", "kudu.table" -> "impala::default.test002")).load
df.createOrReplaceTempView("tmp_table")
spark.sql("select * from tmp_table ").show()
You can see the whole table is queried successfully.
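Filters on the DataFrame are pushed down to the Kudu scanner where possible (spark-shell imports spark.implicits._ automatically, so the $-column syntax works). A small sketch against the same DataFrame:
df.filter($"age" > 20).select($"name", $"age").show()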
2.3.3.2 Batch write
Create a Kudu table from a DataFrame schema and check that it exists:
import org.apache.kudu.client._
import org.apache.kudu.spark.kudu.KuduContext
import collection.JavaConverters._
val df = spark.read.format("kudu").options(Map("kudu.master" -> "cdp03:7051", "kudu.table" -> "impala::default.test002")).load
val kuduContext = new KuduContext("cdp03:7051", spark.sparkContext)
// Same schema as test002: primary key on name, 3 hash buckets, 1 replica.
kuduContext.createTable(
  "like_test002", df.schema, Seq("name"),
  new CreateTableOptions()
    .setNumReplicas(1)
    .addHashPartitions(List("name").asJava, 3))
kuduContext.tableExists("like_test002")
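Besides hash partitioning, CreateTableOptions also supports range partitioning. A hedged sketch of the alternative, reusing the imports above (range_test002 is a hypothetical table name; this creates a single unbounded range partition on the key):
kuduContext.createTable(
  "range_test002", df.schema, Seq("name"),
  new CreateTableOptions()
    .setNumReplicas(1)
    .setRangePartitionColumns(List("name").asJava))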
tableExists returns true, confirming like_test002 was created successfully.
First, query the new table's data:
val dftmp = spark.read.format("kudu").options(Map("kudu.master" -> "cdp03:7051", "kudu.table" -> "like_test002")).load
dftmp.createOrReplaceTempView("tmp_table")
spark.sql("select * from tmp_table").show()
You can see it contains no data yet.
Then insert the DataFrame built from test002 into like_test002 by running the following code:
kuduContext.insertRows(df, "like_test002")
Querying again shows the rows were inserted successfully.
2.3.3.3 Batch update
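Build a DataFrame that adds 100 to every age, then write the changes back to like_test002: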
val updateDF = df.select($"name", ($"age" + 100).as("age"))
kuduContext.updateRows(updateDF, "like_test002")
You can see all the rows have been updated.
2.3.3.4 Batch upsert (update and insert)
1. Insert one more row into test002:
val kuduTable = kuduClient.openTable("impala::default.test002")
val session = kuduClient.newSession()
session.setTimeoutMillis(60000)
val insert = kuduTable.newInsert()
val row = insert.getRow()
row.addString(0,"wangwu")
row.addInt(1,40)
session.apply(insert)
2. View the data:
val df = spark.read.format("kudu").options(Map("kudu.master" -> "cdp03:7051", "kudu.table" -> "impala::default.test002")).load
df.createOrReplaceTempView("tmp_table")
spark.sql("select * from tmp_table ").show()
You can see the newly added row was inserted successfully.
3. Run the following code in spark-shell:
kuduContext.upsertRows(df, "like_test002")
You can see one new row is inserted and the existing rows are updated.
2.3.3.5 Append data
1. First add one more row:
val kuduTable = kuduClient.openTable("impala::default.test002")
val session = kuduClient.newSession()
session.setTimeoutMillis(60000)
val insert = kuduTable.newInsert()
val row = insert.getRow()
row.addString(0,"xiaoming")
row.addInt(1,50)
session.apply(insert)
2. Run the following code in spark-shell:
df.write.options(Map("kudu.master"-> "cdp03:7051", "kudu.table"-> "like_test002")).mode("append").format("kudu").save
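Note that with the Kudu data source, mode("append") performs upserts by default. If plain inserts are wanted instead, the data source accepts a kudu.operation option (option name per the 1.9+ kudu-spark source; verify against your version); a sketch:
df.write.options(Map(
  "kudu.master" -> "cdp03:7051",
  "kudu.table" -> "like_test002",
  "kudu.operation" -> "insert")).mode("append").format("kudu").save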
Querying like_test002 shows one more row.
2.3.3.6 Delete all table data
val df = spark.read.format("kudu").options(Map("kudu.master" -> "cdp03:7051", "kudu.table" -> "impala::default.test002")).load
df.createOrReplaceTempView("tmp_table")
val nameDF = spark.sql("select name from tmp_table ")
kuduContext.deleteRows(nameDF, "like_test002")
You can see all the rows are deleted.
Note: calling kuduContext.deleteRows(df, "like_test002") directly with the full DataFrame throws an error, because Kudu delete operations accept only the table's primary key columns; that is why only the name column is selected above.
2.3.3.7 Drop the table
kuduContext.deleteTable("like_test002")
kuduContext.tableExists("like_test002")
The table no longer exists.
Keep the following known limitations in mind when accessing Kudu from a Spark program:
- Kudu tables can only be registered as temporary tables in Spark SQL.
- Kudu tables whose names contain uppercase or non-ASCII characters must be given an alternate name when registered as a temporary table, and columns with such names cannot be used from Spark SQL.
- <> and OR predicates are not pushed down to Kudu and are instead evaluated by the Spark task; only LIKE predicates with a suffix wildcard (e.g. "FOO%") are pushed down.
- Kudu does not support every Spark SQL type (for example, Date and complex types).
Reference:
https://docs.cloudera.com/cdp-private-cloud-base/7.1.6/kudu-development/topics/kudu-integration-with-spark.html