centos7
Airflow 2.0.2
Python 3.8.3
Mysql 5.7.29
redis 5.0.8
略(自行百度)
vim ~/.bashrc
# 添加一行环境变量
export AIRFLOW_HOME=/opt/airflow
source ~/.bashrc
export SLUGIFY_USES_TEXT_UNIDECODE=yes
# 可能会有一些报错请忽略,如果生成了配置文件,保证AIRFLOW_HOME目录下生成了.cfg及相关文件即证明本次执行成功
# 如果配置了python的环境变量直接执行`airflow`命令
# 没配置则在${PYTHON_HOME}/lib/python3.8/site-packages/airflow/bin目录下执行`./airflow`
pip install apache-airflow
pip install 'apache-airflow[mysql]'
pip install 'apache-airflow[celery]'
pip install 'apache-airflow[redis]'
pip install pymysql
# sqlalchemy链接
sql_alchemy_conn = mysql+pymysql://root:root@10.1.49.71:3306/airflow?charset=utf8
# 配置执行器
executor=CeleryExecutor
# 配置celery的broker_url
broker_url = redis://localhost:6379/0
# 配置元数据信息管理
result_backend = db+mysql://username:password@localhost:3306/airflow
# 创建用户组和用户
groupadd airflow
useradd airflow -g airflow
# 将 ${AIRFLOW_HOME} 目录的用户组修改为 airflow
cd /opt/
chgrp -R airflow airflow
初始化前请先创建 airflow 数据库,以免报错
airflow db init
# 用于登录airflow
airflow users create --lastname user --firstname admin --username admin --email admin_user@mail.com --role Admin --password admin
# 前台启动web服务
airflow webserver
# 后台启动web服务
airflow webserver -D
# 前台启动scheduler
airflow scheduler
# 后台启动scheduler
airflow scheduler -D
# worker主机只需用普通用户打开airflow worker
# 创建用户airflow
useradd airflow
# 对用户test设置密码
passwd airflow
# 在root用户下,改变airflow文件夹的权限,设为全开放
chmod -R 777 /opt/airflow
# 切换为普通用户,执行airflow worker命令就行
# 启动时发现普通用户读取的~/.bashrc文件 不一致 重新加入AIRFLOW_HOME 就可以了
# 如果在新建普通用户前配置好环境变量可能没有这个问题了 本人是在创建用户后修改了环境变量
# 使用celery执行worker
airflow celery worker
# 执行worker之前运行临时变量(临时的不能永久使用)
export C_FORCE_ROOT="true"
# 不需要切换用户
cd /usr/local/python3/bin/
# 前台启动worker服务
airflow celery worker
# 后台启动work服务
airflow celery worker -D
default_timezone = Asia/Shanghai
# Default arguments inherited by every task in the DAG.
default_args = {
    # Recipient address list for alert mails.
    # (Original had a stray extra quote: ['demo@qq.com''] — a syntax error.)
    'email': ['demo@qq.com'],
    # Send a mail when a task fails.
    'email_on_failure': True,
    # Do not send a mail when a task retries.
    'email_on_retry': False,
}
——————————————————————————————————————————————
airflow的全局变量中设置
在DAG中加入参数用于控制整个dag
# DAG-level settings: runs daily at 12:00, with at most one active
# DagRun at any time (max_active_runs=1).
dag = DAG("dag_name",  # plain string — the original f-string had no placeholders
          default_args=default_args,
          # Cron expression: every day at 12:00.
          schedule_interval="0 12 * * *",
          # Limit the whole DAG to a single concurrent run.
          max_active_runs=1
          )
在每个task中的Operator中设置参数
# A single PythonOperator task bound to the DAG above;
# task_concurrency=1 limits this task to one concurrent instance.
t3 = PythonOperator(
task_id='demo_task',
# NOTE(review): provide_context was deprecated in Airflow 2.x (the
# context is passed to the callable automatically) — confirm it is
# still accepted by PythonOperator in Airflow 2.0.2.
provide_context=True,
python_callable=demo_task,
task_concurrency=1,
dag=dag)
在使用airflow scheduler -D
命令时发现无法启动会报错
报错如下:
Traceback (most recent call last):
File "/opt/anaconda3/bin/airflow", line 37, in <module>
args.func(args)
File "/opt/anaconda3/lib/python3.8/site-packages/airflow/utils/cli.py", line 76, in wrapper
return f(*args, **kwargs)
File "/opt/anaconda3/lib/python3.8/site-packages/airflow/bin/cli.py", line 1213, in scheduler
job.run()
File "/opt/anaconda3/lib/python3.8/site-packages/airflow/jobs/base_job.py", line 212, in run
session.commit()
File "/opt/anaconda3/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 1036, in commit
self.transaction.commit()
File "/opt/anaconda3/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 503, in commit
self._prepare_impl()
File "/opt/anaconda3/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 482, in _prepare_impl
self.session.flush()
File "/opt/anaconda3/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 2496, in flush
self._flush(objects)
File "/opt/anaconda3/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 2637, in _flush
transaction.rollback(_capture_exception=True)
File "/opt/anaconda3/lib/python3.8/site-packages/sqlalchemy/util/langhelpers.py", line 68, in __exit__
compat.raise_(
File "/opt/anaconda3/lib/python3.8/site-packages/sqlalchemy/util/compat.py", line 178, in raise_
raise exception
File "/opt/anaconda3/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 2597, in _flush
flush_context.execute()
File "/opt/anaconda3/lib/python3.8/site-packages/sqlalchemy/orm/unitofwork.py", line 422, in execute
rec.execute(self)
File "/opt/anaconda3/lib/python3.8/site-packages/sqlalchemy/orm/unitofwork.py", line 586, in execute
persistence.save_obj(
File "/opt/anaconda3/lib/python3.8/site-packages/sqlalchemy/orm/persistence.py", line 205, in save_obj
for (
File "/opt/anaconda3/lib/python3.8/site-packages/sqlalchemy/orm/persistence.py", line 373, in _organize_states_for_save
for state, dict_, mapper, connection in _connections_for_states(
File "/opt/anaconda3/lib/python3.8/site-packages/sqlalchemy/orm/persistence.py", line 1602, in _connections_for_states
connection = uowtransaction.transaction.connection(base_mapper)
File "/opt/anaconda3/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 313, in connection
return self._connection_for_bind(bind, execution_options)
File "/opt/anaconda3/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 420, in _connection_for_bind
conn = self._parent._connection_for_bind(bind, execution_options)
File "/opt/anaconda3/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 432, in _connection_for_bind
conn = bind._contextual_connect()
File "/opt/anaconda3/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 2251, in _contextual_connect
self._wrap_pool_connect(self.pool.connect, None),
File "/opt/anaconda3/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 2285, in _wrap_pool_connect
return fn()
File "/opt/anaconda3/lib/python3.8/site-packages/sqlalchemy/pool/base.py", line 363, in connect
return _ConnectionFairy._checkout(self)
File "/opt/anaconda3/lib/python3.8/site-packages/sqlalchemy/pool/base.py", line 804, in _checkout
result = pool._dialect.do_ping(fairy.connection)
File "/opt/anaconda3/lib/python3.8/site-packages/sqlalchemy/dialects/mysql/mysqldb.py", line 138, in do_ping
dbapi_connection.ping(False)
File "/opt/anaconda3/lib/python3.8/site-packages/pymysql/connections.py", line 534, in ping
self._execute_command(COMMAND.COM_PING, "")
File "/opt/anaconda3/lib/python3.8/site-packages/pymysql/connections.py", line 763, in _execute_command
self._write_bytes(packet)
File "/opt/anaconda3/lib/python3.8/site-packages/pymysql/connections.py", line 703, in _write_bytes
self._sock.settimeout(self._write_timeout)
OSError: [Errno 9] Bad file descriptor
Exception ignored in: <function _ConnectionRecord.checkout.<locals>.<lambda> at 0x7f99dae28940>
Traceback (most recent call last):
File "/opt/anaconda3/lib/python3.8/site-packages/sqlalchemy/pool/base.py", line 503, in <lambda>
File "/opt/anaconda3/lib/python3.8/site-packages/sqlalchemy/pool/base.py", line 701, in _finalize_fairy
File "/opt/anaconda3/lib/python3.8/logging/__init__.py", line 1463, in error
File "/opt/anaconda3/lib/python3.8/logging/__init__.py", line 1577, in _log
File "/opt/anaconda3/lib/python3.8/logging/__init__.py", line 1587, in handle
File "/opt/anaconda3/lib/python3.8/logging/__init__.py", line 1649, in callHandlers
File "/opt/anaconda3/lib/python3.8/logging/__init__.py", line 950, in handle
File "/opt/anaconda3/lib/python3.8/logging/__init__.py", line 1182, in emit
File "/opt/anaconda3/lib/python3.8/logging/__init__.py", line 1172, in _open
NameError: name 'open' is not defined
# 这个问题似乎源于连接池(pool)的行为,可以通过在 airflow.cfg 中禁用连接池来绕过:sql_alchemy_pool_enabled = False
sql_alchemy_pool_enabled = False
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。