我有一个光束管道在用数据流流道运行。它接受XML并输出JSON,然后将其存储在Bigquery表中。早些时候,我使用beam管道将换行符分隔的JSON写入GCS bucket,并从该文件创建一个BQ表,而无需对其进行任何更改(使用bigquery控制台)。作业成功运行,数据导入到BQ中,没有任何问题。
现在我已经修改了管道,以便将输出的JSON行直接写入BQ表。我正在使用apache的beam.io.WriteToBigQuery函数。集合是json对象,其中每一行都包含BQ的一个单独的对象(行)。
下面是进入WriteToBigQuery的示例输入:
{"order_no": "1111", "order_gross_price": "74.66", "order_tax": "0.00", "order_date": "2015-10-03T23:58:15.000Z", "shipping_net_price": "5.00", "merch_net_price": "69.66", "browser_id": "Mozilla"}
{"order_no": "2222", "order_gross_price": "27.82", "order_tax": "2.12", "order_date": "2015-10-04T00:04:20.000Z", "shipping_net_price": "5.00", "merch_net_price": "20.70", "browser_id": "Mozilla"}
我的部分代码如下所示:
from apache_beam.io.gcp import bigquery
from apache_beam.io.gcp.bigquery_tools import parse_table_schema_from_json
def run(argv = None):
options = PipelineOptions()
google_cloud_options = options.view_as(GoogleCloudOptions)
google_cloud_options.project = 'project_name'
google_cloud_options.job_name = 'jobid'
google_cloud_options.staging_location = 'gs://bucket/staging'
google_cloud_options.temp_location = 'gs://bucket/temp'
options.view_as(StandardOptions).runner = 'DataflowRunner'
p = beam.Pipeline(options=options)
table_spec = 'project:dataset.table'
data = (p
| 'Create' >> beam.Create([input_file_path])
| 'GetXML' >> beam.ParDo(ReadGCSfile())
#| 'Convert2JSON' >> beam.ParDo(converttojson())
| 'COvert2json' >> beam.Map(lambda orders: json.dumps(orders))
#| beam.Map(print_row)
)
project_id = "project1"
dataset_id = 'dataset'
table_id = 'table'
table_schema = ('browser_id:STRING, merch_net_price:FLOAT, order_no:INTEGER, order_tax:FLOAT, shipping_net_price:FLOAT, order_gross_price:FLOAT, order_date:TIMESTAMP')
data| 'write' >> beam.io.WriteToBigQuery(table = table_id,dataset=dataset_id,project=project_id,schema=table_schema,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
)
p.run()
运行此管道时出现的错误如下:
AttributeError: 'list' object has no attribute 'items' [while running 'write/StreamInsertRows/ParDo(BigQueryWriteFn)']
我认为这个错误是由于上一步的返回类型造成的,或者是与执行约束和批量加载到BigQuery有关。我想在mycase中进行批量加载。我试着使用Apache BEam documentation-Writing to a bigquery table给出的insert pipeline示例来说明管道工作。其中的数据格式如下:
quotes = p | beam.Create([
{'source': 'Mahatma Gandhi', 'quote': 'My life is my message.'},
{'source': 'Yoda', 'quote': "Do, or do not. There is no 'try'."},
])
如何修改我的管道,以便将我的示例中的字符串类型数据写入bigquery表。
发布于 2019-11-19 17:41:21
如果有人遇到同样的问题,就把它贴在这里。这是我忽略的一个非常微小的细节。beam.io.WriteToBigquery()接受字典作为输入。在接收器部分之前,我的pcollection返回单个元素或字符串的列表(取决于我尝试过的一些版本)。我刚刚在管道中添加了另一个步骤,使用json.loads将json字符串转换为python字典,然后将行成功加载到BQ中。
https://stackoverflow.com/questions/58824297
复制相似问题