UDF 说明
用户可通过编写 UDF 函数,打包为 JAR 文件后,在数据湖计算定义为函数在查询分析中使用。目前数据湖计算 DLC 的 UDF 为 HIVE 格式,继承 org.apache.hadoop.hive.ql.exec.UDF,实现 evaluate 方法。
示例:简单数组 UDF 函数。
public class MyDiff extends UDF {public ArrayList<Integer> evaluate(ArrayList<Integer> input) {ArrayList<Integer> result = new ArrayList<Integer>();result.add(0, 0);for (int i = 1; i < input.size(); i++) {result.add(i, input.get(i) - input.get(i - 1));}return result;}}
pom 文件参考:
<dependencies><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.16</version><scope>test</scope></dependency><dependency><groupId>org.apache.hive</groupId><artifactId>hive-exec</artifactId><version>1.2.1</version></dependency></dependencies>
创建函数
注意:
如您创建的是 udaf/udtf 函数,需要在函数名相应加上 _udaf/_udtf 后缀。
1. 登录 数据湖计算控制台 ,选择服务地域。
2. 通过左侧导航菜单进入数据管理,选择需要创建的函数的数据库,如果需要创建新的数据库,可参见 数据库管理。
3. 单击函数进入函数管理页面。
4. 单击创建函数进行创建。
UDF 的程序包支持本地上传或选择 COS 路径(需具备 COS 相关权限),示例为选择 COS 路径创建。
函数类名包含“包信息”及“函数的执行类名”。
函数使用
1. 登录 数据湖计算控制台,选择服务地域。
2. 通过左侧导航菜单进入数据探索,选择计算引擎后即可使用 SQL 调用函数(注意:请选择独享引擎,共享引擎暂不支持自定义 udf 函数使用)。
Python UDF 开发
1. 购买集群,请选择 SuperSQL-S 1.0 版本(暂支持该版本)集群。
2. 开启集群 Python UDF 功能
默认情况下,Python UDF 功能未开,请在 SuperSQL 引擎页面,选择需要开启的引擎,单击参数配置,加入配置。输入
spark.sql.pyudf.enabled = true
后,单击确认,进行保存。SparkSQL 集群待集群重启后生效。
SparkBatch 集群下一个任务生效。
3. 注册 Python UDF 函数
DLC 支持完全社区兼容的 UDF 函数。
示例,请编辑如下文件
demo.py
:def name_sensitivity(name):if len(name) > 2:return name[0] + '*' * (len(name) - 2) + name[-1]elif len(name) == 2:return '*' + name[-1]else:return name
目前支持单 Python 文件。
只支持 import 系统自带模块。
请注意函数输入、输出类型。
请注意处理函数异常。
4. 在数据探索界面,使用开启了支持 Python UDF 的集群,并设置 Session 参数
eos.sql.processType=DIRECT
如果您不能编辑该参数,请 提交工单 联系我们。
如果您需要编辑变更 Python UDF 函数,SparkSQL 集群加载机制约有30s延迟,SparkBatch 集群下一个任务立即生效。
如果您需要删除当前 Python UDF 函数,SparkSQL 集群需要重启集群,SparkBatch 集群下一个任务立即生效。