Flink 版本:1.13.5
本文主要展示了 Table API 和 SQL 程序的常见结构,如何创建注册 Table,查询 Table,以及如何输出 Table。
在 Flink 中,Table API 和 SQL 可以看作联结在一起的一套 API,这套 API 的核心概念是一个可以用作 Query 输入和输出的表 Table。在我们程序中,输入数据可以定义成一张表,然后对这张表进行查询得到一张新的表,最后还可以定义一张用于输出的表,负责将处理结果写入到外部系统。
我们可以看到,程序的整体处理流程与 DataStream API 非常相似,也可以分为读取数据源(Source)、转换(Transform)、输出数据(Sink)三部分。只不过这里的输入输出操作不需要额外定义,只需要将用于输入和输出的表 Table 定义出来,然后进行转换查询就可以了。
SQL 程序基本架构如下:
// 创建执行环境 TableEnvironment
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
.inStreamingMode()
.useBlinkPlanner()
.build();
TableEnvironment tableEnv = TableEnvironment.create(settings);
// 创建输入表
String sourceSql = "CREATE TABLE datagen_table (\n" +
" word STRING,\n" +
" frequency int\n" +
") WITH (\n" +
" 'connector' = 'datagen',\n" +
" 'rows-per-second' = '1',\n" +
" 'fields.word.kind' = 'random',\n" +
" 'fields.word.length' = '1',\n" +
" 'fields.frequency.min' = '1',\n" +
" 'fields.frequency.max' = '9'\n" +
")";
tableEnv.executeSql(sourceSql);
// 创建输出表
String sinkSql = "CREATE TABLE print_table (\n" +
" word STRING,\n" +
" frequency INT\n" +
") WITH (\n" +
" 'connector' = 'print'\n" +
")";
tableEnv.executeSql(sinkSql);
// 执行计算并输出
String sql = "INSERT INTO print_table\n" +
"SELECT word, SUM(frequency) AS frequency\n" +
"FROM datagen_table\n" +
"GROUP BY word";
tableEnv.executeSql(sql);
Table API 程序基本架构如下:
// 执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
// 读取数据创建 DataStream
DataStream<WordCount> inputStream = env.fromElements(
new WordCount("Hello", 1L),
new WordCount("Ciao", 1L),
new WordCount("Hello", 1L));
// DataStream 转 Table
Table table = tEnv.fromDataStream(inputStream);
// 执行查询
Table resultTable = table
.groupBy($("word"))
.select($("word"), $("frequency").sum().as("frequency"))
.as("word, frequency");
// Table 转 DataStream
DataStream<Tuple2<Boolean, WordCount>> result = tEnv.toRetractStream(resultTable, WordCount.class);
result.print();
// 执行
env.execute();
TableEnvironment 是用来创建 Table & SQL 程序的上下文执行环境,也是 Table & SQL 程序的入口,Table & SQL 程序的所有功能都是围绕 TableEnvironment 这个核心类展开的。TableEnvironment 的主要职能包括:
每个 Table 和 SQL 的执行,都必须绑定在一个表环境 TableEnvironment 中:
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
TableEnvironment tableEnv = TableEnvironment.create(settings);
或者,可以基于现有的 StreamExecutionEnvironment 创建 StreamTableEnvironment 来与 DataStream API 进行相互转换:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
表 Table 是在关系型数据库中非常熟悉的一个概念,是数据存储的基本形式,也是 SQL 执行的基本对象。Flink 中的表 Table 概念也并不特殊,是由多个行 Row 数据构成的,每行又可以定义好多的列 Column 字段。
为了方便查询表 Table,TableEnvironment 会维护一个目录 Catalog 和表 Table 的映射关系。所以表 Table 都是通过 Catalog 来进行注册创建的。每个表 Table 都有一个唯一的 ID,由三部分组成:目录(Catalog)名称,数据库(Database)名称 以及表(Table)名。在默认情况下,目录 Catalog 名称为 default_catalog,数据库 Database 名称为 default_database。如果我们有一个表 Table 叫作 MyTable,那么完整的 ID 就是 default_catalog.default_database.MyTable。
表 Table 有两种类型的表,一种是连接器表(Connector Tables) Table,一种是虚拟表(Virtual Tables) VIEW。连接器表一般用来描述外部数据,例如文件、数据库表或者消息队列。虚拟表通常是 Table API 或 SQL 查询的结果,可以基于现有的连接器表 Table 对象来创建。
创建 Table 最直观的方式,就是通过连接器(Connector)连接到一个外部系统,然后定义出对应的表结构。例如我们可以连接到 Kafka 或者文件系统,将存储在这些外部系统的数据以表 Table 的形式定义出来,这样对表 Table 的读写就可以通过连接器转换成对外部系统的读写。连接器表可以直接通过 SQL DDL 方式创建:
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
.inStreamingMode()
.useBlinkPlanner()
.build();
TableEnvironment tableEnv = TableEnvironment.create(settings);
// Using SQL DDL
String sql = "CREATE TABLE kafka_source_table (\n" +
" word STRING COMMENT '单词',\n" +
" frequency BIGINT COMMENT '次数'\n" +
") WITH (\n" +
" 'connector' = 'kafka',\n" +
" 'topic' = 'word',\n" +
" 'properties.bootstrap.servers' = 'localhost:9092',\n" +
" 'properties.group.id' = 'kafka-connector-word-sql',\n" +
" 'scan.startup.mode' = 'earliest-offset',\n" +
" 'format' = 'json',\n" +
" 'json.ignore-parse-errors' = 'true',\n" +
" 'json.fail-on-missing-field' = 'false'\n" +
")";
tableEnv.executeSql(sql);
在 Flink 1.14 版本中除了可以通过 SQL DDL 方式创建,还可以通过 Table API 方式创建:
// Schema
Schema schema = Schema.newBuilder()
.column("word", DataTypes.STRING())
.column("frequency", DataTypes.BIGINT())
.build();
// Kafka Source TableDescriptor
TableDescriptor kafkaDescriptor = TableDescriptor.forConnector("kafka")
.comment("kafka source table")
.schema(schema)
.option(KafkaConnectorOptions.TOPIC, Lists.newArrayList("word"))
.option(KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS, "localhost:9092")
.option(KafkaConnectorOptions.PROPS_GROUP_ID, "kafka-table-descriptor")
.option("scan.startup.mode", "earliest-offset")
.format("json")
.build();
// 注册 Kafka Source 表
tEnv.createTemporaryTable("kafka_source_table", kafkaDescriptor);
假设我们现在有一个 Table 对象 inputTable,如果我们希望直接在 SQL 中查询这个 Table,我们该怎么做呢?由于 inputTable 是一个 Table 对象,并没有在 TableEnvironment 中注册,所以不能直接使用。如果要在 SQL 中直接使用,需要在 TableEnvironment 中注册,如下所示:
// 创建流和表执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 创建 DataStream
DataStream<String> dataStream = env.fromElements("Alice", "Bob", "John");
// DataStream 转换为 Table
Table inputTable = tableEnv.fromDataStream(dataStream, $("f0"));
// 将 Table 注册为虚拟表
tableEnv.createTemporaryView("input_table_view", inputTable);
// 执行查询并输出
Table upperTable = tableEnv.sqlQuery("SELECT UPPER(f0) FROM input_table_view");
DataStream<Row> upperStream = tableEnv.toDataStream(upperTable);
upperStream.print("U");
这里的注册其实是创建了一个虚拟表(Virtual Table),这与 SQL 语 法中的视图(View)非常类似,所以调用的方法也叫作创建虚拟视图(createTemporaryView)。视图之所以是虚拟的,是因为我们并不会直接保存这个表的内容。注册为虚拟表之后,我们就可以在 SQL 中直接使用 input_table_view 进行查询了。
除了可以将 Table 对象注册为虚拟表之外,我们也可以将 DataStream 直接注册为一个虚拟表
// 创建流和表执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 创建 DataStream
DataStream<String> dataStream = env.fromElements("Alice", "Bob", "John");
// 2. 将 DataStream 注册为虚拟表
// 2.1 自动派生所有列
tableEnv.createTemporaryView("input_stream_view", dataStream);
// 2.2 自动派生所有列 但使用表达式方式指定提取的字段以及位置
//tableEnv.createTemporaryView("input_stream_view2", dataStream, $("f0"));
// 2.3 手动定义列
/*Schema schema = Schema.newBuilder()
.column("f0", DataTypes.STRING())
.build();
tableEnv.createTemporaryView("input_stream_view3", dataStream, schema);*/
Table lowerTable = tableEnv.sqlQuery("SELECT LOWER(f0) FROM input_stream_view");
DataStream<Row> lowerStream = tableEnv.toDataStream(lowerTable);
lowerStream.print("L");
创建好了表,接下来自然就是对表进行查询转换了。
查询 Table 最简单的方式就是通过 SQL 语句来查询了。Flink 基于 Apache Calcite 来提供对 SQL 的支持。在代码中,我们只需要调用 TableEnvironment 的 sqlQuery() 方法,并传入一个字符串的 SQL 查询语句就可以了,返回值是一个 Table 对象:
// 创建流和表执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 创建 DataStream
DataStream<Row> dataStream = env.fromElements(
Row.of("Alice", 12),
Row.of("Bob", 10),
Row.of("Alice", 100),
Row.of("Lucy", 50)
);
// 将 DataStream 转换为 Table
Table inputTable = tableEnv.fromDataStream(dataStream).as("name", "score");
// 注册虚拟表 通过SQL查询
tableEnv.createTemporaryView("input_table", inputTable);
Table resultTable1 = tableEnv.sqlQuery("SELECT name, SUM(score) AS score_sum\n" +
"FROM input_table\n" +
"WHERE name <> 'Lucy'\n" +
"GROUP BY name");
// Table 转 Changelog DataStream
DataStream<Row> resultStream1 = tableEnv.toChangelogStream(resultTable1);
// 输出
resultStream1.print("R1");
Table 对象不注册为虚拟表,也可以直接在 SQL 中使用,即使用字符串拼接的方式:
Table resultTable2 = tableEnv.sqlQuery("SELECT name, SUM(score) AS score_sum\n" +
"FROM " + inputTable +
" WHERE name <> 'Lucy'\n" +
"GROUP BY name");
这其实是一种简略的写法,我们将 Table 对象名 inputTable 直接以字符串拼接的形式添加到 SQL 语句中,在解析时会自动注册一个同名的虚拟表到环境中,这样就省略了创建虚拟表的过程。
目前 Flink 支持标准 SQL 中的绝大部分用法,并提供了丰富的计算函数。这样我们可以像在 MySQL、Hive 中那样直接通过编写 SQL 实现自己的需求,从而大大降低了 Flink 上手的难度。
另外一种查询方式是通过调用 Table API 实现。Table API 是嵌入在 Java 和 Scala 语言内的查询 API。相比 SQL,查询不需要指定查询 SQL 字符串,而是使用宿主语言一步步链式调用。可以通过 fromDataStream 得到表的 Table 对象。得到 Table 对象之后,就可以调用 API 进行各种转换操作了。每个方法的返回都是一个新的 Table 对象,表示对输入 Table 应用关系操作的结果。一些关系操作是由多个方法调用组成的,例如 table.groupBy(…).select(…),其中 groupBy(…) 指定了 table 的分组,select(…) 指定了在分组表上的投影。如下示例展示了一个简单的 Table API 聚合查询:
// 创建流和表执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 创建 DataStream
DataStream<Row> dataStream = env.fromElements(
Row.of("Alice", 12),
Row.of("Bob", 10),
Row.of("Alice", 100),
Row.of("Lucy", 50)
);
// 将 DataStream 转换为 Table
Table inputTable = tableEnv.fromDataStream(dataStream).as("name", "score");
// 执行聚合计算
Table resultTable3 = inputTable
.filter($("name").isNotEqual("Lucy"))
.groupBy($("name"))
.select($("name"), $("score").sum().as("score_sum"));
// Table 转 Changelog DataStream
DataStream<Row> resultStream3 = tableEnv.toChangelogStream(resultTable3);
// 输出
resultStream3.print("R3");
可以发现,无论是调用 Table API 还是执行 SQL,得到的结果都是一个 Table 对象,所以这两种 API 的查询可以很方便地结合在一起:
两种 API 殊途同归,实际应用中可以按照自己的习惯任意选择。不过由于结合使用容易引起混淆,而 Table API 功能相对较少、通用性较差,所以企业项目中往往会直接选择 SQL 的方式来实现需求。
表的创建和查询分别对应流处理中的读取数据源(Source)和转换(Transform),而表的输出则写入数据源(Sink),也就是将结果数据输出到外部系统。
输出一张表 Table 最直接的方法,就是通过 SQL 的 INSERT INTO SELECT 语句来完成:
// 创建流和表执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 创建 DataStream
DataStream<Row> dataStream = env.fromElements(
Row.of("Alice", 12),
Row.of("Bob", 10),
Row.of("Alice", 100),
Row.of("Lucy", 50)
);
// 将 DataStream 转换为 Table
Table inputTable = tableEnv.fromDataStream(dataStream).as("name", "score");
// 输入表: 注册虚拟表
tableEnv.createTemporaryView("input_table", inputTable);
// 1. 通过 SQL 方式实现
// 输出表: 创建 Print Connector 表
tableEnv.executeSql("CREATE TEMPORARY TABLE print_sql_sink (\n" +
" name STRING,\n" +
" score BIGINT\n" +
") WITH (\n" +
" 'connector' = 'print',\n" +
" 'print-identifier' = 'SQL'\n" +
")");
// 聚合计算并输出
tableEnv.executeSql("INSERT INTO print_sql_sink\n" +
"SELECT name, SUM(score) AS score_sum\n" +
"FROM input_table\n" +
"GROUP BY name");
此外,还可以调用 Table 的方法 executeInsert() 方法将一个 Table 写入到注册过的表中(print_sink_table),方法传入的参数就是注册的表名
Table outputTable = inputTable
.filter($("name").isNotEqual("Lucy"))
.groupBy($("name"))
.select($("name"), $("score").sum().as("score_sum"));
// Flink 1.13 版本推荐使用 DDL 方式创建输出表 1.14 版本可以用 Table API 创建
tableEnv.executeSql("CREATE TEMPORARY TABLE print_table_sink (\n" +
" name STRING,\n" +
" score BIGINT\n" +
") WITH (\n" +
" 'connector' = 'print',\n" +
" 'print-identifier' = 'Table'\n" +
")");
outputTable.executeInsert("print_table_sink");
Flink 1.14 版本开始,可以通过 Table API 来创建 Connector 表
// 创建流和表执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 创建 DataStream
DataStream<Row> dataStream = env.fromElements(
Row.of("Alice", 12),
Row.of("Bob", 10),
Row.of("Alice", 100),
Row.of("Lucy", 50)
);
// 将 DataStream 转换为 Table
Table inputTable = tableEnv.fromDataStream(dataStream).as("name", "score");
// 聚合计算
Table outputTable = inputTable
.filter($("name").isNotEqual("Lucy"))
.groupBy($("name"))
.select($("name"), $("score").sum().as("score_sum"));
// 创建 Print Connector 表
tableEnv.createTemporaryTable(
"print_sink_table",
TableDescriptor.forConnector("print")
.schema(Schema.newBuilder()
.column("name", DataTypes.STRING())
.column("score_sum", DataTypes.BIGINT())
.build()
)
.build()
);
// 输出
outputTable.executeInsert("print_sink_table");