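Facing an issue with Airflow. In short: I am trying to move from version 1.10.14 to 2.2.2. I replaced BigQueryOperator with BigQueryExecuteQueryOperator and increased the job timeout as described in the BigQuery documentation, but I still see the problem.
I see this issue whenever a job runs for more than 1 minute. With the old BigQueryOperator on Airflow 1.10.14, using a Google Cloud connection with the same configuration, I never saw this kind of issue.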
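from airflow.providers.google.cloud.operators.bigquery import BigQueryExecuteQueryOperator

mobile_push_stat = BigQueryExecuteQueryOperator(
    task_id="mobile_push_stat",
    sql="/sql/updater/mobile_push_stat.sql",
    use_legacy_sql=False,
    # Merged into the BigQuery job configuration; jobTimeoutMs is in milliseconds (1 hour)
    api_resource_configs={"jobTimeoutMs": "3600000"},
    gcp_conn_id="bigquery_work",
)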
Logs
*** Reading local file: /srv/airflow/logs/ExtensionPushStat/mobile_push_stat/2021-12-05T06:00:00+00:00/29.log
[2021-12-06, 17:33:11 UTC] {taskinstance.py:1035} INFO - Dependencies all met for <TaskInstance: ExtensionPushStat.mobile_push_stat scheduled__2021-12-05T06:00:00+00:00 [queued]>
[2021-12-06, 17:33:11 UTC] {taskinstance.py:1035} INFO - Dependencies all met for <TaskInstance: ExtensionPushStat.mobile_push_stat scheduled__2021-12-05T06:00:00+00:00 [queued]>
[2021-12-06, 17:33:11 UTC] {taskinstance.py:1241} INFO -
--------------------------------------------------------------------------------
[2021-12-06, 17:33:11 UTC] {taskinstance.py:1242} INFO - Starting attempt 29 of 29
[2021-12-06, 17:33:11 UTC] {taskinstance.py:1243} INFO -
--------------------------------------------------------------------------------
[2021-12-06, 17:33:11 UTC] {taskinstance.py:1262} INFO - Executing <Task(BigQueryExecuteQueryOperator): mobile_push_stat> on 2021-12-05 06:00:00+00:00
[2021-12-06, 17:33:11 UTC] {base_task_runner.py:141} INFO - Running on host: airflow-vm-v2
[2021-12-06, 17:33:11 UTC] {base_task_runner.py:142} INFO - Running: ['airflow', 'tasks', 'run', 'ExtensionPushStat', 'mobile_push_stat', 'scheduled__2021-12-05T06:00:00+00:00', '--job-id', '3235', '--raw', '--subdir', 'DAGS_FOLDER/ExtensionPushStat.py', '--cfg-path', '/tmp/tmpk6p4qql0', '--error-file', '/tmp/tmpwz3z5o0y']
[2021-12-06, 17:33:12 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat [2021-12-06, 17:33:12 UTC] {dagbag.py:500} INFO - Filling up the DagBag from /srv/airflow/dags/ExtensionPushStat.py
[2021-12-06, 17:34:11 UTC] {local_task_job.py:206} WARNING - Recorded pid 137796 does not match the current pid 137817
[2021-12-06, 17:34:11 UTC] {process_utils.py:100} INFO - Sending Signals.SIGTERM to GPID 137817
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat Running <TaskInstance: ExtensionPushStat.mobile_push_stat scheduled__2021-12-05T06:00:00+00:00 [running]> on host airflow-vm-v2
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat Traceback (most recent call last):
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat File "/srv/airflow/bin/airflow", line 8, in <module>
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat sys.exit(main())
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat File "/srv/airflow/lib/python3.8/site-packages/airflow/__main__.py", line 48, in main
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat args.func(args)
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat File "/srv/airflow/lib/python3.8/site-packages/airflow/cli/cli_parser.py", line 48, in command
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat return func(*args, **kwargs)
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat File "/srv/airflow/lib/python3.8/site-packages/airflow/utils/cli.py", line 92, in wrapper
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat return f(*args, **kwargs)
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat File "/srv/airflow/lib/python3.8/site-packages/airflow/cli/commands/task_command.py", line 292, in task_run
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat _run_task_by_selected_method(args, dag, ti)
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat File "/srv/airflow/lib/python3.8/site-packages/airflow/cli/commands/task_command.py", line 107, in _run_task_by_selected_method
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat _run_raw_task(args, ti)
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat File "/srv/airflow/lib/python3.8/site-packages/airflow/cli/commands/task_command.py", line 180, in _run_raw_task
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat ti._run_raw_task(
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat File "/srv/airflow/lib/python3.8/site-packages/airflow/utils/session.py", line 70, in wrapper
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat return func(*args, session=session, **kwargs)
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat File "/srv/airflow/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1332, in _run_raw_task
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat self._execute_task_with_callbacks(context)
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat File "/srv/airflow/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1458, in _execute_task_with_callbacks
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat result = self._execute_task(context, self.task)
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat File "/srv/airflow/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1514, in _execute_task
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat result = execute_callable(context=context)
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat File "/srv/airflow/lib/python3.8/site-packages/airflow/providers/google/cloud/operators/bigquery.py", line 693, in execute
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat job_id = self.hook.run_query(
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat File "/srv/airflow/lib/python3.8/site-packages/airflow/providers/google/cloud/hooks/bigquery.py", line 2325, in run_query
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat job = self.insert_job(configuration=configuration, project_id=self.project_id)
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat File "/srv/airflow/lib/python3.8/site-packages/airflow/providers/google/common/hooks/base_google.py", line 425, in inner_wrapper
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat return func(self, *args, **kwargs)
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat File "/srv/airflow/lib/python3.8/site-packages/airflow/providers/google/cloud/hooks/bigquery.py", line 1639, in insert_job
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat job.result()
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat File "/srv/airflow/lib/python3.8/site-packages/google/cloud/bigquery/job/query.py", line 1450, in result
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat do_get_result()
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat File "/srv/airflow/lib/python3.8/site-packages/google/cloud/bigquery/job/query.py", line 1440, in do_get_result
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat super(QueryJob, self).result(retry=retry, timeout=timeout)
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat File "/srv/airflow/lib/python3.8/site-packages/google/cloud/bigquery/job/base.py", line 727, in result
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat return super(_AsyncJob, self).result(timeout=timeout, **kwargs)
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat File "/srv/airflow/lib/python3.8/site-packages/google/api_core/future/polling.py", line 130, in result
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat self._blocking_poll(timeout=timeout, **kwargs)
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat File "/srv/airflow/lib/python3.8/site-packages/google/cloud/bigquery/job/query.py", line 1199, in _blocking_poll
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat super(QueryJob, self)._blocking_poll(timeout=timeout, **kwargs)
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat File "/srv/airflow/lib/python3.8/site-packages/google/api_core/future/polling.py", line 108, in _blocking_poll
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat retry_(self._done_or_raise)(**kwargs)
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat File "/srv/airflow/lib/python3.8/site-packages/google/api_core/retry.py", line 286, in retry_wrapped_func
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat return retry_target(
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat File "/srv/airflow/lib/python3.8/site-packages/google/api_core/retry.py", line 220, in retry_target
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat time.sleep(sleep)
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat File "/srv/airflow/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1413, in signal_handler
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat raise AirflowException("Task received SIGTERM signal")
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat airflow.exceptions.AirflowException: Task received SIGTERM signal
[2021-12-06, 17:34:11 UTC] {process_utils.py:66} INFO - Process psutil.Process(pid=137817, status='terminated', exitcode=1, started='17:33:11') (137817) terminated with exit code 1
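The log itself matches the "more than 1 minute" observation: SIGTERM is sent exactly 60 seconds after the attempt starts, immediately after the "Recorded pid ... does not match the current pid" warning. A minimal sketch that extracts those timestamps, assuming the "[YYYY-MM-DD, HH:MM:SS UTC]" prefix used in this log; the sample lines are copied from the log above, and the helper names are illustrative.

```python
import re
from datetime import datetime

# Three lines copied from the task log above.
LOG_LINES = [
    "[2021-12-06, 17:33:11 UTC] {taskinstance.py:1242} INFO - Starting attempt 29 of 29",
    "[2021-12-06, 17:34:11 UTC] {local_task_job.py:206} WARNING - Recorded pid 137796 does not match the current pid 137817",
    "[2021-12-06, 17:34:11 UTC] {process_utils.py:100} INFO - Sending Signals.SIGTERM to GPID 137817",
]

# Timestamp prefix as it appears in this particular log: "[YYYY-MM-DD, HH:MM:SS UTC]"
TS_RE = re.compile(r"^\[(\d{4}-\d{2}-\d{2}, \d{2}:\d{2}:\d{2}) UTC\]")


def timestamp(line):
    """Parse the leading timestamp of a log line."""
    match = TS_RE.match(line)
    if match is None:
        raise ValueError(f"no timestamp in: {line!r}")
    return datetime.strptime(match.group(1), "%Y-%m-%d, %H:%M:%S")


def seconds_between(lines, start_marker, end_marker):
    """Seconds from the first line containing start_marker to the first containing end_marker."""
    start = next(timestamp(line) for line in lines if start_marker in line)
    end = next(timestamp(line) for line in lines if end_marker in line)
    return (end - start).total_seconds()


if __name__ == "__main__":
    elapsed = seconds_between(LOG_LINES, "Starting attempt", "Sending Signals.SIGTERM")
    print(f"SIGTERM arrived {elapsed:.0f} s after the attempt started")
```

Running it over the full log file instead of the three sample lines works the same way, as long as every line keeps that timestamp prefix.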
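Update
This issue is not related to BigQuery or the Google provider.
It involves two configuration parameters:
killed_task_cleanup_time - in my case, this needs to be increased for long-running jobs.
job_heartbeat_sec - in my case, this needs to be increased when you want to clear the state of a failed task and rerun it from the UI.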
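Both options can be set in airflow.cfg or, equivalently, through environment variables named AIRFLOW__{SECTION}__{KEY}. A small sketch that prints such overrides; the sections reflect Airflow 2.2 ([core] for killed_task_cleanup_time, [scheduler] for job_heartbeat_sec), while the numeric values are illustrative assumptions, not values from the original post.

```python
# Illustrative values only (assumption); size them to your longest-running tasks.
OVERRIDES = {
    ("core", "killed_task_cleanup_time"): 600,  # Airflow default: 60 seconds
    ("scheduler", "job_heartbeat_sec"): 10,     # Airflow default: 5 seconds
}


def to_env_vars(overrides):
    """Map {(section, key): value} to Airflow's AIRFLOW__{SECTION}__{KEY} variables."""
    return {
        f"AIRFLOW__{section.upper()}__{key.upper()}": str(value)
        for (section, key), value in overrides.items()
    }


if __name__ == "__main__":
    # Emit shell export lines to set before starting the Airflow services.
    for name, value in to_env_vars(OVERRIDES).items():
        print(f"export {name}={value}")
```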
Posted on 2021-12-06 11:21:51
BigQueryExecuteQueryOperator is deprecated (see the source code). You should use BigQueryInsertJobOperator instead.
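from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

mobile_push_stat = BigQueryInsertJobOperator(
    task_id="mobile_push_stat",
    gcp_conn_id="bigquery_work",
    configuration={
        "query": {
            # "query" expects SQL text, not a file path; inline the file with a Jinja include
            # (configuration is a templated field, resolved like the original sql= path)
            "query": "{% include '/sql/updater/mobile_push_stat.sql' %}",
            "useLegacySql": False,
        },
        # Job-level timeout (JobConfiguration.jobTimeoutMs), in milliseconds
        "jobTimeoutMs": "3600000",
    },
)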
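The configuration argument follows the BigQuery REST JobConfiguration layout: query settings are nested under "query", while jobTimeoutMs sits at the top level. A minimal sketch of that shape; build_query_job_config is a hypothetical helper, not part of Airflow or the Google client, so the dict can be checked without an Airflow install.

```python
import json


def build_query_job_config(sql, job_timeout_ms=None, use_legacy_sql=False):
    """Build a BigQuery JobConfiguration dict (REST field names) for a query job."""
    config = {
        "query": {
            "query": sql,
            "useLegacySql": use_legacy_sql,
        }
    }
    if job_timeout_ms is not None:
        # jobTimeoutMs is an int64; the REST JSON encoding uses a string.
        config["jobTimeoutMs"] = str(int(job_timeout_ms))
    return config


if __name__ == "__main__":
    cfg = build_query_job_config("SELECT 1", job_timeout_ms=60 * 60 * 1000)
    print(json.dumps(cfg, indent=2))
```

The returned dict can be passed as configuration= to BigQueryInsertJobOperator; the SQL string may still be a Jinja include.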
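Edit: note the limitation of the timeout value. The 3600000 you chose may be too high. From the BigQuery API reference for the timeoutMs parameter:
"You can request a longer timeout period in the timeoutMs field. However, the call is not guaranteed to wait for the specified timeout; it typically returns after around 200 seconds (200,000 milliseconds), even if the query is not complete."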
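Because a single call may return after about 200 seconds with the query still running, a long overall wait is reached by polling repeatedly rather than through one long call. A minimal pure-Python sketch of that pattern, assuming a hypothetical poll(wait_s) callable that blocks for at most wait_s seconds; it works in seconds (3600000 ms = 3600 s, 200000 ms = 200 s) and is a simplified model, not the google-cloud-bigquery implementation.

```python
def wait_for_job(poll, overall_timeout_s, per_call_cap_s=200.0):
    """Poll until the job reports done or the overall time budget is used up.

    poll(wait_s) is hypothetical: it blocks for at most wait_s seconds and
    returns (done, seconds_actually_waited). A call may return with done=False,
    so the overall timeout is reached by looping.
    Returns (done, number_of_calls, total_seconds_waited).
    """
    elapsed = 0.0
    calls = 0
    while elapsed < overall_timeout_s:
        wait_s = min(per_call_cap_s, overall_timeout_s - elapsed)
        done, waited = poll(wait_s)
        calls += 1
        if done:
            return True, calls, elapsed + waited
        if waited <= 0:
            # Guard against a poll() that neither waits nor finishes (would loop forever).
            raise RuntimeError("poll() returned without waiting or finishing")
        elapsed += waited
    return False, calls, elapsed


def fake_job(runtime_s):
    """Simulated job that completes after runtime_s seconds of waiting."""
    state = {"t": 0.0}

    def poll(wait_s):
        step = min(wait_s, runtime_s - state["t"])
        state["t"] += step
        return state["t"] >= runtime_s, step

    return poll


if __name__ == "__main__":
    # 1000 s job, 3600 s budget: finishes on the 5th capped call -> (True, 5, 1000.0)
    print(wait_for_job(fake_job(1000.0), overall_timeout_s=3600.0))
    # 5000 s job, 3600 s budget: gives up after 18 calls of 200 s -> (False, 18, 3600.0)
    print(wait_for_job(fake_job(5000.0), overall_timeout_s=3600.0))
```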
https://stackoverflow.com/questions/70250656