首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >在上传到BigQuery之前对GCS文件进行去Denormalize

在上传到BigQuery之前对GCS文件进行去Denormalize
EN

Stack Overflow用户
提问于 2020-01-07 22:35:44
回答 1查看 187关注 0票数 0

我已经在.Net核心中编写了一个Cloud,它从GCS位置读取文件,然后被认为是去denormalize (即为每一行添加更多信息以包含文本描述),然后将其写入BigQuery表。我有两个选择:

  1. 我的云运行API可以创建非规范化的CSV文件,并将它们写入另一个GCS位置。然后,另一个云运行API可以获取那些非规范化的CSV文件,并将它们直接写入BigQuery。
  2. 我的云运行API可以读取原始的CSV文件,在内存( filestream )中对它们进行去定向,然后以某种方式从内存中的filestream直接写入BigQuery表。

如果性能(速度)和成本(金钱)是我的目标,那么在这个场景中写BigQuery的最好方法是什么。这些文件在去甲基化前每个大约有10 are。每一行大约有1000个字符。去甲基化后,大约是原来的三倍。在成功加载到BigQuery中之后,我不需要保留非规范化文件。我关心的是性能,以及关于插入/写入的任何特定的BigQuery每日配额。我不认为有任何,除非您正在做DML语句,但纠正我,如果我错了。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2020-01-31 12:05:38

我会使用云函数,当您将文件上传到桶中时会触发这些函数。

它是如此普遍,谷歌有一个存储库,这是一个教程,专门为JSON文件使用云函数将云存储数据流到BigQuery中

然后,我将从以下位置修改示例main.py文件:

代码语言:javascript
运行
复制
def streaming(data, context):
    '''This function is executed whenever a file is added to Cloud Storage'''
    bucket_name = data['bucket']
    file_name = data['name']
    db_ref = DB.document(u'streaming_files/%s' % file_name)
    if _was_already_ingested(db_ref):
        _handle_duplication(db_ref)
    else:
        try:
            _insert_into_bigquery(bucket_name, file_name)
            _handle_success(db_ref)
        except Exception:
            _handle_error(db_ref)

对于这个接受CSV文件的

代码语言:javascript
运行
复制
import json
import csv
import logging
import os
import traceback
from datetime import datetime

from google.api_core import retry
from google.cloud import bigquery
from google.cloud import storage
import pytz



PROJECT_ID = os.getenv('GCP_PROJECT')
BQ_DATASET = 'fromCloudFunction'
BQ_TABLE = 'mytable'

CS = storage.Client()
BQ = bigquery.Client()


def streaming(data, context):
    '''This function is executed whenever a file is added to Cloud Storage'''
    bucket_name = data['bucket']
    file_name = data['name']

    newRows = postProcessing(bucket_name, file_name)

    # It is recommended that you save 
    # what you process for debugging reasons.
    destination_bucket = 'post-processed' # gs://post-processed/
    destination_name = file_name
    # saveRowsToBucket(newRows,destination_bucket,destination_name)
    rowsInsertIntoBigquery(newRows)



class BigQueryError(Exception):
    '''Exception raised whenever a BigQuery error happened''' 

    def __init__(self, errors):
        super().__init__(self._format(errors))
        self.errors = errors

    def _format(self, errors):
        err = []
        for error in errors:
            err.extend(error['errors'])
        return json.dumps(err)

def postProcessing(bucket_name, file_name):
    blob = CS.get_bucket(bucket_name).blob(file_name)
    my_str = blob.download_as_string().decode('utf-8')
    csv_reader = csv.DictReader(my_str.split('\n'))                                                                   
    newRows = []
    for row in csv_reader:
        modified_row = row # Add your logic
        newRows.append(modified_row)
    return newRows

def rowsInsertIntoBigquery(rows):
    table = BQ.dataset(BQ_DATASET).table(BQ_TABLE)
    errors = BQ.insert_rows_json(table,rows)
    if errors != []:
        raise BigQueryError(errors)

如果需要的话,仍然需要定义映射(行->newRow)和函数saveRowsToBucket

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/59637058

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档