前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
社区首页 >专栏 >Flink kafka sink to RDBS 测试Demo

Flink kafka sink to RDBS 测试Demo

作者头像
小石头
发布于 2022-11-10 13:41:14
发布于 2022-11-10 13:41:14
1.2K00
代码可运行
举报
文章被收录于专栏:小石头小石头
运行总次数:0
代码可运行
  • flink sql 模式代码demo (Java

(使用flink sql 进行流式处理注意字段的映射)

官方文档类型映射

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
import com.alibaba.fastjson.JSON;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import java.util.Properties;
/**
 * @author frost
 */
public class FlinkStreamJob {
    public static void main(String[] args) throws Exception {
        // set up the streaming execution environment
        EnvironmentSettings fsSettings = EnvironmentSettings.newInstance()
                .useOldPlanner()
                .inStreamingMode()
                .build();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, fsSettings);
        String topic = "topic_invoke_statistics";
        //KAFKA properties
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "192.168.128.212:9092");
        properties.setProperty("group.id", "frost-consumer");
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("auto.offset.reset", "latest");
        //source 源添加
        DataStreamSource<String> data = env.addSource(new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), properties));
        data.print();
        //dataStream 数据操作,目前将json String 转换为 pojo 对象,后续可使用Flink算子进行数据集处理
        DataStream<Test> mapDs = data.map(line ->
                JSON.parseObject(line, Test.class));
        mapDs.print();
        Table kafkaInputTable = tableEnv.fromDataStream(mapDs);
        // kafka 数据源注册为source 临时表
        tableEnv.createTemporaryView("kafkaInputTable", kafkaInputTable);
        // Mysql sink源表创建
        // 本地调试 宿主机内网地址由于连接的wifi为动态分配,需要确定IP地址,否则会连接不上
        tableEnv.executeSql("CREATE TABLE flink_test_table (\n" +
                "    host STRING,\n" +
                "    productId INT,\n" +
                "    referrer STRING,\n" +
                "    remoteAddr STRING,\n" +
                "    remotePort INT,\n" +
                "    request STRING,\n" +
                "    requestTime TIMESTAMP,\n" +
                "    requestUri STRING,\n" +
                "    scheme STRING,\n" +
                "    tenantId INT,\n" +
                "    userAgent STRING\n" +
                ") WITH (\n" +
                "    'connector.type' = 'jdbc', \n" +
//                "    'connector.url' = 'jdbc:mysql://192.168.20.109:3306/flink-test', \n" +
                "    'connector.url' = 'jdbc:postgresql://192.168.128.214:5432/flink_test', \n" +
                "    'connector.table' = 'flink_test_table',\n" +
                "    'connector.username' = 'postgres',\n" +
                "    'connector.password' = 'test', \n" +
                "    'connector.write.flush.max-rows' = '1' \n" +
                ")");
//        Table query = tableEnv.sqlQuery("select productId from kafkaInputTable");
        Table query1 = tableEnv.sqlQuery("select * from kafkaInputTable");
//        tableEnv.toRetractStream(query1, Row.class).print();
//        tableEnv.sqlQuery("select * from kafkaInputTable").execute().print();
        tableEnv.executeSql("insert into flink_test_table select * from kafkaInputTable").print();
        env.execute("StreamingJob");
    }
}

Flink Table Sink到File、Kafka、Es、Mysql

  • 知识点

表的输出,是通过将数据写入 TableSink 来实现的。TableSink 是一个通用接口,可以 支持不同的文件格式、存储数据库消息队列

具体实现,输出表最直接的方法,就是通过 Table.insertInto() 方法将一个 Table 写入 注册过的 TableSink 中。同时表的输出跟更新模式有关

更新模式(Update Mode)

​ 对于流式查询(Streaming Queries),需要声明如何在(动态)表和外部连接器之间执行 转换。与外部系统交换的消息类型,由更新模式(update mode)指定。

​ Flink Table API 中的更新模式有以下三种:

追加模式(Append Mode) ​ 在追加模式下,表(动态表)和外部连接器只交换插入(Insert)消息。

撤回模式(Retract Mode) ​ 在撤回模式下,表和外部连接器交换的是:添加(Add)和撤回(Retract)消息。 ​ 插入(Insert)会被编码为添加消息; ​ 删除(Delete)则编码为撤回消息; ​ 更新(Update)则会编码为,已更新行(上一行)的撤回消息,和更新行(新行) 的添加消息。 ​ 在此模式下,不能定义 key,这一点跟 upsert 模式完全不同。


更新模式 (Upsert Mode) ​ 在 Upsert 模式下,动态表和外部连接器交换 Upsert 和 Delete 消息。 ​ 这个模式需要一个唯一的 key,通过这个 key 可以传递更新消息。为了正确应用消息外部连接器需要知道这个唯一 key 的属性。 ​ 插入(Insert)和更新(Update)都被编码为 Upsert 消息; ​ 删除(Delete)编码为 Delete 信息。 ​ 这种模式和 Retract 模式的主要区别在于,Update 操作是用单个消息编码的,所以效率 会更高。

  • 文件代码案例
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
package guigu.table.sink
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.{DataTypes, Table}
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.table.descriptors.{Csv, FileSystem, Schema}
object FileSink {
  def main(args: Array[String]): Unit = {
    //1、环境准备
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)
    //2、读取数据,创建表视图
    val inputFile = "E:\\java\\demo\\src\\main\\resources\\file\\data5.csv"
    tableEnv.connect(new FileSystem().path(inputFile))
      .withFormat(new Csv())
      .withSchema(new Schema()
        .field("id",DataTypes.STRING())
        .field("temperature",DataTypes.DOUBLE())
        .field("timestamp",DataTypes.BIGINT())
      )
      .createTemporaryTable("inputTable")
    //3、table api转换
    val tableApi: Table = tableEnv.from("inputTable")
    val apiResult: Table = tableApi.select("id,temperature").where("id = 'sensor_1'")
    val sqlResult: Table = tableEnv.sqlQuery("select id,temperature from inputTable where id = 'sensor_1'")
    //字符串模板
    val sqlModelResult: Table = tableEnv.sqlQuery(
      """
        |select id,temperature
        |from inputTable
        |where id = 'sensor_1'
      """.stripMargin)
    //4、创建输出表视图
    val outputFile = "E:\\java\\demo\\src\\main\\resources\\file\\outputFile.csv"
    tableEnv.connect(new FileSystem().path(outputFile))
      .withFormat(new Csv())
      .withSchema(new Schema()
        .field("id",DataTypes.STRING())
        .field("temperature",DataTypes.DOUBLE())
        )
      .createTemporaryTable("outputTable")
    //5、执行
    sqlModelResult.insertInto("outputTable")
    tableEnv.execute("Flink Sink Flie Test")
  }
}
  • Es代码案例
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-json</artifactId>
      <version>1.10.1</version>
    </dependency>
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
package table.tableSink
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.{DataTypes, Table}
import org.apache.flink.table.descriptors.{Csv, Elasticsearch, FileSystem, Json, Kafka, Schema}
object EsSink {
  def main(args: Array[String]): Unit = {
    //1、环境准备
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val streamTableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)
    //2、读取数据并转为表视图
    val filePath = "G:\\Java\\Flink\\guigu\\flink\\src\\main\\resources\\sensor.txt"
    val schema: Schema = new Schema().field("id", DataTypes.STRING())
      .field("timestamp", DataTypes.BIGINT())
      .field("temperature", DataTypes.DOUBLE())
    streamTableEnv.connect(new FileSystem().path(filePath))
      .withFormat(new Csv())
      .withSchema(schema)
      .createTemporaryTable("inputTable")
    //3、表的转换
    val inputTable: Table = streamTableEnv.from("inputTable")
    val resultTable: Table = inputTable.select("id,temperature").where("id = 'sensor_1'")
    val aggTable: Table = inputTable.groupBy('id).select('id, 'id.count as 'count)
    //4、注册表输出视图,输出到es
    streamTableEnv.connect(
      new Elasticsearch()
        .version("6")
        .host("localhost", 9200, "http")
        .index("sensor")
        .documentType("_doc")
     .bulkFlushMaxActions(1) //一定要加呀,否则数据都在内存中,没有输出到es
    )
      .inUpsertMode()
      .withFormat(new Json())
      .withSchema(new Schema().field("id",DataTypes.STRING())
      .field("temperature",DataTypes.DOUBLE()))
      .createTemporaryTable("outputEsTable")
    //5、执行
    aggTable.insertInto("outputEsTable")
    env.execute()
  }
}
  • Kafka代码案例
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
package table.tableSink
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.table.api.{DataTypes, Table}
import org.apache.flink.table.descriptors.{Csv, FileSystem, Kafka, Schema}
object KafkaSink {
  def main(args: Array[String]): Unit = {
    //1、表的环境准备
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val streamTableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)
    //2、读取数据并转为表视图
    val filePath = "G:\\Java\\Flink\\guigu\\flink\\src\\main\\resources\\sensor.txt"
    val outputPath = "G:\\Java\\Flink\\guigu\\flink\\src\\main\\resources\\output.txt"
    val schema: Schema = new Schema().field("id", DataTypes.STRING())
      .field("timestamp", DataTypes.BIGINT())
      .field("temperature", DataTypes.DOUBLE())
    streamTableEnv.connect(new FileSystem().path(filePath))
      .withFormat(new Csv())
      .withSchema(schema)
      .createTemporaryTable("inputTable")
    //3、表的基本转换
    val inputTable: Table = streamTableEnv.from("inputTable")
    val resultTable: Table = inputTable.select("id,temperature").where("id = 'sensor_1'")
    //4、注册输出表视图,输出至kafka
    streamTableEnv.connect(
      new Kafka()
        .version("0.11")
        .topic("sinkTest")
        .property("zookeeper.connect", "localhost:2181")
        .property("bootstrap.servers", "localhost:9092")
    )
      .withFormat(new Csv())
      .withSchema(new Schema().field("id",DataTypes.STRING())
      .field("temperature",DataTypes.DOUBLE()))
      .createTemporaryTable("outputKafkaTable")
    //5、执行
    resultTable.insertInto("outputKafkaTable")
    env.execute()
  }
}
  • mysql代码案例
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-jdbc_2.12</artifactId>
      <version>1.10.1</version> 
    </dependency>
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
package table.tableSink
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.scala.{StreamTableEnvironment, _}
import org.apache.flink.table.api.{DataTypes, Table}
import org.apache.flink.table.descriptors._
object MysqlSink {
  def main(args: Array[String]): Unit = {
    //1、环境准备
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val streamTableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)
    //2、读取数据并转为表视图
    val filePath = "G:\\Java\\Flink\\guigu\\flink\\src\\main\\resources\\sensor.txt"
    val schema: Schema = new Schema().field("id", DataTypes.STRING())
      .field("timestamp", DataTypes.BIGINT())
      .field("temperature", DataTypes.DOUBLE())
    streamTableEnv.connect(new FileSystem().path(filePath))
      .withFormat(new Csv())
      .withSchema(schema)
      .createTemporaryTable("inputTable")
    //3、表的转换
    val inputTable: Table = streamTableEnv.from("inputTable")
    val resultTable: Table = inputTable.select("id,temperature").where("id = 'sensor_1'")
    val aggTable: Table = inputTable.groupBy('id).select('id, 'id.count as 'cnt)
    //4、创建mysql DDL,并在环境中执行  with表示连接器
    val sinkDDL: String =
      """
        |create table jdbcOutputTable (
        | id varchar(20) not null,
        | cnt bigint not null
        |) with (
        | 'connector.type' = 'jdbc',
        | 'connector.url' = 'jdbc:mysql://localhost:3306/test',
        | 'connector.table' = 'sensor_count',
        | 'connector.driver' = 'com.mysql.jdbc.Driver',
        | 'connector.username' = 'root',
        | 'connector.password' = '123456'
        |) """.stripMargin
    streamTableEnv.sqlUpdate(sinkDDL)
    //5、执行
    aggTable.insertInto("jdbcOutputTable")
    env.execute()
  }
}

flink 1.12 注意事项

flink 依赖加载问题

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
# Child first classloading allows users to use different dependency/library
# versions in their application than those in the classpath. Switching back
# to 'parent-first' may help with debugging dependency issues.
#
 classloader.resolve-order: parent-first

需要将类加载修改,由于flink 1.12 更改为了无顺序加载依赖

添加 mysql 驱动依赖

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
 <!-- mysql 驱动 -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.26</version>
        </dependency>

添加 postgreSql 驱动

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
        <dependency>
            <groupId>org.postgresql</groupId>
            <artifactId>postgresql</artifactId>
            <version>42.2.19</version>
        </dependency>
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2021-12-10 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
十分钟入门Fink SQL
Flink 本身是批流统一的处理框架,所以 Table API 和 SQL,就是批流统一的上层处理 API。目前功能尚未完善,处于活跃的开发阶段。 Table API 是一套内嵌在 Java 和 Scala 语言中的查询 API,它允许我们以非常直观的方式,组合来自一些关系运算符的查询(比如 select、filter 和 join)。而对于 Flink SQL,就是直接可以在代码中写 SQL,来实现一些查询(Query)操作。Flink 的 SQL 支持,基于实现了 SQL 标准的 Apache Calcite(Apache 开源 SQL 解析工具)。
大数据老哥
2021/02/04
1.2K0
十分钟入门Fink SQL
干货 | 五千字长文带你快速入门FlinkSQL
最近几天因为工作比较忙,已经几天没有及时更新文章了,在这里先给小伙伴们说声抱歉…临近周末,再忙再累,我也要开始发力了。接下来的几天,菌哥将为大家带来关于FlinkSQL的教程,之后还会更新一些大数据实时数仓的内容,和一些热门的组件使用!希望小伙伴们能点个关注,第一时间关注技术干货!
大数据梦想家
2021/01/27
2K0
干货 | 五千字长文带你快速入门FlinkSQL
一篇文章让深入理解Flink SQL 时间特性
基于时间的操作(比如 Table API 和 SQL 中窗口操作),需要定义相关的时间语义和时间数据来源的信息。所以,Table 可以提供一个逻辑上的时间字段,用于在表处理程序中,指示时间和访问相应的时间戳。
大数据老哥
2021/02/04
1.8K0
一篇文章让深入理解Flink SQL 时间特性
2021年大数据Flink(三十二):​​​​​​​Table与SQL案例准备 API
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/common.html#create-a-tableenvironment
Lansonli
2021/10/11
8360
​flink实战-flink streaming sql 初体验
SQL,Structured Query Language:结构化查询语言,作为一个通用、流行的查询语言,不仅仅是在传统的数据库,在大数据领域也变得越来越流行,hive、spark、kafka、flink等大数据组件都支持sql的查询,使用sql可以让一些不懂这些组件原理的人,轻松的来操作,大大的降低了使用的门槛,今天我们先来简单的讲讲在flink的流处理中如何使用sql.
大数据技术与应用实战
2020/09/15
1.9K0
Flink SQL 自定义 Sink
内部要做 Flink SQL 平台,本文以自定义 Redis Sink 为例来说明 Flink SQL 如何自定义 Sink 以及自定义完了之后如何使用 基于 Flink 1.11
shengjk1
2020/10/26
3.2K1
Flink SQL 自定义 Sink
Flink学习笔记(9)-Table API 和 Flink SQL
• Table API 是一套内嵌在 Java 和 Scala 语言中的查询API,它允许以非常直观的方式组合来自一些关系运算符的查询
挽风
2022/05/11
2.3K0
Flink学习笔记(9)-Table API 和 Flink SQL
Flink DataStream API与Data Table API/SQL集成
在定义数据处理管道时,Table API 和 DataStream API 同样重要。
从大数据到人工智能
2022/02/24
4.4K0
FlinkSQL | 流处理中的特殊概念
上一篇文章,为大家介绍了关于 FlinkSQL 的背景,常见使用以及一些小技巧。学完之后,对于FlinkSQL只能算是简单入了个门。不过不用担心,本篇文章,博主将为大家带来关于 FlinkSQL中流处理的特殊概念,喜欢的话,记得看完点个赞|ू・ω・` )
大数据梦想家
2021/10/22
2K0
快速了解Flink SQL Sink
表的输出,是通过将数据写入 TableSink 来实现的。TableSink 是一个通用接口,可以支持不同的文件格式、存储数据库和消息队列。
大数据老哥
2021/02/04
3.2K0
快速了解Flink SQL Sink
Flink重点难点:Flink Table&SQL必知必会(一)
Flink本身是批流统一的处理框架,所以Table API和SQL,就是批流统一的上层处理API。目前功能尚未完善,处于活跃的开发阶段。
王知无-import_bigdata
2021/09/22
2.1K0
Flink重点难点:Flink Table&SQL必知必会(二)
介绍了 Flink Table & SQL的一些核心概念,本部分将介绍 Flink 中窗口和函数。
王知无-import_bigdata
2021/09/22
2.1K0
快速手上Flink SQL——Table与DataStream之间的互转
上述讲到,成功将一个文件里的内容使用SQL进行了一解析(快速入门Flink SQL —— 介绍及入门)本篇文章主要会跟大家分享如何连接kafka,MySQL,作为输入流和数出的操作,以及Table与DataStream进行互转。
大数据老哥
2021/02/04
2.3K0
快速手上Flink SQL——Table与DataStream之间的互转
FlinkSQL内置了这么多函数你都使用过吗?
Flink Table 和 SQL 内置了很多 SQL 中支持的函数;如果有无法满足的需要,则可以实现用户自定义的函数(UDF)来解决。
大数据老哥
2021/02/04
2.8K0
FlinkSQL内置了这么多函数你都使用过吗?
聊聊flink的Table API及SQL Programs
序 本文主要研究一下flink的Table API及SQL Programs flink-forward-sf-2017-timo-walther-table-sql-api-unified-apis-for-batch-and-stream-processing-8-638.jpg 实例 // for batch programs use ExecutionEnvironment instead of StreamExecutionEnvironment StreamExecutionEnvironm
code4it
2019/01/21
2.1K0
聊聊flink的Table API及SQL Programs
(5)Flink CEP SQL四种匹配模式效果演示
从匹配成功的事件序列中最后一个对应于patternItem的事件开始进行下一次匹配
NBI大数据
2022/08/24
4940
(5)Flink CEP SQL四种匹配模式效果演示
聊聊flink Table Schema的定义
flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/api/StreamTableEnvironment.scala
code4it
2019/02/03
3.4K0
聊聊flink Table Schema的定义
数据湖(六):Hudi与Flink整合
Hudi0.8.0版本与Flink1.12.x之上版本兼容,目前经过测试,Hudi0.8.0版本开始支持Flink,通过Flink写数据到Hudi时,必须开启checkpoint,至少有5次checkpoint后才能看到对应hudi中的数据。
Lansonli
2022/06/03
1.1K0
数据湖(六):Hudi与Flink整合
2021年大数据Flink(三十七):​​​​​​​Table与SQL ​​​​​​案例四
Apache Flink 1.12 Documentation: Table API & SQL
Lansonli
2021/10/11
3030
一篇文章带你深入理解FlinkSQL中的窗口
时间语义,要配合窗口操作才能发挥作用。最主要的用途,当然就是开窗口、根据时间段做计算了。下面我们就来看看 Table API 和 SQL 中,怎么利用时间字段做窗口操作。在 Table API 和 SQL 中,主要有两种窗口:Group Windows 和 Over Windows(时间语义的文章推荐)
大数据老哥
2021/02/04
2K0
一篇文章带你深入理解FlinkSQL中的窗口
相关推荐
十分钟入门Fink SQL
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档