I know I should include some code, but I don't have anything useful yet.
There is a ~300 MB file on my GCS that I am ultimately trying to import into BigQuery, but it has a somewhat broken data structure (I got it via mongoexport from MongoDB):

Invalid field name "$date". Fields must contain only letters, numbers, and underscores, start with a letter or underscore, and be at most 128 characters long.
So my approach for now is to read the source file from GCS line by line, process each line with Python, and upload the result to BigQuery.
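For the last step of that plan, the upload could use streaming inserts from the google-cloud-bigquery client library. This is only a rough sketch of the idea, not code from the question: the table reference is a placeholder, the name/zipcode fields are just the ones printed in the reader below, and it assumes the target table already exists with a matching schema.

import json
from google.cloud import bigquery

client = bigquery.Client()
# Placeholder table reference ('project.dataset.table'); substitute your own.
table = client.get_table('my-project.my_dataset.my_table')

batch = []
with open('schema_in_10.json') as f:  # the 100-line local sample used below
    for line in f:
        record = json.loads(line)
        # ...rename or drop the fields BigQuery rejects here (e.g. "$date")...
        batch.append({'name': record['name'], 'zipcode': record['zipcode']})
        if len(batch) == 500:
            client.insert_rows_json(table, batch)  # streaming insert; returns per-row errors
            batch = []
if batch:
    client.insert_rows_json(table, batch)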
Here is the simple reader I put together to test with a 100-line sample from the original huge file:
import json
from pprint import pprint
with open('schema_in_10.json') as f:
    for line in f:
        j_content = json.loads(line)
        # print(j_content['id'], j_content['city'], j_content['country'], j_content['state'], j_content['country_code'], j_content['smart_location'], j_content['address'], j_content['market'], j_content['neighborhood'])
        # // geo { lat, lng}'])
        print('------')
        pprint(j_content['is_location_exact'])
        pprint(j_content['zipcode'])
        pprint(j_content['name'])

Could you please help me with how to read or stream a huge JSON file line by line from Google Cloud Storage with Python 3?
Posted on 2018-10-09 11:11:40
Posted on 2020-01-27 18:06:38
smart_open now supports streaming GCS files.
from smart_open import open
# stream from GCS
with open('gs://my_bucket/my_file.txt') as fin:
    for line in fin:
        print(line)
# stream content *into* GCS (write mode):
with open('gs://my_bucket/my_file.txt', 'wb') as fout:
    fout.write(b'hello world')
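Tying this back to the question, the same streaming read can drive per-line JSON parsing. A minimal sketch, assuming a hypothetical object path gs://my_bucket/mongo_export.json and the name/zipcode fields from the question:

import json
from smart_open import open

# Stream the mongoexport output from GCS one line at a time
# (bucket and object name are placeholders).
with open('gs://my_bucket/mongo_export.json') as fin:
    for line in fin:
        record = json.loads(line)
        # ...clean up the record here before sending it on to BigQuery...
        print(record.get('name'), record.get('zipcode'))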
Posted on 2018-10-09 14:17:06

Here is a sample implementation of a solution with GCP Dataflow, corresponding to the first suggestion in the accepted answer. You need to implement the JSON correction in the function json_processor. You can run this code in a Datalab notebook.
# Datalab might need an older version of pip
# !pip install pip==9.0.3
import apache_beam as beam
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions
project_id = 'my-project'
bigquery_dataset_name = 'testdataset' # needs to exist
table_name = 'testtable'
bucket_name = 'my-bucket'
json_file_gcs_path = 'gs://path/to/my/file.json'
schema = "name:STRING,zipcode:STRING"
def json_processor(row):
    import json
    d = json.loads(row)
    return {'name': d['name'], 'zipcode': d['zipcode']}
options = beam.options.pipeline_options.PipelineOptions()
google_cloud_options = options.view_as(GoogleCloudOptions)
google_cloud_options.project = project_id
google_cloud_options.job_name = "myjob"
google_cloud_options.staging_location = 'gs://{}/binaries'.format(bucket_name)
google_cloud_options.temp_location = 'gs://{}/temp'.format(bucket_name)
options.view_as(StandardOptions).runner = 'DataflowRunner'
google_cloud_options.region = "europe-west1"
p = beam.Pipeline(options=options)
(p | "read_from_gcs" >> beam.io.ReadFromText(json_file_gcs_path)
| "json_processor" >> beam.Map(json_processor)
| "write_to_bq" >> beam.io.Write(beam.io.gcp.bigquery.BigQuerySink(table=table_name,
dataset=bigquery_dataset_name,
project=project_id,
schema=schema,
create_disposition='CREATE_IF_NEEDED',
write_disposition='WRITE_EMPTY'))
)
p.run()
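The json_processor above only keeps name and zipcode, so it never touches the offending "$date" key. If the date should be loaded as well, the correction could unwrap mongoexport's extended-JSON wrapper. This is a sketch under the assumption that the document carries a top-level date field (called created here, a hypothetical name) stored as {"$date": "..."}:

def json_processor(row):
    import json
    d = json.loads(row)
    # mongoexport wraps dates as {"$date": "..."}; unwrap it so the column
    # name is legal for BigQuery. The field name "created" is a placeholder.
    created = d.get('created')
    if isinstance(created, dict) and '$date' in created:
        created = created['$date']
    return {'name': d['name'], 'zipcode': d['zipcode'], 'created': created}

The schema string would then also need a matching column, e.g. "name:STRING,zipcode:STRING,created:TIMESTAMP".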
https://stackoverflow.com/questions/52718752