Loading [MathJax]/jax/output/CommonHTML/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >聊聊flink Table的Time Attributes

聊聊flink Table的Time Attributes

原创
作者头像
code4it
发布于 2019-02-01 06:31:31
发布于 2019-02-01 06:31:31
1.8K00
代码可运行
举报
文章被收录于专栏:码匠的流水账码匠的流水账
运行总次数:0
代码可运行

本文主要研究一下flink Table的Time Attributes

Processing time

通过fromDataStream定义

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
DataStream<Tuple2<String, String>> stream = ...;// declare an additional logical field as a processing time attribute
Table table = tEnv.fromDataStream(stream, "Username, Data, UserActionTime.proctime");
​
WindowedTable windowedTable = table.window(Tumble.over("10.minutes").on("UserActionTime").as("userActionWindow"));
  • 从DataStream创建Table的话,可以在fromDataStream里头进行定义Processing time

通过TableSource定义

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
// define a table source with a processing attribute
public class UserActionSource implements StreamTableSource<Row>, DefinedProctimeAttribute {
​
    @Override
    public TypeInformation<Row> getReturnType() {
        String[] names = new String[] {"Username" , "Data"};
        TypeInformation[] types = new TypeInformation[] {Types.STRING(), Types.STRING()};
        return Types.ROW(names, types);
    }
​
    @Override
    public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
        // create stream
        DataStream<Row> stream = ...;
        return stream;
    }
​
    @Override
    public String getProctimeAttribute() {
        // field with this name will be appended as a third field
        return "UserActionTime";
    }
}// register table source
tEnv.registerTableSource("UserActions", new UserActionSource());
​
WindowedTable windowedTable = tEnv
    .scan("UserActions")
    .window(Tumble.over("10.minutes").on("UserActionTime").as("userActionWindow"));
  • 通过TableSource创建Table的话,可以通过实现DefinedProctimeAttribute接口来定义Processing time

Event time

通过fromDataStream定义

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
// Option 1:// extract timestamp and assign watermarks based on knowledge of the stream
DataStream<Tuple2<String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);// declare an additional logical field as an event time attribute
Table table = tEnv.fromDataStream(stream, "Username, Data, UserActionTime.rowtime");
​
​
// Option 2:// extract timestamp from first field, and assign watermarks based on knowledge of the stream
DataStream<Tuple3<Long, String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);// the first field has been used for timestamp extraction, and is no longer necessary
// replace first field with a logical event time attribute
Table table = tEnv.fromDataStream(stream, "UserActionTime.rowtime, Username, Data");// Usage:
​
WindowedTable windowedTable = table.window(Tumble.over("10.minutes").on("UserActionTime").as("userActionWindow"));
  • 从DataStream创建Table的话,可以在fromDataStream里头进行定义Event time;具体有两种方式,一种是额外定义一个字段,一种是覆盖原有的字段

通过TableSource定义

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
// define a table source with a rowtime attribute
public class UserActionSource implements StreamTableSource<Row>, DefinedRowtimeAttributes {
​
    @Override
    public TypeInformation<Row> getReturnType() {
        String[] names = new String[] {"Username", "Data", "UserActionTime"};
        TypeInformation[] types =
            new TypeInformation[] {Types.STRING(), Types.STRING(), Types.LONG()};
        return Types.ROW(names, types);
    }
​
    @Override
    public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
        // create stream
        // ...
        // assign watermarks based on the "UserActionTime" attribute
        DataStream<Row> stream = inputStream.assignTimestampsAndWatermarks(...);
        return stream;
    }
​
    @Override
    public List<RowtimeAttributeDescriptor> getRowtimeAttributeDescriptors() {
        // Mark the "UserActionTime" attribute as event-time attribute.
        // We create one attribute descriptor of "UserActionTime".
        RowtimeAttributeDescriptor rowtimeAttrDescr = new RowtimeAttributeDescriptor(
            "UserActionTime",
            new ExistingField("UserActionTime"),
            new AscendingTimestamps());
        List<RowtimeAttributeDescriptor> listRowtimeAttrDescr = Collections.singletonList(rowtimeAttrDescr);
        return listRowtimeAttrDescr;
    }
}// register the table source
tEnv.registerTableSource("UserActions", new UserActionSource());
​
WindowedTable windowedTable = tEnv
    .scan("UserActions")
    .window(Tumble.over("10.minutes").on("UserActionTime").as("userActionWindow"));
  • 通过TableSource创建Table的话,可以通过实现DefinedRowtimeAttributes接口来定义Event time

definedTimeAttributes

flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/sources/definedTimeAttributes.scala

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
/**
  * Extends a [[TableSource]] to specify a processing time attribute.
  */
trait DefinedProctimeAttribute {/**
    * Returns the name of a processing time attribute or null if no processing time attribute is
    * present.
    *
    * The referenced attribute must be present in the [[TableSchema]] of the [[TableSource]] and of
    * type [[Types.SQL_TIMESTAMP]].
    */
  @Nullable
  def getProctimeAttribute: String
}/**
  * Extends a [[TableSource]] to specify rowtime attributes via a
  * [[RowtimeAttributeDescriptor]].
  */
trait DefinedRowtimeAttributes {/**
    * Returns a list of [[RowtimeAttributeDescriptor]] for all rowtime attributes of the table.
    *
    * All referenced attributes must be present in the [[TableSchema]] of the [[TableSource]] and of
    * type [[Types.SQL_TIMESTAMP]].
    *
    * @return A list of [[RowtimeAttributeDescriptor]].
    */
  def getRowtimeAttributeDescriptors: util.List[RowtimeAttributeDescriptor]
}/**
  * Describes a rowtime attribute of a [[TableSource]].
  *
  * @param attributeName The name of the rowtime attribute.
  * @param timestampExtractor The timestamp extractor to derive the values of the attribute.
  * @param watermarkStrategy The watermark strategy associated with the attribute.
  */
class RowtimeAttributeDescriptor(
  val attributeName: String,
  val timestampExtractor: TimestampExtractor,
  val watermarkStrategy: WatermarkStrategy) {/** Returns the name of the rowtime attribute. */
  def getAttributeName: String = attributeName
​
  /** Returns the [[TimestampExtractor]] for the attribute. */
  def getTimestampExtractor: TimestampExtractor = timestampExtractor
​
  /** Returns the [[WatermarkStrategy]] for the attribute. */
  def getWatermarkStrategy: WatermarkStrategy = watermarkStrategy
​
  override def equals(other: Any): Boolean = other match {
    case that: RowtimeAttributeDescriptor =>
        Objects.equals(attributeName, that.attributeName) &&
        Objects.equals(timestampExtractor, that.timestampExtractor) &&
        Objects.equals(watermarkStrategy, that.watermarkStrategy)
    case _ => false
  }
​
  override def hashCode(): Int = {
    Objects.hash(attributeName, timestampExtractor, watermarkStrategy)
  }
}
  • DefinedProctimeAttribute定义了getProctimeAttribute方法,返回String,用于定义Process time的字段名;DefinedRowtimeAttributes定义了getRowtimeAttributeDescriptors方法,返回的是RowtimeAttributeDescriptor的List,RowtimeAttributeDescriptor有3个属性,分别是attributeName、timestampExtractor及watermarkStrategy

小结

  • 在从DataStream或者TableSource创建Table时可以指定Time Attributes,指定了之后就可以作为field来使用或者参与time-based的操作
  • 针对Processing time,如果从DataStream创建Table的话,可以在fromDataStream里头进行定义;通过TableSource创建Table的话,可以通过实现DefinedProctimeAttribute接口来定义Processing time;DefinedProctimeAttribute定义了getProctimeAttribute方法,返回String,用于定义Process time的字段名
  • 针对Event time,如果从DataStream创建Table的话,可以在fromDataStream里头进行定义;具体有两种方式,一种是额外定义一个字段,一种是覆盖原有的字段;通过TableSource创建Table的话,可以通过实现DefinedRowtimeAttributes接口来定义Event time;DefinedRowtimeAttributes定义了getRowtimeAttributeDescriptors方法,返回的是RowtimeAttributeDescriptor的List,RowtimeAttributeDescriptor有3个属性,分别是attributeName、timestampExtractor及watermarkStrategy

doc

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
Flink SQL 如何定义时间属性
本文将解释如何在 Flink 的 Table API 和 SQL 中为基于时间的操作定义时间属性。
smartsi
2021/10/08
2K0
聊聊flink Table Schema的定义
flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/api/StreamTableEnvironment.scala
code4it
2019/02/03
3.4K0
聊聊flink Table Schema的定义
重要|Flink SQL与kafka整合的那些事儿
flink与kafka整合是很常见的一种实时处理场景,尤其是kafka 0.11版本以后生产者支持了事务,使得flink与kafka整合能实现完整的端到端的仅一次处理,虽然这样会有checkpoint周期的数据延迟,但是这个仅一次处理也是很诱人的。可见的端到端的使用案例估计就是前段时间oppo的案例分享吧。关注浪尖微信公众号(bigdatatip)输入 oppo 即可获得。
Spark学习技巧
2019/06/03
3.3K0
聊聊flink的Table API及SQL Programs
序 本文主要研究一下flink的Table API及SQL Programs flink-forward-sf-2017-timo-walther-table-sql-api-unified-apis-for-batch-and-stream-processing-8-638.jpg 实例 // for batch programs use ExecutionEnvironment instead of StreamExecutionEnvironment StreamExecutionEnvironm
code4it
2019/01/21
2.1K0
聊聊flink的Table API及SQL Programs
聊聊flink Table的Group Windows
flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/api/table.scala
code4it
2019/01/26
1.9K0
聊聊flink Table的Group Windows
Flink DataStream API与Data Table API/SQL集成
在定义数据处理管道时,Table API 和 DataStream API 同样重要。
从大数据到人工智能
2022/02/24
4.4K0
2021年大数据Flink(三十六):​​​​​​​Table与SQL ​​​​​​案例三
使用Flink SQL来统计5秒内 每个用户的 订单总数、订单的最大金额、订单的最小金额
Lansonli
2021/10/11
4370
flink sql使用中的一个问题
最近有人问了浪尖一个flink共享datastream或者临时表会否重复计算的问题。
Spark学习技巧
2019/12/15
1.8K0
Apache-Flink深度解析-Temporal-Table-JOIN
在《JOIN LATERAL》中提到了Temporal Table JOIN,本篇就向大家详细介绍什么是Temporal Table JOIN。 在ANSI-SQL 2011 中提出了Temporal 的概念,Oracle,SQLServer,DB2等大的数据库厂商也先后实现了这个标准。Temporal Table记录了历史上任何时间点所有的数据改动,Temporal Table的工作流程如下:
王知无-import_bigdata
2019/04/24
4.5K0
Apache-Flink深度解析-Temporal-Table-JOIN
Flink SQL 知其所以然(二十二):SQL 的时间语义!(建议收藏)
讲到这里,xdm 会问,博主上面写的 3 种时间属性到底对我们的任务有啥影响呢?3 种时间属性的应用场景是啥?
公众号:大数据羊说
2022/05/17
1.2K0
Flink SQL 知其所以然(二十二):SQL 的时间语义!(建议收藏)
Flink学习笔记(9)-Table API 和 Flink SQL
• Table API 是一套内嵌在 Java 和 Scala 语言中的查询API,它允许以非常直观的方式组合来自一些关系运算符的查询
挽风
2022/05/11
2.3K0
Flink学习笔记(9)-Table API 和 Flink SQL
聊聊flink Table的Distinct Aggregation
flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/functions/AggregateFunction.scala
code4it
2019/01/28
1.5K0
聊聊flink Table的Distinct Aggregation
Flink重点难点:Flink Table&SQL必知必会(二)
介绍了 Flink Table & SQL的一些核心概念,本部分将介绍 Flink 中窗口和函数。
王知无-import_bigdata
2021/09/22
2.1K0
Apache-Flink深度解析-SQL概览
SQL是Structured Query Language的缩写,最初是由美国计算机科学家Donald D. Chamberlin和Raymond F. Boyce在20世纪70年代早期从 Early History of SQL 中了解关系模型后在IBM开发的。该版本最初称为[SEQUEL: A Structured English Query Language](结构化英语查询语言),旨在操纵和检索存储在IBM原始准关系数据库管理系统System R中的数据。SEQUEL后来改为SQL,因为“SEQUEL”是英国Hawker Siddeley飞机公司的商标。我们看看这款用于特技飞行的英国皇家空军豪客Siddeley Hawk T.1A (Looks great):
王知无-import_bigdata
2019/03/26
7840
Apache-Flink深度解析-TableAPI
在《SQL概览》中我们概要的向大家介绍了什么是好SQL,SQL和Table API是Apache Flink中的同一层次的API抽象,如下图所示
王知无-import_bigdata
2019/04/24
7170
Apache-Flink深度解析-TableAPI
聊聊flink的CsvTableSource
flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/sources/TableSource.scala
code4it
2019/02/05
1.4K0
聊聊flink的CsvTableSource
一篇文章带你深入理解FlinkSQL中的窗口
时间语义,要配合窗口操作才能发挥作用。最主要的用途,当然就是开窗口、根据时间段做计算了。下面我们就来看看 Table API 和 SQL 中,怎么利用时间字段做窗口操作。在 Table API 和 SQL 中,主要有两种窗口:Group Windows 和 Over Windows(时间语义的文章推荐)
大数据老哥
2021/02/04
2K0
一篇文章带你深入理解FlinkSQL中的窗口
Apache-Flink深度解析-TableAPI
SQL和Table API是Apache Flink中的同一层次的API抽象,如下图所示:
王知无-import_bigdata
2019/03/26
1.3K0
Flink开发-Mysql数据导入Hive中
Mysql中ResultSet默认会将一次查询的结果存入内存中。如果数据量比较大,就会占用大量的内存。如果内存不够,就会报错。
码客说
2023/03/06
2K0
14-Flink-Table-&-SQL实战
Apache Flink具有两个关系API - 表API和SQL - 用于统一流和批处理。Table API是Scala和Java的语言集成查询API,允许以非常直观的方式组合来自关系运算符的查询,Table API和SQL接口彼此紧密集成,以及Flink的DataStream和DataSet API。您可以轻松地在基于API构建的所有API和库之间切换。例如,您可以使用CEP库从DataStream中提取模式,然后使用Table API分析模式,或者可以在预处理上运行Gelly图算法之前使用SQL查询扫描,过滤和聚合批处理表数据。
王知无-import_bigdata
2019/03/04
1.3K0
14-Flink-Table-&-SQL实战
相关推荐
Flink SQL 如何定义时间属性
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验