Airflow variables getting updated even if the DAG is not running(即使DAG未运行,气流变量也会更新)
本文介绍了即使DAG未运行,气流变量也会更新的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
我从气流变量中读取一个整数变量,然后在每次DAG运行时将该值加1,并再次将其设置为该变量。
但在下面的代码之后,每次刷新页面时,UI处的变量都会更改。 了解导致此类行为的原因
counter = Variable.get('counter')
s = BashOperator(
task_id='echo_start_variable',
bash_command='echo ' + counter,
dag=dag,
)
Variable.set("counter", int(counter) + 1)
sql_query = "SELECT * FROM UNNEST(SEQUENCE({start}, {end}))"
sql_query = sql_query.replace('{start}', start).replace('{end}', end)
submit_query = PythonOperator(
task_id='submit_athena_query',
python_callable=run_athena_query,
op_kwargs={'query': sql_query, 'db': 'db',
's3_output': 's3://s3-path/rohan/date=' + current_date + '/'},
dag=dag)
e = BashOperator(
task_id='echo_end_variable',
bash_command='echo ' + counter,
dag=dag,
)
s >> submit_query >> e
Airflow每30秒处理一次推荐答案文件(默认设置为)这意味着您所有顶级代码都是每30秒运行一次,因此
将导致变量计数器每30秒递增1。
在顶级代码中与变量交互是一种糟糕的做法(不管值的增加问题如何)。它每隔30秒打开一个到Metore数据库的连接,这可能会导致严重问题并使数据库不堪重负。
要获取变量的值,可以使用JJJA:
e = BashOperator(
task_id='echo_end_variable',
bash_command='echo {{ var.value.counter }}',
dag=dag,
)
这是使用变量的一种安全方式,因为只有在执行运算符时才会检索值。
如果要将变量的值增加1,则使用PythonOpeartor
:
def increase():
counter = Variable.get('counter')
Variable.set("counter", int(counter) + 1)
increase_op = PythonOperator(
task_id='increase_task',
python_callable=increase,
dag=dag)
只有在运算符运行时,才会执行可调用的python。
这篇关于即使DAG未运行,气流变量也会更新的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持编程学习网!
沃梦达教程
本文标题为:即使DAG未运行,气流变量也会更新
猜你喜欢
- 我如何透明地重定向一个Python导入? 2022-01-01
- CTR 中的 AES 如何用于 Python 和 PyCrypto? 2022-01-01
- 如何使用PYSPARK从Spark获得批次行 2022-01-01
- ";find_element_by_name(';name';)";和&QOOT;FIND_ELEMENT(BY NAME,';NAME';)";之间有什么区别? 2022-01-01
- 检查具有纬度和经度的地理点是否在 shapefile 中 2022-01-01
- 计算测试数量的Python单元测试 2022-01-01
- 使用公司代理使Python3.x Slack(松弛客户端) 2022-01-01
- 使用 Cython 将 Python 链接到共享库 2022-01-01
- 我如何卸载 PyTorch? 2022-01-01
- YouTube API v3 返回截断的观看记录 2022-01-01