前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >Flink 实践教程:进阶10-自定义聚合函数(UDAF)

Flink 实践教程:进阶10-自定义聚合函数(UDAF)

原创
作者头像
吴云涛
修改于 2022-03-15 07:28:03
修改于 2022-03-15 07:28:03
1.5K2
举报

流计算 Oceanus 简介

流计算 Oceanus大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点的企业级实时大数据分析平台流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。

本文将为您详细介绍如何使用自定义聚合函数(UDAF),将处理后的存入 MySQL 中。

前置准备

创建流计算 Oceanus 集群

在流计算 Oceanus 产品活动页面 1 元购买 Oceanus 集群。进入 Oceanus 控制台 [1],点击左侧【集群管理】,点击左上方【创建集群】,具体可参考 Oceanus 官方文档 创建独享集群 [2]。

创建 MySQL 实例

进入 MySQL 控制台 [3],点击【新建】。具体可参考官方文档 创建 MySQL 实例 [4]。进入实例后,单击右上角【登陆】即可登陆 MySQL 数据库

创建 MySQL 表
代码语言:sql
AI代码解释
复制
-- 建表语句,用于向 Source 提供数据
CREATE TABLE `udaf_input` (
  `id`       int(10) NOT NULL,
  `product`  varchar(50) DEFAULT '',
  `value`    int(10) DEFAULT NULL,
  `weight`   int(10) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8
​
-- 插入数据
INSERT INTO `udaf_input` (`id`, `product`, `value`, `weight`) VALUES (1, 'oceanus-1', 2, 2);
INSERT INTO `udaf_input` (`id`, `product`, `value`, `weight`) VALUES (2, 'oceanus-1', 3, 3);
INSERT INTO `udaf_input` (`id`, `product`, `value`, `weight`) VALUES (3, 'oceanus-2', 5, 4);
INSERT INTO `udaf_input` (`id`, `product`, `value`, `weight`) VALUES (5, 'oceanus-2', 6, 5);-- 建表语句,用于接收 Sink 端数据
CREATE TABLE `udaf_output` (
  `product`  varchar(50) NOT NULL DEFAULT '',
  `sum`      double(11,0) DEFAULT NULL,
  PRIMARY KEY (`product`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8

开发 UDTF

我们自定义一个 UDFA,继承 AggregateFunction,对算子输入的两个字段计算加权平均值。

1. 代码编写

WeightedAvgAccumulator类:

代码语言:java
AI代码解释
复制
package demos.UDAF;public class WeightedAvgAccumulator{
    public long sum = 0;
    public int count = 0;
}

WeightedAvg 类:

代码语言:java
AI代码解释
复制
package demos.UDAF;import org.apache.flink.table.functions.AggregateFunction;public class WeightedAvg extends AggregateFunction<Long, WeightedAvgAccumulator> {@Override
    public WeightedAvgAccumulator createAccumulator() {
        return new WeightedAvgAccumulator();
    }@Override
    public Long getValue(WeightedAvgAccumulator acc) {
        if (acc.count == 0) {
            return null;
        } else {
            return acc.sum / acc.count;
        }
    }public void accumulate(WeightedAvgAccumulator acc, Long iValue, Integer iWeight) {
        acc.sum += iValue * iWeight;
        acc.count += iWeight;
    }public void retract(WeightedAvgAccumulator acc, Long iValue, Integer iWeight) {
        acc.sum -= iValue * iWeight;
        acc.count -= iWeight;
    }public void merge(WeightedAvgAccumulator acc, Iterable<WeightedAvgAccumulator> it) {
        for (WeightedAvgAccumulator a : it) {
            acc.count += a.count;
            acc.sum += a.sum;
        }
    }public void resetAccumulator(WeightedAvgAccumulator acc) {
        acc.count = 0;
        acc.sum = 0L;
    }
}
2. 打包 Jar

使用 IDEA 自带打包工具 Build Artifacts 或者命令行进行打包。 命令行打包命令:

代码语言:shell
AI代码解释
复制
mvn clean package

命令行打包后生成的 Jar 包可以在项目 target 目录下找到。

流计算 Oceanus 作业

上传依赖

在 Oceanus 控制台,点击左侧【依赖管理】,点击左上角【新建】新建依赖,上传本地 Jar 包。

创建 SQL 作业

在 Oceanus 控制台,点击左侧【作业管理】,点击左上角【新建】新建作业,作业类型选择 SQL 作业,点击【开发调试】进入作业编辑页面。 单击【作业参数】,在【引用程序包】处选择刚才上传的 Jar 包。

1. 创建 Function
代码语言:sql
AI代码解释
复制
CREATE TEMPORARY SYSTEM FUNCTION WeightedAvg  AS 'demos.UDAF.WeightedAvg';

WeightedAvg代表创建的函数名,demos.UDAF.WeightedAvg代表代码所在路径。

2. 创建 Source
代码语言:sql
AI代码解释
复制
CREATE TABLE `mysql_cdc_source_table` (
  `id`        INT,
  `product`   VARCHAR,
  `value`     INT,
  `weight`    INT,
  PRIMARY KEY (`id`) NOT ENFORCED -- 如果要同步的数据库表定义了主键, 则这里也需要定义
) WITH (
  'connector' = 'mysql-cdc',      -- 固定值 'mysql-cdc'
  'hostname' = 'xx.xx.xx.xx',      -- 数据库的 IP
  'port' = 'xxxx',                -- 数据库的访问端口
  'username' = 'root',            -- 数据库访问的用户名(需要提供 SHOW DATABASES、REPLICATION SLAVE、REPLICATION CLIENT、SELECT 和 RELOAD 权限)
  'password' = 'xxxxxxxxx',       -- 数据库访问的密码
  'database-name' = 'testdb',     -- 需要同步的数据库
  'table-name' = 'udaf_input'     -- 需要同步的数据表名
);
3. 创建 Sink
代码语言:sql
AI代码解释
复制
CREATE TABLE `jdbc_source_table` (
    `product`  VARCHAR,
    `sum`      DOUBLE,
    PRIMARY KEY(`product`) NOT ENFORCED
) WITH (
    -- 指定数据库连接参数
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://xx.xx.xx.xx:xxxx/testdb?rewriteBatchedStatements=true&serverTimezone=Asia/Shanghai',   -- 请替换为您的实际 MySQL 连接参数
    'table-name' = 'udaf_output',    -- 需要写入的数据表
    'username' = 'root',             -- 数据库访问的用户名(需要提供 INSERT 权限)
    'password' = 'xxxxxxxxx',        -- 数据库访问的密码
    'sink.buffer-flush.max-rows' = '200',  -- 批量输出的条数
    'sink.buffer-flush.interval' = '2s'    -- 批量输出的间隔
);
4. 编写业务 SQL
代码语言:sql
AI代码解释
复制
INSERT INTO jdbc_source_table
SELECT
product,CAST(WeightedAvg(`value`,`weight`) AS DOUBLE) AS `sum`
FROM mysql_cdc_source_table GROUP BY `product`;

总结

本文首先在本地开发 UDAF 函数,将其打成 Jar 包后上传到 Oceanus 平台引用。接下来使用 MySQL CDC 连接器获取udaf_input表数据,调用 UDAF 函数对输入的两个字段计算加权平均值后存入 MySQL 中。 其他的自定义函数,例如自定义标量函数(UDF)和自定义表值函数(UDTF)的使用方法和视频教程可以参考之前的文章 Flink 实践教程:进阶8-自定义标量函数(UDF [5]、Flink 实践教程:进阶9-自定义表值函数(UDTF) [6]

  • 自定义聚合函数(UDAF)可以将多条记录聚合成 1 条记录。

参考链接

[1] Oceanus 控制台:https://console.cloud.tencent.com/oceanus/overview

[2] 创建独享集群:https://cloud.tencent.com/document/product/849/48298

[3] MySQL 控制台:https://console.cloud.tencent.com/cdb

[4] 创建 MySQL 实例:https://cloud.tencent.com/document/product/236/46433

[5] Flink 实践教程:进阶8-自定义标量函数(UDF):https://cloud.tencent.com/developer/article/1946320

[6] Flink 实践教程:进阶9-自定义表值函数(UDTF):https://cloud.tencent.com/developer/article/1951900

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
2 条评论
热度
最新
2
2
回复回复点赞举报
1
1
回复回复点赞举报
推荐阅读
编辑精选文章
换一批
SQL索引失效原因分析与解决方案
原因: 该查询中使用了 customer_id 列,但如果没有为该列建立索引,数据库可能会选择进行全表扫描,而不是利用索引进行快速查询。
GeekLiHua
2025/01/21
1020
【收藏】MySQL 超全优化清单(可执行系列)
先从一般的语句优化开始,其实对于很多规范大家并不陌生,可就是在用的时候,无法遵从,希望今天大家再过一遍,可以养成一种良好的数据库编码习惯。
lyb-geek
2024/07/17
2700
【收藏】MySQL 超全优化清单(可执行系列)
日常开发常见MySQL性能优化策略及应用场景
在电子商务平台的日常运营中,经常需要根据用户的订单状态和日期进行查询。随着订单量的增加,查询响应时间变长,影响报表生成和订单处理效率。
GeekLiHua
2024/08/19
2400
日常开发常见MySQL性能优化策略及应用场景
PostgreSQL亿级行数据处理
使用Timescale压缩和分块跳过索引,实现PostgreSQL处理数十亿行数据的方法。
云云众生s
2025/01/12
1780
PostgreSQL亿级行数据处理
五大SQL优化技巧,助你轻松提升数据库查询效率
提升SQL生产力是数据库管理和优化的关键。以下是五个关键技巧,每个技巧都配有具体应用场景、案例代码以及使用前后的性能对比。
老表
2024/07/11
5350
五大SQL优化技巧,助你轻松提升数据库查询效率
MySQL索引优化分析「建议收藏」
为什么你写的sql查询慢?为什么你建的索引常失效?通过本章内容,你将学会MySQL性能下降的原因,索引的简介,索引创建的原则,explain命令的使用,以及explain输出字段的意义。助你了解索引,分析索引,使用索引,从而写出更高性能的sql语句。还在等啥子?撸起袖子就是干!
全栈程序员站长
2022/07/12
8230
MySQL索引优化分析「建议收藏」
深入解析MySQL索引:本质、分类、选择及使用原则
索引,作为数据库中的一种核心数据结构,其本质在于通过改变数据结构来加快查询效率。可以将索引理解为数据库中的一种“目录”或“路标”,它帮助数据库系统快速定位到需要查询的数据行,从而大大提高数据检索的速度。索引的本质就是一张特殊的表,前面是索引的关键字,后面是这个关键字存放的地址。当数据量庞大时,查找索引比查找全部内容要快得多,而且索引表数据量小,非常节省计算机资源。
小马哥学JAVA
2025/02/20
2420
join查询可以⽆限叠加吗?MySQL对join查询有什么限制吗?
假设有一个复杂的业务系统,涉及到用户表(users)、订单表(orders)、商品表(products)、物流表(logistics)和支付表(payments)。如果编写如下的 JOIN 查询:
威哥爱编程
2025/02/24
2800
MySQL关于子查询经典面试题
面试官:“在MySQL中,进行多表查询时,你认为子查询(Subquery)和Join哪个效率更高?请详细说明你的理由,并提供一些具体的代码案例来支持你的观点。”
小白的大数据之旅
2025/01/24
890
MySQL关于子查询经典面试题
LeetCode 数据库专题
写一段SQL查询来展示每位用户的 唯一标识码(unique ID );如果某位员工没有唯一标识码,使用 null 填充即可。你可以以 任意 顺序返回结果表。查询结果的格式如下例所示:
wywwzjj
2023/05/09
1.5K0
LeetCode 数据库专题
SQL优化策略与实践:组合索引与最左前缀原则详解
SQL优化的方式有很多,它们可以帮助提高数据库查询的效率,减少资源的消耗。以下是一些常见的SQL优化方式:
阿珍
2025/04/21
1620
SQL优化策略与实践:组合索引与最左前缀原则详解
MySQL 分表查询
分表是一种数据库分割技术,用于将大表拆分成多个小表,以提高数据库的性能和可管理性。在MySQL中,可以使用多种方法进行分表,例如基于范围、哈希或列表等。下面将详细介绍MySQL如何分表以及分表后如何进行数据查询。
孟斯特
2023/10/19
1.9K0
MySQL 分表查询
【数据库设计和SQL基础语法】--连接与联接--多表查询与子查询基础(二)
子查询是指在一个查询语句内部嵌套另一个查询语句的过程。子查询可以嵌套在 SELECT、FROM、WHERE 或 HAVING 子句中,用于从数据库中检索数据或执行其他操作。子查询通常返回一个结果集,该结果集可以被包含它的主查询使用。 以下是子查询的一般概述:
喵叔
2023/12/21
5500
猫头虎分享:PostgreSQL 中分区表 PARTITION BY RANGE 的使用详解与数据迁移,索引创建细节详解
数据库作为现代技术的核心,如何高效地管理海量数据一直是技术团队关注的焦点。在 PostgreSQL 中,分区表(Partitioned Tables)为我们提供了极大的灵活性,尤其是通过 PARTITION BY RANGE 可以轻松实现按日期分区,大幅度提升查询性能和数据管理效率。今天,我们猫头虎技术团队将为大家详细剖析如何使用 PostgreSQL 的分区表及其背后的数据迁移与索引创建细节。
猫头虎
2024/09/17
3910
深入解析MySQL的EXPLAIN:指标详解与索引优化
MySQL 中的 EXPLAIN 语句是一个强大的工具,用于分析和优化 SQL 查询。通过 EXPLAIN,你可以了解 MySQL 查询优化器是如何执行你的查询的,以及是否有可以改进的地方。本文将详细讲解 EXPLAIN 输出的各项指标,并说明如何利用这些指标来优化索引结构和 SQL 语句。
每周聚焦
2024/12/17
2660
深入解析MySQL的EXPLAIN:指标详解与索引优化
【详解】Hive怎样写exist/in子句
在大数据处理领域,Hive 是一个广泛使用的数据仓库工具,它允许用户通过类似于 SQL 的查询语言来操作存储在 Hadoop 分布式文件系统中的数据。本文将探讨如何在 Hive 中使用 ​​EXISTS​​ 和 ​​IN​​ 子句进行数据查询,这两种方法是 SQL 中常见的用于检查子查询结果是否存在的条件表达式。
大盘鸡拌面
2025/01/17
1700
【Java 进阶篇】MySQL多表查询之外连接详解
在MySQL数据库中,多表查询是一种常见且强大的功能,允许您在多个表之间执行联接操作,从而检索、过滤和组合数据。在本篇博客中,我们将深入探讨多表查询的一种类型,即外连接(Outer Join),并详细介绍其语法、用途和示例。无论您是刚开始学习数据库还是想深入了解MySQL的查询功能,本文都将为您提供有价值的信息。
繁依Fanyi
2023/10/12
8170
【Java 进阶篇】MySQL多表查询之外连接详解
数据表索引应用之覆盖索引
覆盖索引是数据库索引的一种类型,它存储了执行查询所需的所有数据。因此,在索引覆盖的查询方式下,查询过程可以完全依赖索引,无需对数据表进行额外查询。
参谋带个长
2024/07/18
1730
MySQL不使用子查询的原因
这些案例展示了如何通过不同优化策略提升MySQL查询性能,特别是在处理子查询时。以下是一些额外的优化建议:
用户11397231
2025/01/24
2490
MySQL不使用子查询的原因
MySQL - 索引详解
索引依托于存储引擎的实现,因此,每种存储引擎的索引都不一定完全相同,并且每种存储引擎也不一定支持所有索引类型。所有存储引擎支持每个表至少16个索引,总索引长度至少为256字节。大多数存储引擎有更高的额限制。
battcn
2018/08/03
9780
MySQL - 索引详解
相关推荐
SQL索引失效原因分析与解决方案
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档