Loading [MathJax]/jax/output/CommonHTML/config.js
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >(2)FlinkSQL滚动窗口demo演示

(2)FlinkSQL滚动窗口demo演示

原创
作者头像
NBI大数据
发布于 2022-08-08 03:09:17
发布于 2022-08-08 03:09:17
46900
代码可运行
举报
运行总次数:0
代码可运行

滚动窗口(Tumbling Windows) 滚动窗口有固定的大小,是一种对数据进行均匀切片的划分方式。窗口之间没有重叠,也不会有间隔,是“首尾相接”的状态。滚动窗口可以基于时间定义,也可以基于数据个数定义;需要的参数只有一个,就是窗口的大小(window size)。

demo演示:

场景:接收通过socket发送过来的数据,每30秒触发一次窗口计算逻辑

(1)准备一个实体对象,消息对象

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
package com.pojo;

import java.io.Serializable;

/**
 * Created by lj on 2022-07-05.
 */
public class WaterSensor implements Serializable {
    private String id;
    private long ts;
    private int vc;

    public WaterSensor(){

    }

    public WaterSensor(String id,long ts,int vc){
        this.id = id;
        this.ts = ts;
        this.vc = vc;
    }

    public int getVc() {
        return vc;
    }

    public void setVc(int vc) {
        this.vc = vc;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public long getTs() {
        return ts;
    }

    public void setTs(long ts) {
        this.ts = ts;
    }
}

(2)编写socket代码,模拟数据发送

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
package com.producers;

import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Random;

/**
 * Created by lj on 2022-07-05.
 */
public class Socket_Producer {
    public static void main(String[] args) throws IOException {

        try {
            ServerSocket ss = new ServerSocket(9999);
            System.out.println("启动 server ....");
            Socket s = ss.accept();
            BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(s.getOutputStream()));
            String response = "java,1,2";

            //每 2s 发送一次消息
            int i = 0;
            Random r=new Random();   
            String[] lang = {"flink","spark","hadoop","hive","hbase","impala","presto","superset","nbi"};

            while(true){
                Thread.sleep(2000);
                response= lang[r.nextInt(lang.length)] + "," + i + "," + i+"\n";
                System.out.println(response);
                try{
                    bw.write(response);
                    bw.flush();
                    i++;
                }catch (Exception ex){
                    System.out.println(ex.getMessage());
                }

            }
        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
        }
    }
}

(3)从socket端接收数据,并设置30秒触发执行一次窗口运算

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
package com.examples;

import com.pojo.WaterSensor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.Tumble;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.lit;

/**
 * Created by lj on 2022-07-06.
 *
 * 滚动窗口(Tumbling Windows) 滚动窗口有固定的大小,是一种对数据进行均匀切片的划分方式。窗口之间没有重叠,也不会有间隔,
 * 是“首尾相接”的状态。滚动窗口可以基于时间定义,也可以基于数据个数定义;需要的参数只有一个,
 * 就是窗口的大小(window size)。
 */
public class Flink_Group_Window_Tumble {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        DataStreamSource<String> streamSource = env.socketTextStream("127.0.0.1", 9999,"\n");
        SingleOutputStreamOperator<WaterSensor> waterDS = streamSource.map(new MapFunction<String, WaterSensor>() {
            @Override
            public WaterSensor map(String s) throws Exception {
                String[] split = s.split(",");
                return new WaterSensor(split[0], Long.parseLong(split[1]), Integer.parseInt(split[2]));
            }
        });

        // 将流转化为表
        Table table = tableEnv.fromDataStream(waterDS,
                $("id"),
                $("ts"),
                $("vc"),
                $("pt").proctime());

        tableEnv.createTemporaryView("EventTable", table);

        Table result = tableEnv.sqlQuery(
                "SELECT " +
                        "id, " +                //window_start, window_end,
                        "COUNT(ts) ,SUM(ts)" +
                        "FROM TABLE( " +
                        "TUMBLE( TABLE EventTable , " +
                        "DESCRIPTOR(pt), " +
                        "INTERVAL '30' SECOND)) " +
                        "GROUP BY id , window_start, window_end"
        );

//        tableEnv.toChangelogStream(result).print("count");
//        tableEnv.toDataStream(result).print("toDataStream");
//        tableEnv.toAppendStream(result, Row.class).print("toAppendStream");           //追加模式
        tableEnv.toRetractStream(result, Row.class).print("toRetractStream");       //缩进模式
        env.execute();
    }
}

(4)效果演示

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
(3)FlinkSQL滑动窗口demo演示
滑动窗口(Sliding Windows)与滚动窗口类似,滑动窗口的大小也是固定的。区别在于,窗口之间并不是首尾相接的,而是可以“错开”一定的位置。如果看作一个窗口的运动,那么就像是向前小步“滑动”一样。定义滑动窗口的参数有两个:除去窗口大小(window size)之外,还有一个滑动步长(window slide),代表窗口计算的频率。
NBI大数据
2022/08/08
4370
(3)FlinkSQL滑动窗口demo演示
(4)FlinkSQL将socket数据写入到mysql方式一
本章节主要演示从socket接收数据,通过滚动窗口每30秒运算一次窗口数据,然后将结果写入Mysql数据库
NBI大数据
2022/08/08
1.1K0
(4)FlinkSQL将socket数据写入到mysql方式一
(6)FlinkSQL将kafka数据写入到mysql方式一
图片这里不展开zookeeper、kafka安装配置(1)首先需要启动zookeeper和kafka图片(2)定义一个kafka生产者package com.producers;import com.alibaba.fastjson.JSONObject;import com.pojo.Event;import com.pojo.WaterSensor;import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka
NBI大数据
2022/08/08
1.1K0
(6)FlinkSQL将kafka数据写入到mysql方式一
(8)FlinkSQL自定义UDF
Flink提供了自定义函数的基础能力,在需要满足特殊业务场景需求时,根据自身需要按需定制自己的UDF 下面将简单演示一个UDF的定义和UDF的使用过程:
NBI大数据
2022/08/08
5220
(8)FlinkSQL自定义UDF
一篇文章带你深入理解FlinkSQL中的窗口
时间语义,要配合窗口操作才能发挥作用。最主要的用途,当然就是开窗口、根据时间段做计算了。下面我们就来看看 Table API 和 SQL 中,怎么利用时间字段做窗口操作。在 Table API 和 SQL 中,主要有两种窗口:Group Windows 和 Over Windows(时间语义的文章推荐)
大数据老哥
2021/02/04
2K0
一篇文章带你深入理解FlinkSQL中的窗口
Flink重点难点:Flink Table&SQL必知必会(二)
介绍了 Flink Table & SQL的一些核心概念,本部分将介绍 Flink 中窗口和函数。
王知无-import_bigdata
2021/09/22
2.2K0
(7)FlinkSQL将kafka数据写入到mysql方式二
public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); StreamTableEnvironment tableEnv = StreamTableEnvironment.c
NBI大数据
2022/08/08
1.4K0
(7)FlinkSQL将kafka数据写入到mysql方式二
(5)FlinkSQL将socket数据写入到mysql方式二
public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); StreamTableEnvironment tableEnv = StreamTableEnvironment.c
NBI大数据
2022/08/08
1K0
(5)FlinkSQL将socket数据写入到mysql方式二
(6)Flink CEP SQL模拟账号短时间内异地登录风控预警
(1)通过将xxx平台用户登录时的登录日志发送到kafka(本文代码演示用的socket);
NBI大数据
2022/08/30
6710
(6)Flink CEP SQL模拟账号短时间内异地登录风控预警
Flink1.13架构全集| 一文带你由浅入深精通Flink方方面面(三)SQL篇
哈喽各位,本章主要写的是FlinkSQL也是Flink章节的倒数第二篇了,最后还有一篇FlinkCEP,稍后会出,耐心关注哦!好了,进入正题!!!!
857技术社区
2023/02/23
3.7K0
Flink1.13架构全集| 一文带你由浅入深精通Flink方方面面(三)SQL篇
(2)sparkstreaming滚动窗口和滑动窗口演示
一、滚动窗口(Tumbling Windows) 滚动窗口有固定的大小,是一种对数据进行均匀切片的划分方式。窗口之间没有重叠,也不会有间隔,是“首尾相接”的状态。滚动窗口可以基于时间定义,也可以基于数据个数定义;需要的参数只有一个,就是窗口的大小(window size)。
NBI大数据
2022/09/05
1.3K0
(2)sparkstreaming滚动窗口和滑动窗口演示
(上)史上最全干货!Flink SQL 成神之路(全文 18 万字、138 个案例、42 张图)
Apache Flink 提供了两种关系型 API 用于统一流和批处理,Table 和 SQL API。
公众号:大数据羊说
2022/04/04
3.5K0
(上)史上最全干货!Flink SQL 成神之路(全文 18 万字、138 个案例、42 张图)
Flink 的三种WordCount(文末领取Flink书籍)
今天是 Flink 从 0 到 1 系列的第 2 篇:《WordCount及FlinkSQL》。
Python编程爱好者
2022/09/21
1.1K0
Flink 的三种WordCount(文末领取Flink书籍)
Flink重点难点:维表关联理论和Join实战
数据流操作的另一个常见需求是对两条数据流中的事件进行联结(connect)或Join。Flink DataStream API中内置有两个可以根据时间条件对数据流进行Join的算子:基于间隔的Join和基于窗口的Join。本节我们会对它们进行介绍。
王知无-import_bigdata
2021/09/22
4.8K0
Flink DataStream API与Data Table API/SQL集成
在定义数据处理管道时,Table API 和 DataStream API 同样重要。
从大数据到人工智能
2022/02/24
4.5K0
全网最详细4W字Flink全面解析与实践(下)
Flink是一个有状态的流式计算引擎,所以会将中间计算结果(状态)进行保存,默认保存到TaskManager的堆内存中。
BookSea
2023/11/02
1.1K0
全网最详细4W字Flink全面解析与实践(下)
flink sql 知其所以然(十九):Table 与 DataStream 的转转转(附源码)
废话不多说,咱们先直接上本文的目录和结论,小伙伴可以先看结论快速了解博主期望本文能给小伙伴们带来什么帮助:
公众号:大数据羊说
2022/04/04
3K0
flink sql 知其所以然(十九):Table 与 DataStream 的转转转(附源码)
全网最详细4W字Flink入门笔记(下)
在Spark中有DataFrame这样的关系型编程接口,因其强大且灵活的表达能力,能够让用户通过非常丰富的接口对数据进行处理,有效降低了用户的使用成本。Flink也提供了关系型编程接口Table API以及基于Table API的SQL API,让用户能够通过使用结构化编程接口高效地构建Flink应用。同时Table API以及SQL能够统一处理批量和实时计算业务,无须切换修改任何应用代码就能够基于同一套API编写流式应用和批量应用,从而达到真正意义的批流统一
BookSea
2023/07/21
5920
Flink kafka sink to RDBS 测试Demo
表的输出,是通过将数据写入 TableSink 来实现的。TableSink 是一个通用接口,可以 支持不同的文件格式、存储数据库和消息队列。
小石头
2022/11/10
1.3K0
全网最详细4W字Flink入门笔记(下)
Flink是一个有状态的流式计算引擎,所以会将中间计算结果(状态)进行保存,默认保存到TaskManager的堆内存中,但是当task挂掉,那么这个task所对应的状态都会被清空,造成了数据丢失,无法保证结果的正确性,哪怕想要得到正确结果,所有数据都要重新计算一遍,效率很低。想要保证 At -least-once 和 Exactly-once,需要把数据状态持久化到更安全的存储介质中,Flink提供了堆内内存、堆外内存、HDFS、RocksDB等存储介质。
BookSea
2023/10/16
1.1K0
全网最详细4W字Flink入门笔记(下)
推荐阅读
相关推荐
(3)FlinkSQL滑动窗口demo演示
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档