Python 访问

最近更新时间:2024-10-30 16:22:32

我的收藏
数据湖计算 DLC 提供符合 DBAPI 2.0 标准的工具。您可以通过 Python 连接到 DLC 的 Presto/Spark 引擎,可以方便地使用 SQL 的方式操作 DLC 库表。

环境准备

1. Python 3.9及以上版本。
2. 安装tencentcloud-dlc-connector。
pip install -i https://mirrors.tencent.com/pypi/simple/ tencentcloud-dlc-connector

使用示例

步骤一:连接到引擎

代码:
import tdlc_connector
import datetime
from tdlc_connector import constants

conn = tdlc_connector.connect(region="<REGION>",
secret_id="<SECRET_ID>",
secret_key="<SECRET_KEY>",
token=None,
endpoint=None,
catalog=constants.Catalog.DATALAKECATALOG,
engine="<ENGINE>",
resource_group=None,
engine_type=constants.EngineType.AUTO,
result_style=constants.ResultStyles.LIST,
download=False,
mode=constants.Mode.LASY,
database='',
config={},
callback=None,
callback_events=None,
)
参数说明:
参数
说明
region
引擎所在地域,如 ap-nanjing,ap-beijing,ap-guangzhou,ap-shanghai, ap-chengdu,ap-chongqing,na-siliconvalley,ap-singapore, ap-hongkong
secret_id
腾讯云 SecretID
secret_key
腾讯云 SecretKey
token
(可选)临时密钥 Token
endpoint
(可选)连接服务节点
engine
使用的引擎名称,例如 “test_python”
resource_group
(可选)标准引擎资源组名称,为空时使用默认资源组(如需使用该特性,请将 connector 升级至 >= 1.2.1)
engine_type
(可选)引擎类型:对应引擎名称的引擎类型(标准引擎可以通过该字段指定执行 SQL 任务或交互式 SQL 任务,指定为constants.EngineType.SPARK_BATCH,用于提交交互式 SQL,指定为 constants.EngineType.AUTO 或者其他,用于提交 SQL任务),默认值 constants.EngineType.AUTO
例如:AUTO、PRESTO、SPARK、SPARK_BATCH
result_style
(可选)返回结果的格式,可选 LIST/DICT
download
(可选)是否直接下载数据 True/False,详见下载模式说明
mode
(可选)模式。支持 ALL/LASY/STREAM
database
(可选)默认数据库
config
(可选)提交到集群配置
callback
(可选)回调函数,函数签名 def cb(statement_id, status)
callback_events
(可选)回调触发事件,同callback配合使用,详见回调机制说明
driver_size
(可选)Driver 节点大小,默认值 constants.PodSize.SMALL (仅SPARK_BATCH 集群有效)
可选值:SMALL、MEDIUM、LARGE、XLARGE、M_SMALL、M_MEDIUM、M_LARGE、M_XLARGE
executor_size
(可选)Executor 节点大小,默认值 constants.PodSize.SMALL (仅SPARK_BATCH 集群有效) 可选值:SMALL、MEDIUM、LARGE、XLARGE、M_SMALL、M_MEDIUM、M_LARGE、M_XLARGE
executor_num
(可选)Executor 节点数量, 默认值1 (仅SPARK_BATCH 集群有效)
executor_max_num
(可选)Executor 最大节点数量,若不等于 executor_num, 则开启资源动态分配 (仅 SPARK_BATCH 集群有效)

下载模式说明
序号
download
mode
说明
1
False
ALL
从接口获取所有数据,获取完成后才能 fetch 到数据
2
False
LASY
从接口获取数据,根据 fetch 的数据量延迟到服务端获取数据
3
False
STREAM
同 LASY 模式
4
True
ALL
从 COS 下载所有结果(需具备 COS 读权限)占用本地临时存储,数据量较大时推荐使用
5
True
LASY
从 COS 下载结果(需具备 COS 读权限),根据 fetch 数据量延迟下载文件
6
True
STREAM
实时的从 COS 读取结果流(需具备 COS 读权限),性能较慢,本地内存磁盘占用率极低

步骤二:执行 SQL

代码:
# 基本操作

cursor = conn.cursor()
count = cursor.execute("SELECT 1")
print(cursor.fetchone()) # 读取一行数据
for row in cursor.fetchall(): # 读取剩余多行数据
print(row)

# 使用参数 pyformat 格式

cursor.execute("SELECT * FROM dummy WHERE date < %s", datetime.datetime.now())
cursor.execute("SELECT * FROM dummy WHERE status in %s", (('SUCCESS', 'INIT', 'FAIL'),))
cursor.execute("SELECT * FROM dummy WHERE date < %(date)s AND status = %(status)s", {'date': datetime.datetime.now(), 'status': 'SUCCESS'})

# 使用BULK方式

cursor.executemany("INSERT INTO dummy VALUES(%s, %s)", [('张三', 18), ('李四', 20)])

基本操作流程

上述代码流程如下:
1. 通过conn.cursor()创建了一个游标对象。
2. 通过cursor.execute("SELECT 1")执行了一条 SQL 查询语句,并将结果赋值给变量count。
3. 通过调用cursor.fetchone()方法读取了一行数据,并将其打印出来。
4. 在一个循环中调用了cursor.fetchall()方法来读取剩余的多行数据,并逐行打印出来。

参数传递方式

支持两种不同的参数传递方式:
使用 pyformat 格式:在执行 SQL 语句时,可以使用%s作为占位符,并将实际参数以元组或字典形式传入。
使用 BULK 方式:通过调用executemany()方法可以批量插入多个记录到数据库表中。

特性功能

回调机制使用说明

import tdlc_connector
import datetime
from tdlc_connector import constants


def tdlc_connector_callback(statement_id, state):
'''
parmas: statement_id 任务id
params: state 任务状态,枚举值 参考constants.TaskStatus
'''
print(statement_id, state)


conn = tdlc_connector.connect(region="<REGION>",
secret_id="<SECRET_ID>",
secret_key="<SECRET_KEY>",
engine="<ENGINE>",
engine_type=constants.EngineType.SPARK,
result_style=constants.ResultStyles.LIST,
callback=tdlc_connector_callback,
callback_events=[constants.CallbackEvent.ON_INIT, constants.CallbackEvent.ON_SUCCESS]
)

cursor = conn.cursor()
cursor.execute("SELECT 1")
cursor.fetchone()

# callback 函数会在任务初始化 和 任务成功时调用


提交任务到作业集群

当前已支持提交任务到 Spark 作业集群,具体请参考如下示例。
from tdlc_connector import constants

conn = tdlc_connector.connect(region="<REGION>",
secret_id="<SECRET_ID>",
secret_key="<SECRET_KEY>",
engine="<ENGINE>", # 请选择 spark 作业引擎
result_style=constants.ResultStyles.LIST,
driver_size=constants.PodSize.SMALL, # 选择 Driver 规格
executor_size=constants.PodSize.SMALL, # 选择 Executor 规格
executor_num=1, # 设置 Executor 数量
executor_max_num=1, # 设置 Executor 最大数量, 如果 不等于{executor_num},则开启动态资源分配
)
说明:
如需使用该特性,请将 connector 升级至 >= 1.1.0。

自动推断引擎类型

当前指定使用引擎后无需再指定引擎类型,connector 会自动推断,具体请参考如下示例。
from tdlc_connector import constants

conn = tdlc_connector.connect(region="<REGION>",
secret_id="<SECRET_ID>",
secret_key="<SECRET_KEY>",
engine="<ENGINE>",
engine_type=constants.EngineType.AUTO # 可设置成AUTO 或者 不传入该参数,驱动自动推断
)
说明:
如需使用该特性,请将 connector 升级至 >= 1.1.0。

空值转换

当前结果集基于CSV格式存储,引擎默认将空值转换成空字符串,如果需要区分空值,请指定空值表示符, 例如 “\\1” ,引擎查询结果会将空值转换成 “\\1”,同时驱动会将 “\\1”字段转换成 None,具体请参考如下示例。
from tdlc_connector import constants, formats

formats.FORMAT_STRING_NULL = '\\1'

conn = tdlc_connector.connect(region="<REGION>",
secret_id="<SECRET_ID>",
secret_key="<SECRET_KEY>",
engine="<ENGINE>",
result_style=constants.ResultStyles.LIST
)
说明:
空值转换目前仅支持 SparkSQL集群,请将 connector 升级至 >= 1.1.3。