Basic concepts:
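Sample code: the following Apache Beam (Python SDK) pipeline reads newline-delimited JSON from input.json, parses each line into a dict, and appends the rows to a BigQuery table. It creates the table from the given schema if the table does not exist.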
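```python
import json

import apache_beam as beam
from apache_beam.io.gcp.bigquery import WriteToBigQuery
from apache_beam.options.pipeline_options import PipelineOptions

# Define the pipeline options (runner, project, temp_location, ...).
# Batch writes to BigQuery stage files in Cloud Storage, so a GCS
# temp_location (e.g. --temp_location=gs://your-bucket/tmp) is usually required.
options = PipelineOptions()

# Create the pipeline
p = beam.Pipeline(options=options)

# Define the processing logic
(p
 # ReadFromText emits one element per line, so input.json must be
 # newline-delimited JSON (one object per line).
 | 'ReadData' >> beam.io.ReadFromText('input.json')
 # Parse each line into a dict whose keys match the table schema.
 | 'ProcessData' >> beam.Map(json.loads)
 | 'WriteToBigQuery' >> WriteToBigQuery(
     table='your_project_id:your_dataset.your_table',
     schema='field1:STRING,field2:INTEGER',
     # Append to the table; create it from `schema` if it does not exist.
     write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
     create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
 )
)

# Run the pipeline and block until it finishes
result = p.run()
result.wait_until_finish()
```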
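The ProcessData step forwards whatever json.loads returns, without checking it against the table schema. Below is a minimal sketch of a stricter per-line function, assuming the same 'field1:STRING,field2:INTEGER' schema string. parse_schema and to_bq_row are hypothetical helpers, not part of the Beam API, and they handle only the STRING and INTEGER types used in this example. The sketch uses only the standard library, so you can test it without running a pipeline.

```python
import json

# Hypothetical mapping from BigQuery type names to Python converters.
# Only the types used in this example's schema are covered.
_CONVERTERS = {
    "STRING": str,
    "INTEGER": int,
}


def parse_schema(schema: str) -> list[tuple[str, str]]:
    """Split 'name:TYPE,name:TYPE' into [(name, TYPE), ...]."""
    fields = []
    for part in schema.split(","):
        name, field_type = part.strip().split(":")
        fields.append((name.strip(), field_type.strip().upper()))
    return fields


def to_bq_row(line: str, fields: list[tuple[str, str]]) -> dict:
    """Parse one NDJSON line and keep only the schema fields, converted to their types.

    Raises ValueError if the line is not valid JSON, a field is missing,
    or a value cannot be converted.
    """
    record = json.loads(line)  # json.JSONDecodeError is a subclass of ValueError
    row = {}
    for name, field_type in fields:
        if name not in record:
            raise ValueError(f"missing field: {name}")
        try:
            row[name] = _CONVERTERS[field_type](record[name])
        except (TypeError, ValueError) as exc:
            raise ValueError(f"bad value for {name}: {record[name]!r}") from exc
    return row


if __name__ == "__main__":
    fields = parse_schema("field1:STRING,field2:INTEGER")
    print(to_bq_row('{"field1": "a", "field2": "7", "extra": true}', fields))
    # prints {'field1': 'a', 'field2': 7}
```

In the pipeline, you could use it in place of json.loads as beam.Map(to_bq_row, fields), because beam.Map forwards extra positional arguments to the function. An exception raised inside beam.Map makes the pipeline fail instead of writing a malformed row. Whether that is preferable to skipping bad lines depends on the use case.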