
本文详细介绍了如何在Apache Airflow中利用PythonSensor实现复杂的日期条件判断,从而精确控制DAG的任务执行流程。通过一个“每月最后一个周二”的实际案例,文章演示了如何编写自定义Python函数来检查特定日期条件,并将其集成到PythonSensor中,以决定是否继续执行下游任务,从而实现灵活且健壮的自动化工作流。
在自动化工作流管理平台Apache Airf
low中,经常需要根据特定条件来决定是否执行一系列任务。这些条件可能包括外部文件是否存在、数据库状态、API响应,或者像本文将探讨的——基于日期的复杂逻辑。当我们需要在DAG运行前进行条件判断,并且如果条件不满足就停止后续任务时,Airflow的传感器(Sensor)机制提供了一个优雅且强大的解决方案。
Airflow提供了多种实现条件逻辑的Operator,例如BranchPythonOperator用于根据条件选择不同的分支路径,或者ShortCircuitOperator用于在条件不满足时跳过下游任务。然而,当我们需要在任务开始前“等待”某个条件满足,或者当条件不满足时直接“阻止”下游任务执行时,传感器(Sensor)是更合适的选择。
传感器是一种特殊的Operator,它会周期性地检查某个条件,直到条件满足才会标记为成功并触发下游任务。如果条件在设定的超时时间内仍未满足,传感器可以选择失败或跳过(soft_fail=True)下游任务。对于“如果条件不满足就停止所有任务”的需求,PythonSensor结合适当的配置可以完美实现。
本教程将以一个具体的场景为例:一个DAG只有在“每月最后一个周二”才需要运行其核心业务逻辑。如果不是,则不执行任何后续任务。
首先,我们需要一个Python函数来判断给定的日期是否是当月的最后一个周二。这个函数将作为PythonSensor的python_callable参数。
from datetime import datetime, timedelta
import calendar
def is_last_tuesday_of_month(**kwargs):
"""
检查Airflow的执行日期是否是当月的最后一个周二。
如果满足条件,返回True;否则返回False。
"""
# 从Airflow上下文中获取执行日期
# 'ds' 格式为 'YYYY-MM-DD'
ds = kwargs.get('ds')
if not ds:
# 如果在Airflow上下文之外测试,可以使用当前日期
print("警告: 'ds' 未在kwargs中找到。使用当前日期进行检查。")
execution_date = datetime.now().date()
else:
execution_date = datetime.strptime(ds, '%Y-%m-%d').date()
year = execution_date.year
month = execution_date.month
# 获取当前月的最后一天
_, num_days = calendar.monthrange(year, month)
last_day_of_month = datetime(year, month, num_days).date()
# 从当月最后一天开始向前迭代,查找最后一个周二
current_check_date = last_day_of_month
while current_check_date.month == month:
if current_check_date.weekday() == calendar.TUESDAY:
# 找到了当月的最后一个周二
is_match = (current_check_date == execution_date)
print(f"执行日期: {execution_date}, 当月最后一个周二: {current_check_date}. 匹配结果: {is_match}")
return is_match
current_check_date -= timedelta(days=1)
# 对于有效月份,通常不会执行到这里
print(f"在 {year} 年 {month} 月中未找到周二。")
return False
函数说明:
接下来,我们将上述函数集成到Airflow DAG中,使用PythonSensor。
挖错网
一款支持文本、图片、视频纠错和AIGC检测的内容审核校对平台。
185
查看详情
from airflow import DAG
from airflow.sensors.python import PythonSensor
from airflow.operators.dummy import DummyOperator
from datetime import datetime, timedelta
# 导入上面定义的日期检查函数
# from your_module import is_last_tuesday_of_month
# 或者直接将函数定义在DAG文件顶部
with DAG(
dag_id='conditional_last_tuesday_dag',
start_date=datetime(2025, 1, 1),
schedule_interval='@daily', # 每天运行,传感器会进行判断
catchup=False,
tags=['sensor', 'condition', 'date'],
) as dag:
# 传感器任务:检查是否为当月最后一个周二
check_last_tuesday = PythonSensor(
task_id='wait_for_last_tuesday',
python_callable=is_last_tuesday_of_month,
poke_interval=60 * 60 * 24, # 每天检查一次 (24小时)
timeout=60 * 60 * 24 * 31, # 最大等待时间,约一个月,防止无限等待
mode='poke', # 默认模式,周期性调用callable
soft_fail=True, # 如果超时,将任务标记为skipped而不是failed
# op_kwargs={'some_arg': 'value'} # 如果你的callable需要额外的固定参数,可以通过这里传递
)
# 核心业务任务,只有当传感器成功时才运行
T1 = DummyOperator(task_id='delete_gcs_files')
T2 = DummyOperator(task_id='run_sql_query_1')
T3 = DummyOperator(task_id='run_sql_query_2')
T4 = DummyOperator(task_id='run_sql_query_3_to_gcs')
T5 = DummyOperator(task_id='copy_ref_to_history_table')
# 定义任务依赖关系
# 传感器成功后,才执行T1及后续任务
check_last_tuesday >> T1
T1 >> T2
T2 >> T3
T3 >> T4
T4 >> T5
代码说明:
传感器超时与重试:
python_callable的幂等性:
日志记录:
替代方案(适用于不同场景):
资源消耗:
通过PythonSensor,Airflow为复杂的条件性工作流提供了强大的支持。结合自定义的Python逻辑,我们可以灵活地在DAG执行前进行各种检查,确保任务只在符合预设条件时才运行。这种机制极大地增强了Airflow DAG的鲁棒性和适应性,使其能够处理更加复杂的业务逻辑和调度需求。理解并熟练运用传感器,是构建高效、可靠Airflow工作流的关键一步。
以上就是在Apache Airflow中实现基于日期的条件性DAG执行的详细内容,更多请关注其它相关文章!
# 时间内
# 寻找福州seo渠道公司
# 网站如何建设客户
# 拼多多关键词优化排名
# 邵阳高端网站建设
# 太原网站推广费用多少钱一个月
# 山西手动网络营销推广
# 随州整合营销推广
# 秀洲区网站seo
# 丰都省心全网营销推广
# 网站url层级优化是怎么优化的
# 当我们
# 自定义
# python
# 设置为
# 这是
# 一个月
# 工作流
# 不满足
# 跳过
# 当月
# yy
# python函数
# ai
# apache
相关文章:
c++如何使用std::memory_order控制原子操作顺序_c++ C++11内存模型详解
PySpark中高效提取字符串右侧可变长度数字:使用regexp_extract
Fabric Mod开发:在1.19.3+版本中正确添加自定义物品并管理物品组
J*aScript中高效清空DOM列表元素:解决for循环中断与任务管理问题
QQ邮箱网页版入口页面 QQ邮箱在线登录入口官网
理解J*aScript Promise的微任务队列与执行顺序
win11 Snap Layouts怎么用 Win11窗口布局与分屏多任务高效指南【必学】
LINUX的perf命令入门_LINUX官方性能分析工具的使用与解读
妖精动漫免费平台 妖精动漫官网资源观看网址
word中如何让数字纵向排列_Word数字纵向排列方法
Bilibili动漫最新防封地址发布-Bilibili动漫2025年最稳正版入口推荐
使用PHP DOM解析器高效提取HTML中特定标题及其紧邻段落
Fabric模组开发:自定义物品与物品组的现代管理方法
小红书网页版入口链接分享 小红书官网直接进
不同用户不同价格! 索尼开启账户个性化定价测试
在哪找SublimeJ远程工具_SFTP插件配置教程
4399体育竞技小游戏_4399小游戏赛事入口
韩剧圈正版入口页面_韩剧圈官网登录链接
使用PHP从URL路径中提取倒数第二个片段
不会效仿卡普空!《铁拳》制作人澄清:不采取赛事付费|直播|
Spring Boot嵌入式服务器与J*a EE:功能支持深度解析
如何在J*a中实现统一对象行为接口_项目大型化时的接口规范化
深入理解Go语言中Map值与方法接收器的交互:为什么需要临时变量
钉钉视频会议画面卡顿如何解决 钉钉会议画面优化方法
汽水音乐网页版使用入口_汽水音乐电脑版播放指南
UC浏览器网页版登录入口官网 电脑版网址入口
J*aScript:在map操作中高效处理空数组
初次安装JDK时环境变量如何正确配置_J*A_HOME与PATH设置规则讲解
win11如何卸载Windows更新补丁 Win11解决更新导致系统不稳定的问题【修复】
C++如何操作注册表_Windows平台下C++读写注册表的API函数详解
Excel文件在线转换快速入口 Excel在线格式转换网站
谷歌google账号注册详细步骤 谷歌账号注册官方教程
TikTok网页版直接登录 TikTok网页端官方平台入口
抖音网页版怎么|直播|_抖音网页版开播操作指南
葱吃多了会怎样 葱吃多了会伤胃吗
汽水音乐在线版入口_汽水音乐网页播放手册
解决Rails应用中内容错位与Turbo警告:meta标签误用导致富文本渲染异常
Word2013如何插入视频和音频媒体_Word2013媒体插入的多媒体支持
星露谷物语官网入口 星露谷物语游戏官网入口
如何高效处理PHP中的Excel数据导入导出?PortPHP/Spreadsheet助你轻松搞定!
J*a递归快速排序中静态变量导致数据累积问题的解决方案
谷歌浏览器浏览体验优化_谷歌浏览器新版直连永久可用提示
Win10双系统截图高效法 截屏快捷键速记【技巧】
顺丰快件物流信息 官方网站查询入口
C++ map遍历方法大全_C++ map迭代器使用总结
钉钉视频会议声音异常如何处理 钉钉会议音频修复技巧
漫蛙MANWA漫画主页官方入口 漫蛙漫画最新在线阅读地址
Angular响应式表单:实现提交后表单及按钮的禁用与只读化
腾讯QQ邮箱登录入口_QQ邮箱官方网站使用地址
C++如何比较两个字符串_C++ string compare函数与操作符对比
*请认真填写需求信息,我们会在24小时内与您取得联系。