BigQueryExecuteQueryOperator timeout problem (how to increase the timeout)

Stack Overflow user
Asked 2021-12-06 11:10:14
Answers: 1 | Views: 812 | Followers: 0 | Votes: 1

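I'm facing an issue in Airflow. In short: I'm trying to move from version 1.10.14 to 2.2.2. However, after replacing BigQueryOperator with BigQueryExecuteQueryOperator, I tried to increase the job timeout the way the BigQuery documentation describes, but I still see the issue.

I observe the issue whenever the job runs for more than 1 minute. With the old BigQuery operator on Airflow 1.10.14, using a Google Cloud connection with the same configuration, I never saw this kind of problem.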

    mobile_push_stat = BigQueryExecuteQueryOperator(
        task_id="mobile_push_stat",
        sql="/sql/updater/mobile_push_stat.sql",
        use_legacy_sql=False,
        api_resource_configs={"jobTimeoutMs": "3600000"},
        gcp_conn_id="bigquery_work",
    )

Logs

*** Reading local file: /srv/airflow/logs/ExtensionPushStat/mobile_push_stat/2021-12-05T06:00:00+00:00/29.log
[2021-12-06, 17:33:11 UTC] {taskinstance.py:1035} INFO - Dependencies all met for <TaskInstance: ExtensionPushStat.mobile_push_stat scheduled__2021-12-05T06:00:00+00:00 [queued]>
[2021-12-06, 17:33:11 UTC] {taskinstance.py:1035} INFO - Dependencies all met for <TaskInstance: ExtensionPushStat.mobile_push_stat scheduled__2021-12-05T06:00:00+00:00 [queued]>
[2021-12-06, 17:33:11 UTC] {taskinstance.py:1241} INFO - 
--------------------------------------------------------------------------------
[2021-12-06, 17:33:11 UTC] {taskinstance.py:1242} INFO - Starting attempt 29 of 29
[2021-12-06, 17:33:11 UTC] {taskinstance.py:1243} INFO - 
--------------------------------------------------------------------------------
[2021-12-06, 17:33:11 UTC] {taskinstance.py:1262} INFO - Executing <Task(BigQueryExecuteQueryOperator): mobile_push_stat> on 2021-12-05 06:00:00+00:00
[2021-12-06, 17:33:11 UTC] {base_task_runner.py:141} INFO - Running on host: airflow-vm-v2
[2021-12-06, 17:33:11 UTC] {base_task_runner.py:142} INFO - Running: ['airflow', 'tasks', 'run', 'ExtensionPushStat', 'mobile_push_stat', 'scheduled__2021-12-05T06:00:00+00:00', '--job-id', '3235', '--raw', '--subdir', 'DAGS_FOLDER/ExtensionPushStat.py', '--cfg-path', '/tmp/tmpk6p4qql0', '--error-file', '/tmp/tmpwz3z5o0y']
[2021-12-06, 17:33:12 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat [2021-12-06, 17:33:12 UTC] {dagbag.py:500} INFO - Filling up the DagBag from /srv/airflow/dags/ExtensionPushStat.py
[2021-12-06, 17:34:11 UTC] {local_task_job.py:206} WARNING - Recorded pid 137796 does not match the current pid 137817
[2021-12-06, 17:34:11 UTC] {process_utils.py:100} INFO - Sending Signals.SIGTERM to GPID 137817
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat Running <TaskInstance: ExtensionPushStat.mobile_push_stat scheduled__2021-12-05T06:00:00+00:00 [running]> on host airflow-vm-v2
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat Traceback (most recent call last):
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat   File "/srv/airflow/bin/airflow", line 8, in <module>
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat     sys.exit(main())
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat   File "/srv/airflow/lib/python3.8/site-packages/airflow/__main__.py", line 48, in main
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat     args.func(args)
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat   File "/srv/airflow/lib/python3.8/site-packages/airflow/cli/cli_parser.py", line 48, in command
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat     return func(*args, **kwargs)
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat   File "/srv/airflow/lib/python3.8/site-packages/airflow/utils/cli.py", line 92, in wrapper
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat     return f(*args, **kwargs)
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat   File "/srv/airflow/lib/python3.8/site-packages/airflow/cli/commands/task_command.py", line 292, in task_run
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat     _run_task_by_selected_method(args, dag, ti)
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat   File "/srv/airflow/lib/python3.8/site-packages/airflow/cli/commands/task_command.py", line 107, in _run_task_by_selected_method
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat     _run_raw_task(args, ti)
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat   File "/srv/airflow/lib/python3.8/site-packages/airflow/cli/commands/task_command.py", line 180, in _run_raw_task
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat     ti._run_raw_task(
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat   File "/srv/airflow/lib/python3.8/site-packages/airflow/utils/session.py", line 70, in wrapper
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat     return func(*args, session=session, **kwargs)
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat   File "/srv/airflow/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1332, in _run_raw_task
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat     self._execute_task_with_callbacks(context)
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat   File "/srv/airflow/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1458, in _execute_task_with_callbacks
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat     result = self._execute_task(context, self.task)
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat   File "/srv/airflow/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1514, in _execute_task
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat     result = execute_callable(context=context)
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat   File "/srv/airflow/lib/python3.8/site-packages/airflow/providers/google/cloud/operators/bigquery.py", line 693, in execute
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat     job_id = self.hook.run_query(
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat   File "/srv/airflow/lib/python3.8/site-packages/airflow/providers/google/cloud/hooks/bigquery.py", line 2325, in run_query
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat     job = self.insert_job(configuration=configuration, project_id=self.project_id)
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat   File "/srv/airflow/lib/python3.8/site-packages/airflow/providers/google/common/hooks/base_google.py", line 425, in inner_wrapper
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat     return func(self, *args, **kwargs)
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat   File "/srv/airflow/lib/python3.8/site-packages/airflow/providers/google/cloud/hooks/bigquery.py", line 1639, in insert_job
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat     job.result()
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat   File "/srv/airflow/lib/python3.8/site-packages/google/cloud/bigquery/job/query.py", line 1450, in result
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat     do_get_result()
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat   File "/srv/airflow/lib/python3.8/site-packages/google/cloud/bigquery/job/query.py", line 1440, in do_get_result
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat     super(QueryJob, self).result(retry=retry, timeout=timeout)
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat   File "/srv/airflow/lib/python3.8/site-packages/google/cloud/bigquery/job/base.py", line 727, in result
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat     return super(_AsyncJob, self).result(timeout=timeout, **kwargs)
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat   File "/srv/airflow/lib/python3.8/site-packages/google/api_core/future/polling.py", line 130, in result
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat     self._blocking_poll(timeout=timeout, **kwargs)
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat   File "/srv/airflow/lib/python3.8/site-packages/google/cloud/bigquery/job/query.py", line 1199, in _blocking_poll
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat     super(QueryJob, self)._blocking_poll(timeout=timeout, **kwargs)
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat   File "/srv/airflow/lib/python3.8/site-packages/google/api_core/future/polling.py", line 108, in _blocking_poll
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat     retry_(self._done_or_raise)(**kwargs)
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat   File "/srv/airflow/lib/python3.8/site-packages/google/api_core/retry.py", line 286, in retry_wrapped_func
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat     return retry_target(
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat   File "/srv/airflow/lib/python3.8/site-packages/google/api_core/retry.py", line 220, in retry_target
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat     time.sleep(sleep)
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat   File "/srv/airflow/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1413, in signal_handler
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat     raise AirflowException("Task received SIGTERM signal")
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat airflow.exceptions.AirflowException: Task received SIGTERM signal
[2021-12-06, 17:34:11 UTC] {process_utils.py:66} INFO - Process psutil.Process(pid=137817, status='terminated', exitcode=1, started='17:33:11') (137817) terminated with exit code 1

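Update

This issue turned out to be unrelated to BigQuery or the Google provider.

It came down to two configuration parameters:

killed_task_cleanup_time - for long-running jobs, this needs to be increased.
job_heartbeat_sec - when you want to clear the state of a failed task and rerun it from the UI, this needs to be increased.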
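
Both settings can be changed in airflow.cfg or through environment variables. Here is a minimal sketch, assuming Airflow's `AIRFLOW__{SECTION}__{KEY}` environment-variable convention, and assuming that `killed_task_cleanup_time` lives in the `[core]` section and `job_heartbeat_sec` in `[scheduler]` (check the configuration reference for your version). The helper names and the values 3600 and 60 are illustrative only and do not come from the original post:

```python
import os

# Assumed sections for the two options; verify against your Airflow version.
OVERRIDES = {
    ("core", "killed_task_cleanup_time"): 3600,  # illustrative value, seconds
    ("scheduler", "job_heartbeat_sec"): 60,      # illustrative value, seconds
}


def airflow_env_name(section: str, key: str) -> str:
    """Build the AIRFLOW__{SECTION}__{KEY} environment variable name."""
    return f"AIRFLOW__{section.upper()}__{key.upper()}"


def apply_overrides(overrides, environ=os.environ):
    """Write each override into the given environment mapping as a string."""
    for (section, key), value in overrides.items():
        environ[airflow_env_name(section, key)] = str(value)
    return environ


if __name__ == "__main__":
    # Use a scratch dict here so the real process environment is untouched.
    env = apply_overrides(OVERRIDES, environ={})
    for name, value in sorted(env.items()):
        print(f"export {name}={value}")
```

These variables only take effect if they are present in the environment of the scheduler and worker processes when those processes start.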


1 Answer

Stack Overflow user

Answered 2021-12-06 11:21:51

BigQueryExecuteQueryOperator is deprecated (see the source code). You should use BigQueryInsertJobOperator instead.

Based on the API parameters and the operator source code, the syntax should be:

from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

mobile_push_stat = BigQueryInsertJobOperator(
    task_id="mobile_push_stat",
    gcp_conn_id="bigquery_work",
    configuration={
        "query": "/sql/updater/mobile_push_stat.sql",
        "useLegacySql": False,
        "timeoutMs": 3600000,
    },
)

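Edit: I noticed a limitation on the timeout value. The value you chose, 3600000, may be too high.

You can request a longer timeout period in the timeoutMs field. However, the call is not guaranteed to wait for the specified timeout; it typically returns after around 200 seconds (200,000 milliseconds), even if the query is not complete.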
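
One caveat worth checking: the `timeoutMs` described in the quoted text is a parameter of the `jobs.query` request and controls how long a single call waits, whereas the `jobTimeoutMs` used in the question is a field of the job configuration. Below is a minimal sketch of the nested shape that the BigQuery `jobs.insert` REST resource documents for a query job, assuming `BigQueryInsertJobOperator` forwards `configuration` to that API as-is. `build_query_job_configuration` is a hypothetical helper and the SQL string is a placeholder:

```python
def build_query_job_configuration(sql: str, job_timeout_ms: int) -> dict:
    """Return a jobs.insert-style configuration dict for a standard-SQL query."""
    return {
        # Query-specific settings are nested under the "query" key.
        "query": {
            "query": sql,  # SQL text, not a file path
            "useLegacySql": False,
        },
        # Job-level timeout; the REST API represents int64 values as strings.
        "jobTimeoutMs": str(job_timeout_ms),
    }


if __name__ == "__main__":
    configuration = build_query_job_configuration("SELECT 1", 3_600_000)
    print(configuration)
    # Hypothetical use inside a DAG (requires the Google provider package):
    # BigQueryInsertJobOperator(
    #     task_id="mobile_push_stat",
    #     gcp_conn_id="bigquery_work",
    #     configuration=configuration,
    # )
```

To load the SQL from a file rather than an inline string, the Airflow documentation shows a Jinja include such as `"{% include 'example_bigquery_query.sql' %}"` as the query value; whether the path from the question resolves that way depends on the DAG's template search path, so treat that as something to verify.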

Votes: 1
Page content originally provided by Stack Overflow.
Original link:

https://stackoverflow.com/questions/70250656
