A Useful Snippet | Reading and Writing Parquet Files with Flink

Reading Parquet with Flink
Code language: Scala
import org.apache.flink.core.fs.Path
import org.apache.flink.formats.parquet.ParquetRowInputFormat
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
import org.apache.flink.types.Row
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.log4j.{Level, Logger}
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.schema.{MessageType, PrimitiveType}
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
import org.apache.parquet.schema.Type.Repetition


object ReadFromParquet {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.flink").setLevel(Level.ERROR)
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val hdfs_parquet_file_path_t1 = "hdfs://ns1/user/hive/warehouse/test.db/t1"
    val hdfs_parquet_file_path = "hdfs://ns1//user/hhy/parquet/2019-11-18--10"

    /**
      * Manually specify the Parquet schema
      */
    val id = new PrimitiveType(Repetition.OPTIONAL, PrimitiveTypeName.INT64, "id")
    val username = new PrimitiveType(Repetition.OPTIONAL, PrimitiveTypeName.BINARY, "username")
    val password = new PrimitiveType(Repetition.OPTIONAL, PrimitiveTypeName.BINARY, "password")
    val t1_schema = new MessageType("t1", id, username, password)
    println(s"t1_schema : ${t1_schema}")
    val t1 = env.readFile(new ParquetRowInputFormat(new Path(hdfs_parquet_file_path_t1), t1_schema), hdfs_parquet_file_path_t1)
    // print the field at index 2 (the password column)
    t1.map(_.getField(2)).print().setParallelism(1)


    /**
      * Obtain the schema from the file itself via the Parquet API
      */
    val configuration = new Configuration(true)
    val hdfs: FileSystem = org.apache.hadoop.fs.FileSystem.get(configuration)
    val files = hdfs.listFiles(new org.apache.hadoop.fs.Path(hdfs_parquet_file_path), false)
    // find the first real data file in the directory, skipping the _SUCCESS marker and hidden files
    var flag = true
    var first_file_name = ""
    while (flag) {
      if (files.hasNext) {
        first_file_name = files.next().getPath.getName
        if (!first_file_name.equalsIgnoreCase("_SUCCESS") && !first_file_name.startsWith(".")) {
          flag = false
        }
      } else {
        flag = false
      }
    }

    println(s"first_file_name : ${first_file_name}")
    val parquetMetadata = ParquetFileReader.readFooter(configuration, new org.apache.hadoop.fs.Path(hdfs_parquet_file_path + s"/${first_file_name}"))
    val schema: MessageType = parquetMetadata.getFileMetaData.getSchema
    println(s"schema read from file: ${schema}")
    /**
      * Read using the schema obtained from the Parquet file itself
      */
    val t1_one: DataStream[Row] = env.readFile(new ParquetRowInputFormat(new Path(hdfs_parquet_file_path), schema), hdfs_parquet_file_path)

    t1_one.map(_.getField(1)).print().setParallelism(1)

    // execute the job
    env.execute("ReadFromParquet")

  }
}
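
The example above (and the writing example below) relies on classes that are not in the core Flink distribution: ParquetRowInputFormat and ParquetAvroWriters live in flink-parquet, and ParquetFileReader in parquet-hadoop. A minimal sbt sketch of the dependencies follows; the artifact names assume a Flink 1.9.x layout and every version number is an assumption, so align them with your own cluster.

// build.sbt (sketch) -- artifact names and versions are assumptions, match them to your cluster
libraryDependencies ++= Seq(
  "org.apache.flink"   %% "flink-streaming-scala" % "1.9.1",
  "org.apache.flink"   %% "flink-parquet"         % "1.9.1",  // ParquetRowInputFormat, ParquetAvroWriters
  "org.apache.parquet"  % "parquet-hadoop"        % "1.10.1", // ParquetFileReader
  "org.apache.parquet"  % "parquet-avro"          % "1.10.1", // used by ParquetAvroWriters.forReflectRecord
  "org.apache.hadoop"   % "hadoop-client"         % "2.7.7"
)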
Writing Parquet with Flink
Code language: Scala
import org.apache.flink.core.fs.Path
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.log4j.{Level, Logger}


object SocketSourceAvroparquetSink {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.flink").setLevel(Level.ERROR)
    System.setProperty("HADOOP_USER_NAME", "root")
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val hdfs_parquet_file_save_path = "hdfs://ns1/user/hhy/parquet"
    env.enableCheckpointing(1000)
    val port = 9999
    val source = env.socketTextStream("localhost", port)

    val wc: DataStream[WORD] = source
      .flatMap(_.split("\\s"))
      .filter(_ != null)
      .filter(!"".equalsIgnoreCase(_))
      .map(WORD(_, 1))
      .keyBy("word")
      .timeWindow(Time.seconds(3))
      .sum("count")

    /**
      * Files saved with ParquetAvroWriters can be read directly by spark.read.parquet,
      * and are written completely into HDFS files.
      */
    val sink_parquet: StreamingFileSink[WORD] = StreamingFileSink
      .forBulkFormat(new Path(hdfs_parquet_file_save_path), ParquetAvroWriters.forReflectRecord(classOf[WORD]))
      .withBucketAssigner(new DateTimeBucketAssigner())
      .build()
    wc.addSink(sink_parquet).setParallelism(1)

    env.execute("SocketSourceAvroparquetSink")
  }
  case class WORD(word: String, count: Int)
}
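
As the comment in the sink code notes, the Parquet files produced by ParquetAvroWriters can be read back directly with spark.read.parquet. Below is a minimal sketch, assuming a Spark installation; the object name is hypothetical and the path is the bucket directory created by DateTimeBucketAssigner, i.e. the same kind of directory (2019-11-18--10) that the reading example above consumes.

import org.apache.spark.sql.SparkSession

// Minimal sketch: read back the Parquet files written by the Flink sink above
object ReadParquetWithSpark {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("ReadParquetWithSpark")
      .getOrCreate()

    // bucket directory produced by DateTimeBucketAssigner (path taken from the reading example)
    val df = spark.read.parquet("hdfs://ns1/user/hhy/parquet/2019-11-18--10")
    df.printSchema()
    df.show(10, truncate = false)

    spark.stop()
  }
}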
Originally published 2021-08-20. Shared from the WeChat public account 大数据真好玩.