今日推荐:案例分享 | 大数据传输中的二进制加密方案
文章链接:https://cloud.tencent.com/developer/article/2465951
推荐原因:数据加密作为一种有效的保护手段,已经被广泛应用于各种数据传输场景中。在大数据时代,各种平台存储了大量的行为数据和用户信息,为了保证用户的隐私,数据安全作为数据治理的一部分,也被越来越多的人所提及。如何确保数据在传输过程中的机密性、成为了需要开发者需要考虑的难题。
之前写过很多Spark和PySpark的项目和技术操作文章,主流框架基本就是Spark了,但是在最近很多大数据的朋友反应除了公司自研大数据平台部署Spark进行大数据计算之外,还有相当一部分公司采用了大数据托管方式依托云平台管理。
很多第三方平台都有自己的大数据工具以及代码工具库,因此本系列内容就是主要写PyODPS这个目前算是主流常用的大数据类PySpark库,主要依托于阿里云的DataWorks,可以直接在大数据开发MaxCompute使用PyODPS,十分方便数据挖掘。因此本系列将延展MaxComputer来进行一系列数据挖掘项目,有需求的不要错过。
PyODPS是MaxCompute的Python版本的SDK,类似于Spark的PySpark。提供简单方便的Python编程,PyODPS提供了与ODPS命令行工具类似的功能,例如上传和下载文件、创建表、运行ODPS SQL查询等,同时提供了一些高级功能,如提交MapReduce任务、使用ODPS UDF等。
Python作为目前机器学习、AI模型开发的主流编程语言,提供了如NumPy、SciPy、Scikit-Learn、Matplotlib等丰富的科学计算、可视化库,用于数据科学和数据分析。同时支持TensorFlow、PyTorch、XGBoost、LightGBM等丰富的训练框架。
PyODPS于2015年正式发布,作为MaxCompute的Python SDK,支持通过Python接口对MaxCompute数据进行相关操作。经过多个版本的迭代发展,目前PyODPS已支持DataFrame框架,同时提供类似Pandas的语法,内置聚合、排序、去重等数据操作算子
PyODPS作为一个SDK,本身运行于各种客户端,包括PC、DataWorks(数据开发的PyODPS节点)或PAI Notebooks的运行环境。
和PySpark一样,如果只在本地单点执行,比如初始使用PyODPS的用户会试图把数据拉取到本地,处理完成后再上传到 MaxCompute上,很多时候这种方式是十分低效的,拉取数据到本地彻底丧失了MaxCompute的大规模并行能力的优势,也就是没有用到大数据集群的并行计算能力。按照官方的定义:
数据处理方式 | 描述 | 场景示例 |
---|---|---|
拉取到本地处理(不推荐,易OOM) | 例如DataWorks中的PyODPS节点,内置了PyODPS包以及必要的Python环境,是一个资源非常受限的客户端运行容器,并不使用MaxCompute计算资源,有较强的内存限制。 | PyODPS提供了 |
提交到MaxCompute分布式执行(推荐) | 推荐您合理利用PyODPS提供的分布式DataFrame功能,将主要的计算提交到MaxCompute分布式执行而不是在PyODPS客户端节点下载处理,这是正确使用PyODPS的关键。 | 推荐使用PyODPS DataFrame接口来完成数据处理。常见的需求,比如需要对每一行数据处理然后写回表,或者一行数据要拆成多行,都可以通过PyODPS DataFrame中的 |
用户需要通过分析每天产生的日志字符串来提取一些信息,有一个只有一列的表,类型是string,通过jieba分词可以将中文语句分词,然后再找到想要的关键词存储到信息表里。
低效处理代码demo:
import jieba
t = o.get_table('word_split')
out = []
with t.open_reader() as reader:
for r in reader:
words = list(jieba.cut(r[0]))
#
# 处理逻辑,产生出 processed_data
#
out.append(processed_data)
out_t = o.get_table('words')
with out_t.open_writer() as writer:
writer.write(out)
单机处理数据的思维,逐行读取数据,然后逐行处理数据,再逐行写入目标表。整个流程中,下载上传数据消耗了大量的时间,并且在执行脚本的机器上需要很大的内存处理所有的数据,特别是对于使用DataWorks节点的用户来说,很容易因为超过默认分配的内存值而导致OOM运行报错。
高效处理代码demo:
from odps.df import output
out_table = o.get_table('words')
df = o.get_table('word_split').to_df()
# 假定需要返回的字段及类型如下
out_names = ["word", "count"]
out_types = ["string", "int"]
@output(out_names, out_types)
def handle(row):
import jieba
words = list(jieba.cut(row[0]))
#
# 处理逻辑,产生出 processed_data
#
yield processed_data
df.apply(handle, axis=1).persist(out_table.name)
利用apply实现分布式执行:
jieba
),所以无需担心代码改动带来的成本,您可以几乎不需要改动主要逻辑就可以享受到MaxCompute的大规模计算能力。本系列文章均通过DataWorks使用PyODPS,本地使用代码效果和文章代码一致。可以进入DataWorks的数据开发页面创建PyODPS节点。PyODPS节点分为PyODPS 2和PyODPS 3两种:
可根据实际使用的Python语言版本创建PyODPS节点:
DataWorks的PyODPS节点中,options.tunnel.use_instance_tunnel默认设置为False,即默认情况下,最多读取一万条数据记录。如果需要读取更多数据记录,需全局开启instance tunnel,即需要手动将options.tunnel.use_instance_tunnel设置为True。
DataWorks的PyODPS节点中,将会包含一个全局变量odps
或者o
,即为ODPS入口。不需要手动定义ODPS入口,比如:
#查看表pyodps_iris是否存在
print(o.exist_table('pyodps_iris'))
让我们把目光聚焦于如何完成ODPS的一系列表操作上面,也就是了解如何使用达成最终效果的工具。
o.list_tables()
方法可以列出项目空间下的所有表:
list_tables(project=None, prefix=None, owner=None, schema=None, type=None, extended=False)
参数:
返回:
tables in this project, filtered by the optional prefix and owner.
返回类型:
generator
for table in o.list_tables():
print(table)
输出的信息包含表名<库名>.<表名>、类型<type>和表schema
可以通过prefix
参数只列举给定前缀的表:
for table in o.list_tables(prefix="table_prefix"):
print(table.name)
通过该方法获取的 Table 对象不会自动加载表名以外的属性,如果需要在列举表的同时读取这些属性,在 PyODPS 0.11.5 及后续版本中,可以为list_tables
添加extended=True
参数:
for table in o.list_tables(extended=True):
print(table.name, table.creation_time,table.schema)
如果需要按类型列举表,可以指定type
参数。不同类型的表列举方法如下:
managed_tables = list(o.list_tables(type="managed_table")) # 列举内置表
external_tables = list(o.list_tables(type="external_table")) # 列举外表
virtual_views = list(o.list_tables(type="virtual_view")) # 列举视图
materialized_views = list(o.list_tables(type="materialized_view")) # 列举物化视图
o.exist_table()
方法可以判断表是否存在。
print(o.exist_table('pyodps_iris'))
# 返回True表示表pyodps_iris存在。
入口对象的o.get_table()
方法可以获取表。
get_table(name, project=None, schema=None)
t = o.get_table('products')
print(t.schema) # 获取表pyodps_iris的schema
t = o.get_table('products')
print(t.schema.columns) # table
t = o.get_table('products')
print(t.schema['category']) # 获取表products的sepallength列信息
t = o.get_table('products')
print(t.schema['category'].comment) # 获取表products的category列的备注信息
t = o.get_table('products')
print(t.lifecycle) # 获取表pyodps_iris的生命周期
-1代表永久存在
t = o.get_table('products')
print(t.creation_time) # 获取表pyodps_iris的创建时间
t = o.get_table('products')
print(t.is_virtual_view) # 获取表products是否是虚拟视图,返回False,表示不是。
t = o.get_table('table_name', project='other_project')
其中other_project为所跨的项目,table_name为跨项目获取的表名称。
from odps.models import Schema, Column, Partition
columns = [
Column(name='num', type='bigint', comment='the column'),
Column(name='num2', type='double', comment='the column2'),
]
partitions = [Partition(name='pt', type='string', comment='the partition')]
schema = Schema(columns=columns, partitions=partitions)
初始化后,您可获取字段信息、分区信息等。
print(schema.columns)
print(schema.names)
print(schema.types)
使用Schema.from_lists()
方法。该方法更容易调用,但无法直接设置列和分区的注释。
from odps.models import Schema
schema = Schema.from_lists(['num', 'num2'], ['bigint', 'double'], ['pt'], ['string'])
print(schema.columns)
可以使用o.create_table()
方法创建表,使用方式有两种:使用表Schema方式、使用字段名和字段类型方式。同时创建表时表字段的数据类型有一定的限制条件,详情如下
使用表Schema创建表时,您需要先创建表的Schema,然后通过Schema创建表。
#创建表的schema
from odps.models import Schema
schema = Schema.from_lists(['num', 'num2'], ['bigint', 'double'], ['pt'], ['string'])
#通过schema创建表
table = o.create_table('my_new_table', schema)
#只有不存在表时,才创建表。
table = o.create_table('my_new_table', schema, if_not_exists=True)
#设置生命周期。
table = o.create_table('my_new_table', schema, lifecycle=7)
表创建完成后,您可以通过print(o.exist_table('my_new_table'))
验证表是否创建成功,返回True
表示表创建成功。
#创建分区表my_new_table,可传入(表字段列表,分区字段列表)。
table = o.create_table('my_new_table', ('num bigint, num2 double', 'pt string'), if_not_exists=True)
#创建非分区表my_new_table02。
table = o.create_table('my_new_table02', 'num bigint, num2 double', if_not_exists=True)
可通过exist_table来判定:
print(o.exist_table('my_new_table'))
未打开新数据类型开关时(默认关闭),创建表的数据类型只允许为BIGINT、DOUBLE、DECIMAL、STRING、DATETIME、BOOLEAN、MAP和ARRAY类型。如果需要创建TINYINT和STRUCT等新数据类型字段的表,可以打开options.sql.use_odps2_extension = True
开关,示例如下。
from odps import options
options.sql.use_odps2_extension = True
table = o.create_table('my_new_table', 'cat smallint, content struct<title:varchar(100), body:string>')
当一个表被其他程序更新,例如改变了Schema,可以调用reload()
方法同步表的更新。
#表schema变更
from odps.models import Schema
schema = Schema.from_lists(['num', 'num2'], ['bigint', 'double'], ['pt'], ['string'])
#通过reload()同步表更新
table = o.create_table('my_new_table', schema)
table.reload()
write_table()
方法写入数据。records = [[111, 1.0], # 此处可以是list。
[222, 2.0],
[333, 3.0],
[444, 4.0]]
o.write_table('my_new_table', records, partition='pt=test', create_partition=True) #创建pt=test分区并写入数据
write_table()
方法,MaxCompute都会在服务端生成一个文件。该操作耗时较长,同时文件过多会降低后续的查询效率。因此建议在使用此方法时,一次性写入多组数据,或者传入一个生成器对象。write_table()
方法向表中写入数据时会追加到原有数据中。PyODPS不提供覆盖数据的选项,如果需要覆盖数据,请手动清除原有数据。对于非分区表,需要调用table.truncate()
方法;对于分区表,需要删除分区后再建立新的分区。对表对象调用open_writer()
方法写入数据。
t = o.get_table('my_new_table')
with t.open_writer(partition='pt=test02', create_partition=True) as writer: #创建pt=test02分区并写入数据
records = [[1, 1.0], # 此处可以是List。
[2, 2.0],
[3, 3.0],
[4, 4.0]]
writer.write(records) # 这里Records可以是可迭代对象。
如果是多级分区表,写入示例如下。
t = o.get_table('test_table')
with t.open_writer(partition='pt1=test1,pt2=test2') as writer: # 多级分区写法。
records = [t.new_record([111, 'aaa', True]), # 也可以是Record对象。
t.new_record([222, 'bbb', False]),
t.new_record([333, 'ccc', True]),
t.new_record([444, '中文', False])]
writer.write(records)
每个进程写数据时共享同一个Session_ID,但是有不同的Block_ID。每个Block对应服务端的一个文件。主进程执行Commit,完成数据上传。
import random
from multiprocessing import Pool
from odps.tunnel import TableTunnel
def write_records(tunnel, table, session_id, block_id):
# 对使用指定的ID创建Session。
local_session = tunnel.create_upload_session(table.name, upload_id=session_id)
# 创建Writer时指定Block_ID。
with local_session.open_record_writer(block_id) as writer:
for i in range(5):
# 生成数据并写入对应Block。
record = table.new_record([random.randint(1, 100), random.random()])
writer.write(record)
if __name__ == '__main__':
N_WORKERS = 3
table = o.create_table('my_new_table', 'num bigint, num2 double', if_not_exists=True)
tunnel = TableTunnel(o)
upload_session = tunnel.create_upload_session(table.name)
# 每个进程使用同一个Session_ID。
session_id = upload_session.id
pool = Pool(processes=N_WORKERS)
futures = []
block_ids = []
for i in range(N_WORKERS):
futures.append(pool.apply_async(write_records, (tunnel, table, session_id, i)))
block_ids.append(i)
[f.get() for f in futures]
# 最后执行Commit,并指定所有Block。
upload_session.commit(block_ids)
Record表示表的一行记录,对表对象调用new_record()
方法即可创建一个新的Record。
t = o.get_table('test_table')
r = t.new_record(['val0', 'val1']) # 值的个数必须等于表Schema的字段数。
r2 = t.new_record() # 可以不传入值。
r2[0] = 'val0' # 通过偏移设置值。
r2['field1'] = 'val1' # 通过字段名设置值。
r2.field1 = 'val1' # 通过属性设置值。
print(record[0]) # 取第0个位置的值。
print(record['c_double_a']) # 通过字段取值。
print(record.c_double_a) # 通过属性取值。
print(record[0: 3]) # 切片操作。
print(record[0, 2, 3]) # 取多个位置的值。
print(record['c_int_a', 'c_double_a']) # 通过多个字段取值。
获取表数据的方法有多种,常用方法如下:
read_table()
方法。# 处理一条记录。
for record in o.read_table('my_new_table', partition='pt=test'):
print(record)
如果仅需要查看每个表最开始的小于1万条数据,可以对表对象调用head()
方法。
t = o.get_table('my_new_table')
# 处理每个Record对象。
for record in t.head(3):
print(record)
调用open_reader()
方法读取数据。
with
表达式的写法如下。from odps.models import Schema
t = o.get_table('my_new_table')
with t.open_reader(partition='pt=test') as reader:
count = reader.count
for record in reader: # 可以执行多次,直到将Count数量的Record读完,此处可以改造成并行操作。
print(record) # 处理一条记录,例如打印记录本身
不使用with
表达式的写法如下
reader = t.open_reader(partition='pt=test')
count = reader.count
for record in reader: # 可以执行多次,直到将Count数量的Record读完,此处可以改造成并行操作。
print(record) # 处理一条记录,例如打印记录本身
使用delete_table()
方法删除已经存在的表。
o.delete_table('my_table_name', if_exists=True) # 只有表存在时,才删除表。
t.drop() # Table对象存在时,直接调用Drop方法删除。
#创建表的schema
from odps.models import Schema
table = o.get_table('my_new_table')
if table.schema.partitions:
print('Table %s is partitioned.' % table.name)
from odps.models import Schema
table = o.get_table('my_new_table')
for partition in table.partitions: # 遍历所有分区
print(partition.name) # 具体的遍历步骤,这里是打印分区名
for partition in table.iterate_partitions(spec='pt=test'): # 遍历 pt=test 分区下的二级分区
print(partition.name) # 具体的遍历步骤,这里是打印分区名
for partition in table.iterate_partitions(spec='dt>20230119'): # 遍历 dt>20230119 分区下的二级分区
print(partition.name) # 具体的遍历步骤,这里是打印分区名
table = o.get_table('my_new_table')
table.exist_partition('pt=test,sub=2015')
#创建表的schema
from odps.models import Schema
table = o.get_table('my_new_table')
partition = table.get_partition('pt=test01')
print(partition.creation_time)
print(partition.size)
t = o.get_table('my_new_table')
t.delete_partition('pt=test', if_exists=True) # 自定if_exists参数,分区存在时才删除分区。
partition.drop() # 分区对象存在时,直接对分区对象调用Drop方法删除。
Tunnel是MaxCompute的数据通道,用户可以通过Tunnel向MaxCompute中上传或者下载数据。
from odps.tunnel import TableTunnel
table = o.get_table('my_table')
tunnel = TableTunnel(odps)
upload_session = tunnel.create_upload_session(table.name, partition_spec='pt=test')
with upload_session.open_record_writer(0) as writer:
record = table.new_record()
record[0] = 'test1'
record[1] = 'id1'
writer.write(record)
record = table.new_record(['test2', 'id2'])
writer.write(record)
# 需要在 with 代码块外 commit,否则数据未写入即 commit,会导致报错
upload_session.commit([0])
from odps.tunnel import TableTunnel
tunnel = TableTunnel(odps)
download_session = tunnel.create_download_session('my_table', partition_spec='pt=test')
# 处理每条记录。
with download_session.open_record_reader(0, download_session.count) as reader:
for record in reader:
print(record) # 具体的遍历步骤,这里是打印记录对象
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。