Google App Engine (GAE)是Google提供的PaaS平台服务,允许开发者构建和托管Web应用程序。BigQuery是Google提供的企业级数据仓库解决方案。当在GAE中调用BigQuery API时,可能会遇到"超过截止日期"(Deadline Exceeded)错误。
from google.cloud import bigquery
client = bigquery.Client()
# 异步查询示例
query_job = client.query(
"""
SELECT name, COUNT(*) as name_count
FROM `bigquery-public-data.usa_names.usa_1910_current`
GROUP BY name
ORDER BY name_count DESC
LIMIT 10
"""
) # API请求默认是异步的
# 等待查询完成,可以设置超时时间
results = query_job.result(timeout=300) # 设置5分钟超时
GAE灵活环境有更长的请求超时限制(60分钟),适合长时间运行的BigQuery操作。
# 分页获取结果示例
query_job = client.query("SELECT * FROM large_table")
iterator = query_job.result(timeout=30)
rows = list(iterator) # 或者逐行处理
from google.cloud import bigquery
from google.api_core import client_options
# 自定义超时设置
client_options = client_options.ClientOptions(
api_endpoint="https://bigquery.googleapis.com",
timeout=300 # 5分钟超时
)
client = bigquery.Client(client_options=client_options)
将长时间运行的BigQuery查询放入GAE任务队列中异步处理:
from google.cloud import tasks_v2
import json
def create_task(project, queue, location, payload):
client = tasks_v2.CloudTasksClient()
parent = client.queue_path(project, location, queue)
task = {
'http_request': {
'http_method': tasks_v2.HttpMethod.POST,
'url': '/process_bigquery',
'headers': {
'Content-Type': 'application/json'
},
'body': json.dumps(payload).encode()
}
}
response = client.create_task(parent=parent, task=task)
return response
这种解决方案适用于:
通过以上方法,可以有效解决GAE调用BigQuery API时的超时问题,确保应用程序稳定运行。