首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >用Python从逐行读取巨大的JSON

用Python从逐行读取巨大的JSON
EN

Stack Overflow用户
提问于 2018-10-09 10:24:46
回答 4查看 5.4K关注 0票数 3

我知道我应该有个密码,但我还没有什么有用的东西。

我的GCS 上有~300 GCS文件,最终我试图将它导入BigQuery,但是它有一些错误的数据结构(我是mongoexport从MongoDB获得的)

字段名"$date“无效。字段必须仅包含字母、数字和下划线,以字母或下划线开头,最长为128个字符。

因此,现在我的方法是从GCS逐行读取源文件,并使用python将每一行处理后的代码上传到BigQuery。

下面是简单的阅读器,我把它们放在一起用原始大文件中的100行示例进行测试:

代码语言:javascript
复制
import json
from pprint import pprint

with open('schema_in_10.json') as f:
    for line in f:
        j_content = json.loads(line)

        # print(j_content['id'], j_content['city'], j_content['country'], j_content['state'], j_content['country_code'], j_content['smart_location'], j_content['address'], j_content['market'], j_content['neighborhood'])
        # // geo { lat, lng}'])
        print('------')
        pprint(j_content['is_location_exact'])
        pprint(j_content['zipcode'])
        pprint(j_content['name'])

能否请您帮助我如何从GoogleCloudStoragewithPython3逐行读取或流一个巨大的JSON?

EN

回答 4

Stack Overflow用户

回答已采纳

发布于 2018-10-09 11:11:40

逐行读取它,然后尝试流到BigQuery,不会在本地机器上按300 TBH进行缩放,您将很难获得这个工作TBH。

有几个可伸缩的选项:

  1. 编写一个管道,从GCS读取您的文件(它将为您缩放并并行读取),更正字段名,然后写入BigQuery。见这里
  2. 直接将它加载到BigQuery中,使用CSV而不是JSON作为格式,并使用没有出现在数据中的分隔符。这将将每个记录加载到单个字符串列中,然后可以使用BigQuery的JSON函数提取所需的内容。见这里
票数 3
EN

Stack Overflow用户

发布于 2020-01-27 18:06:38

打开现在支持流式GCS文件。

代码语言:javascript
复制
from smart_open import open

# stream from GCS
with open('gs://my_bucket/my_file.txt') as fin:
    for line in fin:
        print(line)

# stream content *into* GCS (write mode):
with open('gs://my_bucket/my_file.txt', 'wb') as fout:
    fout.write(b'hello world')
票数 4
EN

Stack Overflow用户

发布于 2018-10-09 14:17:06

下面是GCP数据流中的一个解决方案的示例实现,该解决方案对应于接受答案中的第一个建议。您需要在函数json_processor.中实现json校正您可以在Datalab笔记本中运行此代码。

代码语言:javascript
复制
# Datalab might need an older version of pip
# !pip install pip==9.0.3

import apache_beam as beam
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions

project_id = 'my-project'
bigquery_dataset_name = 'testdataset' # needs to exist 
table_name = 'testtable'
bucket_name = 'my-bucket'
json_file_gcs_path = 'gs://path/to/my/file.json'
schema = "name:STRING,zipcode:STRING"

def json_processor(row):
    import json
    d = json.loads(row)
    return {'name': d['name'], 'zipcode': d['zipcode']}

options = beam.options.pipeline_options.PipelineOptions()
google_cloud_options = options.view_as(GoogleCloudOptions)
google_cloud_options.project = project_id
google_cloud_options.job_name = "myjob"
google_cloud_options.staging_location = 'gs://{}/binaries'.format(bucket_name)
google_cloud_options.temp_location = 'gs://{}/temp'.format(bucket_name)
options.view_as(StandardOptions).runner = 'DataflowRunner'
google_cloud_options.region = "europe-west1"

p = beam.Pipeline(options=options)

(p | "read_from_gcs" >> beam.io.ReadFromText(json_file_gcs_path)
   | "json_processor" >> beam.Map(json_processor)
   | "write_to_bq" >> beam.io.Write(beam.io.gcp.bigquery.BigQuerySink(table=table_name, 
                                                       dataset=bigquery_dataset_name, 
                                                       project=project_id, 
                                                       schema=schema, 
                                                       create_disposition='CREATE_IF_NEEDED',
                                                       write_disposition='WRITE_EMPTY'))
)

p.run()
票数 3
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/52718752

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档