环境准备
1. Python 3.9及以上版本。
2. 安装
tencentcloud-dlc-connector。
pip install -i https://mirrors.tencent.com/pypi/simple/ tencentcloud-dlc-connector
使用示例
步骤一:连接到引擎
代码:
import tdlc_connectorimport datetimefrom tdlc_connector import constantsconn = tdlc_connector.connect(region="<REGION>",secret_id="<SECRET_ID>",secret_key="<SECRET_KEY>",token=None,endpoint=None,catalog=constants.Catalog.DATALAKECATALOG,engine="<ENGINE>",resource_group=None,engine_type=constants.EngineType.AUTO,result_style=constants.ResultStyles.LIST,download=False,mode=constants.Mode.LASY,database='',config={},callback=None,callback_events=None,)
参数说明:
参数 | 说明 |
region | 引擎所在地域,如 ap-nanjing,ap-beijing,ap-guangzhou,ap-shanghai, ap-chengdu,ap-chongqing,na-siliconvalley,ap-singapore, ap-hongkong |
secret_id | 腾讯云 SecretID |
secret_key | 腾讯云 SecretKey |
token | (可选)临时密钥 Token |
endpoint | (可选)连接服务节点 |
engine | 使用的引擎名称,例如 “test_python” |
resource_group | (可选)标准引擎资源组名称,为空时使用默认资源组(如需使用该特性,请将 connector 升级至 >= 1.2.1) |
engine_type | (可选)引擎类型:对应引擎名称的引擎类型(标准引擎可以通过该字段指定执行 SQL 任务或交互式 SQL 任务,指定为constants.EngineType.SPARK_BATCH,用于提交交互式 SQL,指定为 constants.EngineType.AUTO 或者其他,用于提交 SQL任务),默认值 constants.EngineType.AUTO 例如:AUTO、PRESTO、SPARK、SPARK_BATCH |
result_style | (可选)返回结果的格式,可选 LIST/DICT |
download | |
mode | (可选)模式。支持 ALL/LASY/STREAM |
database | (可选)默认数据库 |
config | (可选)提交到集群配置 |
callback | (可选)回调函数,函数签名 def cb(statement_id, status) |
callback_events | (可选)回调触发事件,同callback配合使用,详见回调机制说明 |
driver_size | (可选)Driver 节点大小,默认值 constants.PodSize.SMALL (仅SPARK_BATCH 集群有效) 可选值:SMALL、MEDIUM、LARGE、XLARGE、M_SMALL、M_MEDIUM、M_LARGE、M_XLARGE |
executor_size | (可选)Executor 节点大小,默认值 constants.PodSize.SMALL (仅SPARK_BATCH 集群有效)
可选值:SMALL、MEDIUM、LARGE、XLARGE、M_SMALL、M_MEDIUM、M_LARGE、M_XLARGE |
executor_num | (可选)Executor 节点数量, 默认值1 (仅SPARK_BATCH 集群有效) |
executor_max_num | (可选)Executor 最大节点数量,若不等于 executor_num, 则开启资源动态分配 (仅 SPARK_BATCH 集群有效) |
下载模式说明
:步骤二:执行 SQL
代码:
# 基本操作cursor = conn.cursor()count = cursor.execute("SELECT 1")print(cursor.fetchone()) # 读取一行数据for row in cursor.fetchall(): # 读取剩余多行数据print(row)# 使用参数 pyformat 格式cursor.execute("SELECT * FROM dummy WHERE date < %s", datetime.datetime.now())cursor.execute("SELECT * FROM dummy WHERE status in %s", (('SUCCESS', 'INIT', 'FAIL'),))cursor.execute("SELECT * FROM dummy WHERE date < %(date)s AND status = %(status)s", {'date': datetime.datetime.now(), 'status': 'SUCCESS'})# 使用BULK方式cursor.executemany("INSERT INTO dummy VALUES(%s, %s)", [('张三', 18), ('李四', 20)])
基本操作流程
上述代码流程如下:
1. 通过
conn.cursor()
创建了一个游标对象。2. 通过
cursor.execute("SELECT 1")
执行了一条 SQL 查询语句,并将结果赋值给变量count。
3. 通过调用
cursor.fetchone()
方法读取了一行数据,并将其打印出来。4. 在一个循环中调用了
cursor.fetchall()
方法来读取剩余的多行数据,并逐行打印出来。参数传递方式
支持两种不同的参数传递方式:
使用 pyformat 格式:在执行 SQL 语句时,可以使用%s作为占位符,并将实际参数以元组或字典形式传入。
使用 BULK 方式:通过调用
executemany()
方法可以批量插入多个记录到数据库表中。特性功能
回调机制使用说明
import tdlc_connectorimport datetimefrom tdlc_connector import constantsdef tdlc_connector_callback(statement_id, state):'''parmas: statement_id 任务idparams: state 任务状态,枚举值 参考constants.TaskStatus'''print(statement_id, state)conn = tdlc_connector.connect(region="<REGION>",secret_id="<SECRET_ID>",secret_key="<SECRET_KEY>",engine="<ENGINE>",engine_type=constants.EngineType.SPARK,result_style=constants.ResultStyles.LIST,callback=tdlc_connector_callback,callback_events=[constants.CallbackEvent.ON_INIT, constants.CallbackEvent.ON_SUCCESS])cursor = conn.cursor()cursor.execute("SELECT 1")cursor.fetchone()# callback 函数会在任务初始化 和 任务成功时调用
提交任务到作业集群
当前已支持提交任务到 Spark 作业集群,具体请参考如下示例。
from tdlc_connector import constantsconn = tdlc_connector.connect(region="<REGION>",secret_id="<SECRET_ID>",secret_key="<SECRET_KEY>",engine="<ENGINE>", # 请选择 spark 作业引擎result_style=constants.ResultStyles.LIST,driver_size=constants.PodSize.SMALL, # 选择 Driver 规格executor_size=constants.PodSize.SMALL, # 选择 Executor 规格executor_num=1, # 设置 Executor 数量executor_max_num=1, # 设置 Executor 最大数量, 如果 不等于{executor_num},则开启动态资源分配)
说明:
如需使用该特性,请将 connector 升级至 >= 1.1.0。
自动推断引擎类型
当前指定使用引擎后无需再指定引擎类型,connector 会自动推断,具体请参考如下示例。
from tdlc_connector import constantsconn = tdlc_connector.connect(region="<REGION>",secret_id="<SECRET_ID>",secret_key="<SECRET_KEY>",engine="<ENGINE>",engine_type=constants.EngineType.AUTO # 可设置成AUTO 或者 不传入该参数,驱动自动推断)
说明:
如需使用该特性,请将 connector 升级至 >= 1.1.0。
空值转换
当前结果集基于CSV格式存储,引擎默认将空值转换成空字符串,如果需要区分空值,请指定空值表示符, 例如 “\\1” ,引擎查询结果会将空值转换成 “\\1”,同时驱动会将 “\\1”字段转换成 None,具体请参考如下示例。
from tdlc_connector import constants, formatsformats.FORMAT_STRING_NULL = '\\1'conn = tdlc_connector.connect(region="<REGION>",secret_id="<SECRET_ID>",secret_key="<SECRET_KEY>",engine="<ENGINE>",result_style=constants.ResultStyles.LIST)
说明:
空值转换目前仅支持 SparkSQL集群,请将 connector 升级至 >= 1.1.3。