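In a Dataflow pipeline, the error AttributeError: 'str' object has no attribute 'items' usually means that a step expected a dict but received a string. This is especially common when reading from Pub/Sub and writing to BigQuery: Pub/Sub message payloads typically carry JSON text, while the BigQuery write step expects each element to be a dict mapping column names to values.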
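The failure can be reproduced outside of any pipeline. The following is a minimal sketch in plain Python; the payload contents (`user`, `score`) are made up for illustration and are not taken from the original question.

```python
import json

# Hypothetical payload: JSON text as it might look after decoding a Pub/Sub message.
raw = '{"user": "alice", "score": 42}'

# Calling .items() on the raw string raises the error discussed here.
try:
    raw.items()
    error_message = None
except AttributeError as exc:
    error_message = str(exc)

print(error_message)

# After json.loads the value is a dict, so .items() works.
record = json.loads(raw)
pairs = sorted(record.items())
print(pairs)
```

Any step that iterates over a record's fields fails in the same way until the string has been parsed.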
The following example shows one way to fix the problem in a Dataflow pipeline:
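import json
import logging

import apache_beam as beam
from apache_beam.io import ReadFromPubSub, WriteToBigQuery
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions


class ParseMessage(beam.DoFn):
    """Decode a Pub/Sub payload (bytes) and parse it into a dict."""

    def process(self, element):
        try:
            # Parse the JSON string into a dict
            record = json.loads(element.decode('utf-8'))
            yield record
        except (UnicodeDecodeError, json.JSONDecodeError) as e:
            # Handle payloads that cannot be decoded or parsed
            logging.warning("Failed to parse JSON: %s", e)
            yield None


def run():
    options = PipelineOptions()
    # Reading from Pub/Sub requires a streaming pipeline
    options.view_as(StandardOptions).streaming = True
    with beam.Pipeline(options=options) as p:
        (p
         # The topic must be a full path: projects/<project>/topics/<topic>
         | 'Read from PubSub' >> ReadFromPubSub(
             topic='projects/your-project-id/topics/your-pubsub-topic')
         | 'Parse JSON' >> beam.ParDo(ParseMessage())
         # Drop the None placeholders emitted for unparseable messages
         | 'Filter valid records' >> beam.Filter(lambda x: x is not None)
         | 'Write to BigQuery' >> WriteToBigQuery(
             table='your-project-id:your_dataset.your_table',
             schema='your_schema',  # placeholder: replace with the table's real schema
             write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
             create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED))


if __name__ == '__main__':
    run()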
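Parsing each message into a dict before it reaches the BigQuery write step avoids the AttributeError: 'str' object has no attribute 'items' error and keeps the pipeline running reliably.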
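One case worth guarding against: `json.loads` returns a dict only when the top-level JSON value is an object. A payload such as `"hello"` or `[1, 2]` parses successfully but yields a `str` or `list`, so the same error can reappear downstream. The helper below is a sketch of a stricter parse step; the name `to_row` is hypothetical and not part of Apache Beam.

```python
import json
from typing import Optional, Union


def to_row(payload: Union[bytes, str]) -> Optional[dict]:
    """Return the payload as a dict, or None if it is not a JSON object.

    Hypothetical helper for illustration; not part of Apache Beam.
    """
    try:
        # Pub/Sub payloads arrive as bytes; accept str too for convenience.
        text = payload.decode("utf-8") if isinstance(payload, bytes) else payload
        value = json.loads(text)
    except (UnicodeDecodeError, json.JSONDecodeError):
        # Undecodable bytes or malformed JSON
        return None
    # Valid JSON that is not an object (string, list, number) is rejected too.
    return value if isinstance(value, dict) else None
```

Inside `ParseMessage.process`, the result of `to_row(element)` could be yielded only when it is not `None`, which would also make the separate filter step unnecessary.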