大数据调度平台Airflow(五):Airflow使用

举报
Lansonli 发表于 2022/03/26 00:25:39 2022/03/26
【摘要】 目录 Airflow使用 一、Airflow调度Shell命令 1.首先我们需要创建一个python文件,导入需要的类库 2.实例化DAG 3、定义Task 4、设置task依赖关系 5、上传python配置脚本 6、重启Airflow 7、执行airflow 二、DAG调度触发时间 三、DAG catchup 参...

目录

Airflow使用

一、Airflow调度Shell命令

1.首先我们需要创建一个python文件,导入需要的类库

2.实例化DAG

3、定义Task

4、设置task依赖关系

5、上传python配置脚本

6、重启Airflow

7、执行airflow

二、DAG调度触发时间

三、DAG catchup 参数设置

四、DAG调度周期设置

五、DAG任务依赖设置

1、​​​​​​​DAG任务依赖设置一

2、​​​​​​​DAG任务依赖设置二

3、​​​​​​​DAG任务依赖设置三

4、​​​​​​​DAG任务依赖设置四

5、​​​​​​​DAG任务依赖设置五


Airflow使用

上文说到使用Airflow进行任务调度大体步骤如下:

  • 创建python文件,根据实际需要,使用不同的Operator
  • 在python文件不同的Operator中传入具体参数,定义一系列task
  • 在python文件中定义Task之间的关系,形成DAG
  • 将python文件上传执行,调度DAG,每个task会形成一个Instance
  • 使用命令行或者WEBUI进行查看和管理

以上python文件就是Airflow python脚本,使用代码方式指定DAG的结构

一、Airflow调度Shell命令

下面我们以调度执行shell命令为例,来讲解Airflow使用。

1.首先我们需要创建一个python文件,导入需要的类库


  
  1. # 导入 DAG 对象,后面需要实例化DAG对象
  2. from airflow import DAG
  3. # 导入BashOperator Operators,我们需要利用这个对象去执行流程
  4. from airflow.operators.bash import BashOperator

注意:以上代码可以在开发工具中创建,但是需要在使用的python3.7环境中导入安装Airflow包。

D:\ProgramData\Anaconda3\envs\python37\Scripts>pip install apache-airflow==2.1.3 -i https://pypi.tuna.tsinghua.edu.cn/simple
 

2.实例化DAG


  
  1. from datetime import datetime, timedelta
  2. # default_args中定义一些参数,在实例化DAG时可以使用,使用python dic 格式定义
  3. default_args = {
  4. 'owner': 'airflow', # 拥有者名称
  5. 'start_date': datetime(2022, 3, 25), # 第一次开始执行的时间,为 UTC 时间
  6. 'retries': 1, # 失败重试次数
  7. 'retry_delay': timedelta(minutes=5), # 失败重试间隔
  8. }
  9. dag = DAG(
  10. dag_id = 'myairflow_execute_bash', #DAG id ,必须完全由字母、数字、下划线组成
  11. default_args = default_args, #外部定义的 dic 格式的参数
  12. schedule_interval = timedelta(days=1) # 定义DAG运行的频率,可以配置天、周、小时、分钟、秒、毫秒
  13. )

 

注意:

  • 实例化DAG有三种方式

第一种方式:


  
  1. with DAG("my_dag_name") as dag:
  2. op=XXOperator(task_id="task")

第二种方式(以上采用这种方式):


  
  1. my_dag = DAG("my_dag_name")
  2. op = XXOperator(task_id="task", dag=my_dag)

第三种方式:


  
  1. @dag(start_date=days_ago(2))
  2. def generate_dag():
  3. op = XXOperator(task_id="task")
  4. dag = generate_dag()
  • baseoperator基础参数说明:

可以参照:

http://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/models/baseoperator/index.html#module-airflow.models.baseoperator查看baseopartor中更多参数。

  • DAG参数说明

可以参照:

http://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/models/dag/index.html

查看DAG参数说明,也可以直接在开发工具点击DAG进入源码看下对应参数有哪些。

 

3、定义Task

当实例化Operator时会生成Task任务,从一个Operator中实例化出来对象的过程被称为一个构造方法,每个构造方法中都有“task_id”充当任务的唯一标识符。

下面我们定义三个Operator,也就是三个Task,每个task_id 不能重复。


  
  1. # operator 支持多种类型, 这里使用 BashOperator
  2. first = BashOperator(
  3. task_id='first',
  4. bash_command='echo "run first task"',
  5. dag=dag
  6. )
  7. middle = BashOperator(
  8. task_id='middle',
  9. bash_command='echo "run middle task"',
  10. dag=dag
  11. )
  12. last = BashOperator(
  13. task_id='last',
  14. bash_command='echo "run last task"',
  15. dag=dag,
  16. retries=3
  17. )

注意:

 

4、设置task依赖关系


  
  1. #使用 set_upstream、set_downstream 设置依赖关系,不能出现环形链路,否则报错
  2. # middle.set_upstream(first) # middle会在first执行完成之后执行
  3. # last.set_upstream(middle) # last 会在 middle执行完成之后执行
  4. #也可以使用位移符来设置依赖关系
  5. first >> middle >>last # first 首先执行,middle次之,last最后
  6. # first >> [middle,last] # first首先执行,middle ,last并行执行

 

注意:当执行脚本时,如果在DAG中找到一条环形链路(例如:A->B->C-A)会引发异常。更多DAG task依赖关系可参照官网:http://airflow.apache.org/docs/apache-airflow/stable/concepts/dags.html#task-dependencies 

 

5、上传python配置脚本

到目前为止,python配置如下:


  
  1. # 导入 DAG 对象,后面需要实例化DAG对象
  2. from airflow import DAG
  3. # 导入BashOperator Operators,我们需要利用这个对象去执行流程
  4. from airflow.example_dags.example_bash_operator import dag
  5. from airflow.operators.bash import BashOperator
  6. from datetime import datetime, timedelta
  7. # default_args中定义一些参数,在实例化DAG时可以使用,使用python dic 格式定义
  8. default_args = {
  9. 'owner': 'airflow', # 拥有者名称
  10. 'start_date': datetime(2021, 9, 4), # 第一次开始执行的时间,为 UTC 时间
  11. 'retries': 1, # 失败重试次数
  12. 'retry_delay': timedelta(minutes=5), # 失败重试间隔
  13. }
  14. dag = DAG(
  15. dag_id = 'myairflow_execute_bash', #DAG id ,必须完全由字母、数字、下划线组成
  16. default_args = default_args, #外部定义的 dic 格式的参数
  17. schedule_interval = timedelta(days=1) # 定义DAG运行的频率,可以配置天、周、小时、分钟、秒、毫秒
  18. )
  19. # operator 支持多种类型, 这里使用 BashOperator
  20. first = BashOperator(
  21. task_id='first',
  22. bash_command='echo "run first task"',
  23. dag=dag
  24. )
  25. middle = BashOperator(
  26. task_id='middle',
  27. bash_command='echo "run middle task"',
  28. dag=dag
  29. )
  30. last = BashOperator(
  31. task_id='last',
  32. bash_command='echo "run last task"',
  33. dag=dag,
  34. retries=3
  35. )
  36. #使用 set_upstream、set_downstream 设置依赖关系,不能出现环形链路,否则报错
  37. # middle.set_upstream(first) # middle会在first执行完成之后执行
  38. # last.set_upstream(middle) # last 会在 middle执行完成之后执行
  39. #也可以使用位移符来设置依赖关系
  40. first >> middle >>last # first 首先执行,middle次之,last最后
  41. # first >> [middle,last] # first首先执行,middle ,last并行执行

 将以上python配置文件上传到$AIRFLOW_HOME/dags目录下,默认$AIRFLOW_HOME为安装节点的“/root/airflow”目录,当前目录下的dags目录需要手动创建。

6、重启Airflow

“ps aux|grep webserver”和“ps aux|grep scheduler”找到对应的airflow进程杀掉,重新启动Airflow。重启之后,可以在airflow webui看到对应的DAG ID ”myairflow_execute_bash”。

7、执行airflow

按照如下步骤执行DAG,首先打开工作流,然后“Trigger DAG”执行,随后可以看到任务执行成功。

查看task执行日志:

 

二、DAG调度触发时间

在Airflow中,调度程序会根据DAG文件中指定的“start_date”和“schedule_interval”来运行DAG。特别需要注意的是Airflow计划程序在计划时间段的末尾触发执行DAG,而不是在开始时刻触发DAG,例如:


  
  1. default_args = {
  2. 'owner': 'airflow', # 拥有者名称
  3. 'start_date': datetime(2022, 3, 25), # 第一次开始执行的时间,为 UTC 时间
  4. 'retries': 1, # 失败重试次数
  5. 'retry_delay': timedelta(minutes=5), # 失败重试间隔
  6. }
  7. dag = DAG(
  8. dag_id = 'myairflow_execute_bash', #DAG id ,必须完全由字母、数字、下划线组成
  9. default_args = default_args, #外部定义的 dic 格式的参数
  10. schedule_interval = timedelta(days=1) # 定义DAG运行的频率,可以配置天、周、小时、分钟、秒、毫秒
  11. )

以上配置的DAG是从世界标准时间2022年3月24号开始调度,每隔1天执行一次,这个DAG的具体运行时间如下图: 

自动调度DAG 执行日期

自动调度DAG实际执行触发时间

2022-03-24,00:00:00+00:00

2022-03-25,00:00:00+00:00

2022-03-25,00:00:00+00:00

2022-03-26,00:00:00+00:00

2022-03-26,00:00:00+00:00

2022-03-27,00:00:00+00:00

2022-03-27,00:00:00+00:00

2022-03-28,00:00:00+00:00

2022-03-28,00:00:00+00:00

2022-03-29,00:00:00+00:00

... ...

... ...

以上表格中以第一条数据为例解释,Airflow正常调度是每天00:00:00 ,假设当天日期为2022-03-24,正常我们认为只要时间到了2022-03-24 00:00:00 就会执行,改调度时间所处于的调度周期为2022-03-24 00:00:00 ~ 2022-03-25 00:00:00 ,在Airflow中实际上是在调度周期末端触发执行,也就是说2022-03-24 00:00:00 自动触发执行时刻为 2022-03-25 00:00:00。 

如下图,在airflow中,“execution_date”不是实际运行时间,而是其计划周期的开始时间戳。例如:execution_date 是2021-09-04 00:00:00 的DAG 自动调度运行的实际时间为2021-09-05 00:00:00。当然除了自动调度外,我们还可以手动触发执行DAG执行,要判断DAG运行时计划调度(自动调度)还是手动触发,可以查看“Run Type”。

 

三、DAG catchup 参数设置

 

在Airflow的工作计划中,一个重要的概念就是catchup(追赶),在实现DAG具体逻辑后,如果将catchup设置为True(默认就为True),Airflow将“回填”所有过去的DAG run,如果将catchup设置为False,Airflow将从最新的DAG run时刻前一时刻开始执行 DAG run,忽略之前所有的记录。

例如:现在某个DAG每隔1分钟执行一次,调度开始时间为2001-01-01 ,当前日期为2021-10-01 15:23:21,如果catchup设置为True,那么DAG将从2001-01-01 00:00:00 开始每分钟都会运行当前DAG。如果catchup 设置为False,那么DAG将从2021-10-01 15:22:20(当前2021-10-01 15:23:21前一时刻)开始执行DAG run。

举例:有first ,second,third三个shell命令任务,按照顺序调度,每隔1分钟执行一次,首次执行时间为2000-01-01。

设置catchup 为True(默认),DAG python配置如下:


  
  1. from airflow import DAG
  2. from airflow.operators.bash import BashOperator
  3. from datetime import datetime, timedelta
  4. default_args = {
  5. 'owner': 'airflow', # 拥有者名称
  6. 'start_date': datetime(2001, 1, 1), # 第一次开始执行的时间,为 UTC 时间
  7. 'retries': 1, # 失败重试次数
  8. 'retry_delay': timedelta(minutes=5), # 失败重试间隔
  9. }
  10. dag = DAG(
  11. dag_id = 'catchup_test1 ', #DAG id ,必须完全由字母、数字、下划线组成
  12. default_args = default_args, #外部定义的 dic 格式的参数
  13. schedule_interval = timedelta(minutes=1), # 定义DAG运行的频率,可以配置天、周、小时、分钟、秒、毫秒
  14. catchup=True # 执行DAG时,将开始时间到目前所有该执行的任务都执行,默认为True
  15. )
  16. first = BashOperator(
  17. task_id='first',
  18. bash_command='echo "run first task"',
  19. dag=dag
  20. )
  21. middle = BashOperator(
  22. task_id='second',
  23. bash_command='echo "run second task"',
  24. dag=dag
  25. )
  26. last = BashOperator(
  27. task_id='third',
  28. bash_command='echo "run third task"',
  29. dag=dag,
  30. retries=3
  31. )
  32. first >> middle >>last

上传python配置文件到$AIRFLOW_HOME/dags下,重启airflow,DAG执行调度如下:

 

 

设置catchup 为False,DAG python配置如下:


  
  1. from airflow import DAG
  2. from airflow.operators.bash import BashOperator
  3. from datetime import datetime, timedelta
  4. default_args = {
  5. 'owner': 'airflow', # 拥有者名称
  6. 'start_date': datetime(2001, 1, 1), # 第一次开始执行的时间,为 UTC 时间
  7. 'retries': 1, # 失败重试次数
  8. 'retry_delay': timedelta(minutes=5), # 失败重试间隔
  9. }
  10. dag = DAG(
  11. dag_id = 'catchup_test2', #DAG id ,必须完全由字母、数字、下划线组成
  12. default_args = default_args, #外部定义的 dic 格式的参数
  13. schedule_interval = timedelta(minutes=1), # 定义DAG运行的频率,可以配置天、周、小时、分钟、秒、毫秒
  14. catchup=False # 执行DAG时,将开始时间到目前所有该执行的任务都执行,默认为True
  15. )
  16. first = BashOperator(
  17. task_id='first',
  18. bash_command='echo "run first task"',
  19. dag=dag
  20. )
  21. middle = BashOperator(
  22. task_id='second',
  23. bash_command='echo "run second task"',
  24. dag=dag
  25. )
  26. last = BashOperator(
  27. task_id='third',
  28. bash_command='echo "run third task"',
  29. dag=dag,
  30. retries=3
  31. )
  32. first >> middle >>last

上传python配置文件到$AIRFLOW_HOME/dags下,重启airflow,DAG执行调度如下:

 

有两种方式在Airflow中配置catchup:

  • 全局配置

在airflow配置文件airflow.cfg的scheduler部分下,设置catchup_by_default=True(默认)或False,这个设置是全局性的设置。

  • DAG文件配置

在python代码配置中设置DAG对象的参数:dag.catchup=True或False。


  
  1. dag = DAG(
  2. dag_id = 'myairflow_execute_bash',
  3. default_args = default_args,
  4. catchup=False,
  5. schedule_interval = timedelta(days=1))

 

四、DAG调度周期设置

每个DAG可以有或者没有调度执行周期,如果有调度周期,我们可以在python代码DAG配置中设置“schedule_interval”参数来指定调度DAG周期,可以通过以下三种方式来设置。

  • 预置的Cron调度

Airflow预置了一些Cron调度周期,可以参照:

DAG Runs — Airflow Documentation,如下图:

 

在python配置文件中使用如下:


  
  1. default_args = {
  2. 'owner': 'airflow', # 拥有者名称
  3. 'start_date': datetime(2021, 9, 4), # 第一次开始执行的时间,为 UTC 时间
  4. 'retries': 1, # 失败重试次数
  5. 'retry_delay': timedelta(minutes=5), # 失败重试间隔
  6. }
  7. dag = DAG(
  8. dag_id = 'cron_test', #DAG id ,必须完全由字母、数字、下划线组成
  9. default_args = default_args, #外部定义的 dic 格式的参数
  10. schedule_interval = '@daily' # 使用预置的Cron调度,每天0点0分调度

 

  • Cron

这种方式就是写Linux系统的crontab定时任务命令,可以在https://crontab.guru/网站先生成对应的定时调度命令,其格式如下:


  
  1. minute hour day month week
  2. minute:表示分钟,可以从0~59之间的任意整数。
  3. hour:表示小时,可以是从023之间的任意整数。
  4. day:表示日期,可以是131之间的任何整数。
  5. month:表示月份,可以是从112之间的任何整数。
  6. week:表示星期几,可以是从07之间的任何整数,这里的07代表星期日。

以上各个字段中还可以使用特殊符号代表不同意思:


  
  1. 星号(*):代表所有可能的值,例如month字段如果是星号,则表示在满足其它字段的制约条件后每月都执行该命令操作。
  2. 逗号(,):可以用逗号隔开的值指定一个列表范围,例如,”1,2,5,7,8,9
  3. 中杠(-):可以用整数之间的中杠表示一个整数范围,例如”2-6”表示”2,3,4,5,6
  4. 正斜线(/):可以用正斜线指定时间的间隔频率,步长,例如”0-23/2”表示每两小时执行一次。

在python配置文件中使用如下:


  
  1. default_args = {
  2. 'owner': 'airflow', # 拥有者名称
  3. 'start_date': datetime(2021, 9, 4), # 第一次开始执行的时间,为 UTC 时间
  4. 'retries': 1, # 失败重试次数
  5. 'retry_delay': timedelta(minutes=5), # 失败重试间隔
  6. }
  7. dag = DAG(
  8. dag_id = 'cron_test', #DAG id ,必须完全由字母、数字、下划线组成
  9. default_args = default_args, #外部定义的 dic 格式的参数
  10. schedule_interval = '* * * * *' # 使用Crontab 定时任务命令,每分钟运行一次
  11. )

 

  • datetime.timedelta

timedelta是使用python timedelta 设置调度周期,可以配置天、周、小时、分钟、秒、毫秒。在python配置文件中使用如下:


  
  1. default_args = {
  2. 'owner': 'airflow', # 拥有者名称
  3. 'start_date': datetime(2021, 9, 4), # 第一次开始执行的时间,为 UTC 时间
  4. 'retries': 1, # 失败重试次数
  5. 'retry_delay': timedelta(minutes=5), # 失败重试间隔
  6. }
  7. dag = DAG(
  8. dag_id = 'cron_test', #DAG id ,必须完全由字母、数字、下划线组成
  9. default_args = default_args, #外部定义的 dic 格式的参数
  10. schedule_interval = timedelta(minutes=5) # 使用python timedelta 设置调度周期,可以配置天、周、小时、分钟、秒、毫秒
  11. )

 

 

五、​​​​​​​DAG任务依赖设置

1、​​​​​​​DAG任务依赖设置一

  • DAG调度流程图

  • task执行依赖
A >> B >>C
 
  • 完整代码

  
  1. '''
  2. airflow 任务依赖关系设置一
  3. '''
  4. from airflow import DAG
  5. from airflow.operators.bash import BashOperator
  6. from datetime import datetime, timedelta
  7. default_args = {
  8. 'owner': 'airflow', # 拥有者名称
  9. 'start_date': datetime(2021, 9, 22), # 第一次开始执行的时间,为 UTC 时间
  10. 'retries': 1, # 失败重试次数
  11. 'retry_delay': timedelta(minutes=5), # 失败重试间隔
  12. }
  13. dag = DAG(
  14. dag_id = 'dag_relation_1', #DAG id ,必须完全由字母、数字、下划线组成
  15. default_args = default_args, #外部定义的 dic 格式的参数
  16. schedule_interval = timedelta(minutes=1) # 定义DAG运行的频率,可以配置天、周、小时、分钟、秒、毫秒
  17. )
  18. A = BashOperator(
  19. task_id='A',
  20. bash_command='echo "run A task"',
  21. dag=dag
  22. )
  23. B = BashOperator(
  24. task_id='B',
  25. bash_command='echo "run B task"',
  26. dag=dag
  27. )
  28. C = BashOperator(
  29. task_id='C',
  30. bash_command='echo "run C task"',
  31. dag=dag,
  32. retries=3
  33. )
  34. A >> B >>C

2、​​​​​​​DAG任务依赖设置二

  • DAG调度流程图

  • task执行依赖
[A,B] >>C >>D
 
  • 完整代码

 


  
  1. '''
  2. airflow 任务依赖关系设置二
  3. '''
  4. from airflow import DAG
  5. from airflow.operators.bash import BashOperator
  6. from datetime import datetime, timedelta
  7. default_args = {
  8. 'owner': 'airflow', # 拥有者名称
  9. 'start_date': datetime(2021, 9, 22), # 第一次开始执行的时间,为 UTC 时间
  10. 'retries': 1, # 失败重试次数
  11. 'retry_delay': timedelta(minutes=5), # 失败重试间隔
  12. }
  13. dag = DAG(
  14. dag_id = 'dag_relation_2', #DAG id ,必须完全由字母、数字、下划线组成
  15. default_args = default_args, #外部定义的 dic 格式的参数
  16. schedule_interval = timedelta(minutes=1) # 定义DAG运行的频率,可以配置天、周、小时、分钟、秒、毫秒
  17. )
  18. A = BashOperator(
  19. task_id='A',
  20. bash_command='echo "run A task"',
  21. dag=dag
  22. )
  23. B = BashOperator(
  24. task_id='B',
  25. bash_command='echo "run B task"',
  26. dag=dag
  27. )
  28. C = BashOperator(
  29. task_id='C',
  30. bash_command='echo "run C task"',
  31. dag=dag,
  32. retries=3
  33. )
  34. D = BashOperator(
  35. task_id='D',
  36. bash_command='echo "run D task"',
  37. dag=dag
  38. )
  39. [A,B] >>C >>D

3、​​​​​​​DAG任务依赖设置三

  • DAG调度流程图

  • task执行依赖
[A,B,C] >>D >>[E,F]
 
  • 完整代码

 


  
  1. '''
  2. airflow 任务依赖关系设置三
  3. '''
  4. from airflow import DAG
  5. from airflow.operators.bash import BashOperator
  6. from datetime import datetime, timedelta
  7. default_args = {
  8. 'owner': 'airflow', # 拥有者名称
  9. 'start_date': datetime(2021, 9, 22), # 第一次开始执行的时间,为 UTC 时间
  10. 'retries': 1, # 失败重试次数
  11. 'retry_delay': timedelta(minutes=5), # 失败重试间隔
  12. }
  13. dag = DAG(
  14. dag_id = 'dag_relation_3', #DAG id ,必须完全由字母、数字、下划线组成
  15. default_args = default_args, #外部定义的 dic 格式的参数
  16. schedule_interval = timedelta(minutes=1) # 定义DAG运行的频率,可以配置天、周、小时、分钟、秒、毫秒
  17. )
  18. A = BashOperator(
  19. task_id='A',
  20. bash_command='echo "run A task"',
  21. dag=dag
  22. )
  23. B = BashOperator(
  24. task_id='B',
  25. bash_command='echo "run B task"',
  26. dag=dag
  27. )
  28. C = BashOperator(
  29. task_id='C',
  30. bash_command='echo "run C task"',
  31. dag=dag,
  32. retries=3
  33. )
  34. D = BashOperator(
  35. task_id='D',
  36. bash_command='echo "run D task"',
  37. dag=dag
  38. )
  39. E = BashOperator(
  40. task_id='E',
  41. bash_command='echo "run E task"',
  42. dag=dag
  43. )
  44. F = BashOperator(
  45. task_id='F',
  46. bash_command='echo "run F task"',
  47. dag=dag
  48. )
  49. [A,B,C] >>D >>[E,F]

4、​​​​​​​DAG任务依赖设置四

  • DAG调度流程图

 

  • task执行依赖

  
  1. A >>B>>C>>D
  2. A >>E>>F

 

  • 完整代码

  
  1. '''
  2. airflow 任务依赖关系设置四
  3. '''
  4. from airflow import DAG
  5. from airflow.operators.bash import BashOperator
  6. from datetime import datetime, timedelta
  7. default_args = {
  8. 'owner': 'airflow', # 拥有者名称
  9. 'start_date': datetime(2021, 9, 22), # 第一次开始执行的时间,为 UTC 时间
  10. 'retries': 1, # 失败重试次数
  11. 'retry_delay': timedelta(minutes=5), # 失败重试间隔
  12. }
  13. dag = DAG(
  14. dag_id = 'dag_relation_4', #DAG id ,必须完全由字母、数字、下划线组成
  15. default_args = default_args, #外部定义的 dic 格式的参数
  16. schedule_interval = timedelta(minutes=1) # 定义DAG运行的频率,可以配置天、周、小时、分钟、秒、毫秒
  17. )
  18. A = BashOperator(
  19. task_id='A',
  20. bash_command='echo "run A task"',
  21. dag=dag
  22. )
  23. B = BashOperator(
  24. task_id='B',
  25. bash_command='echo "run B task"',
  26. dag=dag
  27. )
  28. C = BashOperator(
  29. task_id='C',
  30. bash_command='echo "run C task"',
  31. dag=dag,
  32. retries=3
  33. )
  34. D = BashOperator(
  35. task_id='D',
  36. bash_command='echo "run D task"',
  37. dag=dag
  38. )
  39. E = BashOperator(
  40. task_id='E',
  41. bash_command='echo "run E task"',
  42. dag=dag
  43. )
  44. F = BashOperator(
  45. task_id='F',
  46. bash_command='echo "run F task"',
  47. dag=dag
  48. )
  49. A >>[B,C,D]
  50. A >>[E,F]

5、​​​​​​​DAG任务依赖设置五

  • DAG调度流程图

 

  • task执行依赖

  
  1. A >>B>>E
  2. C >>D>>E

 

  • 完整代码

  
  1. '''
  2. airflow 任务依赖关系设置五
  3. '''
  4. from airflow import DAG
  5. from airflow.operators.bash import BashOperator
  6. from datetime import datetime, timedelta
  7. default_args = {
  8. 'owner': 'airflow', # 拥有者名称
  9. 'start_date': datetime(2021, 9, 22), # 第一次开始执行的时间,为 UTC 时间
  10. 'retries': 1, # 失败重试次数
  11. 'retry_delay': timedelta(minutes=5), # 失败重试间隔
  12. }
  13. dag = DAG(
  14. dag_id = 'dag_relation_5', #DAG id ,必须完全由字母、数字、下划线组成
  15. default_args = default_args, #外部定义的 dic 格式的参数
  16. schedule_interval = timedelta(minutes=1) # 定义DAG运行的频率,可以配置天、周、小时、分钟、秒、毫秒
  17. )
  18. A = BashOperator(
  19. task_id='A',
  20. bash_command='echo "run A task"',
  21. dag=dag
  22. )
  23. B = BashOperator(
  24. task_id='B',
  25. bash_command='echo "run B task"',
  26. dag=dag
  27. )
  28. C = BashOperator(
  29. task_id='C',
  30. bash_command='echo "run C task"',
  31. dag=dag,
  32. retries=3
  33. )
  34. D = BashOperator(
  35. task_id='D',
  36. bash_command='echo "run D task"',
  37. dag=dag
  38. )
  39. E = BashOperator(
  40. task_id='E',
  41. bash_command='echo "run E task"',
  42. dag=dag
  43. )
  44. A >>B>>E
  45. C >>D>>E

  • 📢博客主页:https://lansonli.blog.csdn.net
  • 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
  • 📢本文由 Lansonli 原创,首发于 CSDN博客🙉
  • 📢大数据系列文章会每天更新,停下休息的时候不要忘了别人还在奔跑,希望大家抓紧时间学习,全力奔赴更美好的生活✨ 

文章来源: lansonli.blog.csdn.net,作者:Lansonli,版权归原作者所有,如需转载,请联系作者。

原文链接:lansonli.blog.csdn.net/article/details/123746605

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。