前往小程序,Get更优阅读体验!
立即前往
发布
社区首页 >专栏 >Scala 操作 HBase2.0 数据库

Scala 操作 HBase2.0 数据库

原创
作者头像
ZHANGHAO
修改2019-03-06 10:46:45
修改2019-03-06 10:46:45
3.2K10
代码可运行
举报
文章被收录于专栏:张浩的专栏张浩的专栏
运行总次数:0
代码可运行

环境配置

Maven添加hbase-client的依赖

代码语言:txt
复制
  <!--HBase Client-->
    <dependencies>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>2.1.2</version>
        </dependency>
    </dependencies>

Scala操作HBase

创建HBase的配置、Connection、Admin

代码语言:txt
复制
  /*
  *创建一个HBase的配置,创建的时候会去加载classpath下的hbase-default.xml和hbase-site.xml两个配置文件
  */
  private val conf = HBaseConfiguration.create()
  //设置Zookeeper的地址和端口来访问HBase,先从配置中读取,如配置中不存在,设置地址为localhost,端口为默认端口2181
  conf.set(HConstants.ZOOKEEPER_QUORUM, conf.get(HConstants.ZOOKEEPER_QUORUM, HConstants.LOCALHOST))
  conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, conf.get(HConstants.ZOOKEEPER_CLIENT_PORT, HConstants.DEFAULT_ZOOKEEPER_CLIENT_PORT.toString))

  //创建操作HBase的入口connection
  private val conn: Connection = ConnectionFactory.createConnection(conf)
  //创建操作HBase表的入口Admin
  private val admin: Admin = conn.getAdmin

获取表

代码语言:txt
复制
 /**
    * 获取表
    *
    * @param tableName 表名
    * @return HBase表
    */

  def getTable(tableName: String): Table = {
    val table = Try(conn.getTable(TableName.valueOf(tableName)))
    table.get.close()
    table match {
      case Success(v) => v;
      case Failure(e) => e.printStackTrace()
        null
    }
  }

创建表

代码语言:txt
复制
/**
    * 创建表
    *
    * @param tableName 表名
    * @param cf        列族
    */
  def createTable(tableName: String, cf: String): Unit = {
    //创建表
    val tableDesc = TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName))
    tableDesc.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder("basic".getBytes).build())
    println(s"Creating table `$tableName`. ")
    Try {
      if (admin.tableExists(TableName.valueOf(tableName))) {
        admin.disableTable(TableName.valueOf(tableName))
        admin.deleteTable(TableName.valueOf(tableName))
      }
      admin.createTable(tableDesc.build())
      admin.close()
      println("Done!")
    } match {
      case Success(_) =>
      case Failure(e) => e.printStackTrace()
    }

  }

删除表

代码语言:txt
复制
/**
    * 删除表
    *
    * @param tableName 表名
    * @param rowKey    行键
    */
  def delete(tableName: String, rowKey: String): Unit = {
    val table = conn.getTable(TableName.valueOf(tableName))
    Try {
      val d = new Delete(rowKey.getBytes)
      table.delete(d)
      table.close()
    } match {
      case Success(_) =>
      case Failure(e) => e.printStackTrace()
    }

  }

往表中存放数据

代码语言:javascript
代码运行次数:0
复制
/**
    *
    * 往表中存放数据
    *
    * @param tableName 表名
    * @param rowKey    行键
    * @param cf        列族
    * @param qualifier 列限定符
    * @param value     具体的值
    */
  def put(tableName: String, rowKey: String, cf: String, qualifier: String, value: String): Unit = {
    println(s"Put row key $rowKey into $tableName. ")
    val table = conn.getTable(TableName.valueOf(tableName))
    Try {
      //准备一个row key
      val p = new Put(rowKey.getBytes)
      //为put操作指定 column qualifier 和 value
      p.addColumn(cf.getBytes, qualifier.getBytes, value.getBytes)
      //放数据到表中
      table.put(p)
      table.close()
    } match {
      case Success(_) => println("Done!")
      case Failure(e) => e.printStackTrace()
    }
  }

获得表中的数据

代码语言:txt
复制
 /**
    * 获得表里面的数据
    *
    * @param tableName 表名
    * @param rowKey    行键
    * @param cf        列族
    * @param qualifier 列限定符
    * @return 获得的数据
    */
  def get(tableName: String, rowKey: String, cf: String, qualifier: String): String = {
    val table = conn.getTable(TableName.valueOf(tableName))
    Try {
      val g = new Get(rowKey.getBytes)
      val result = table.get(g)
      table.close()
      Bytes.toString(result.getValue(cf.getBytes(), qualifier.getBytes()))
    } match {
      case Success(v) => v
      case Failure(e) => e.printStackTrace()
        null
    }

  }

扫描表中的数据

代码语言:txt
复制
 /**
    * 扫描数据
    *
    * @param tableName 表名
    * @param cf        列族
    * @param qualifier 列限定符
    */
  def scan(tableName: String, cf: String, qualifier: String): Unit = {
    val table = conn.getTable(TableName.valueOf(tableName))
    val s = new Scan()
    s.addColumn(cf.getBytes, qualifier.getBytes)
    val scanner = table.getScanner(s)
    Try {
      val iterator = scanner.iterator()
      while (iterator.hasNext) {
        val next = iterator.next()
        println("Found row: " + next)
        println("Found value: " + Bytes.toString(
          next.getValue(cf.getBytes, qualifier.getBytes)))
      }
      scanner.close()
      table.close()
    } match {
      case Success(_) =>
      case Failure(e) => e.printStackTrace()
    }

  }

附录

完整的代码已经上传到gist。

file-hbaseutils-scala

参考文献:

Spark 下操作 HBase(1.0.0 新 API)

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 环境配置
  • Scala操作HBase
    • 创建HBase的配置、Connection、Admin
    • 获取表
    • 创建表
    • 删除表
    • 往表中存放数据
    • 获得表中的数据
    • 扫描表中的数据
  • 附录
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档