系統研究Airbnb開源項目airflow
調研了一些幾個調度系統, airflow 更滿意一些. 花了些時間寫了這個博文, 這應該是國內技術圈中最早系統性研究airflow的文章了. 轉載請注明出處 http://www.cnblogs.com/harrychinese/ .
========================
airflow概況
========================
文檔:
http://airflow.readthedocs.org/en/latest/
幾個調度系統的比較, 可參考:
http://www.rigongyizu.com/about-workflow-schedule-system/
Principles:
動態性: Airflow pipeline是以python code形式做配置, 靈活性沒得說了.
擴展性: 可以自定義operator(運算子), 有幾個executor(執行器)可供選擇.
優雅:pipeline的定義很簡單明了, 基于jinja模板引擎很容易做到腳本命令參數化.
擴展性:模塊化的結構, 加上使用了message queue來編排多個worker(啟用CeleryExcutor), airflow可以做到無限擴展.
競品:
airflow并不是data streaming方案, 所以不是Spark Streaming/Storm的競品. 和airflow類似的有: Apache Oozie, Linkedin Azkaban.
比較優勢:
linkedin的Azkaban很不錯, UI尤其很贊, 使用java properties文件維護上下游關系, 任務資源文件需要打包成zip, 部署不是很方便.
Apache Oozie, 使用XML配置, Oozie任務的資源文件都必須存放在HDFS上. 配置不方便同時也只能用于Hadoop.
Spotify的 Luigi, UI 太爛.
airflow 總體都不錯, 有實用的UI, 豐富的cli工具, Task上下游使用python編碼, 能保證靈活性和適應性.
========================
概念:
========================
不用多說概念自然非常重要, 這是理解airflow的基礎.
---------------
Operators:
---------------
基本可以理解為一個抽象化的task, Operator加上必要的運行時上下文就是一個task. 有三類Operator:
1. Sensor(傳感監控器), 監控一個事件的發生.
2. Trigger(或者叫做Remote Excution), 執行某個遠端動作, (我在代碼中沒有找到這個類別)
3. Data transfer(數據轉換器), 完成數據轉換
---------------
Hooks:
---------------
Hook是airflow與外部平臺/數據庫交互的方式, 一個Hook類就像是一個JDBC driver一樣. airflow已經實現了jdbc/ftp/http/webhdfs很多hook. 要訪問RDBMS數據庫 有兩類Hook可供選擇, 基于原生Python DBAPI的Hook和基于JDBC的Hook, 以Oracle為例,
OracleHook, 是通過cx_Oracle 訪問Oracle數據, 即原生Python binding, 有些原生的Hook支持Bulk load.
JdbcHook, 是通過jaydebeapi+Oracle JDBC訪問Oracle數據
Tasks: task代表DAG中的一個節點, 它其實是一個BaseOperator子類.
Task instances, 即task的運行態實例, 它包含了task的status(成功/失敗/重試中/已啟動)
Job: Airflow中Job很少提及, 但在數據庫中有個job表, 需要說明的是Job和task并不是一回事, Job可以簡單理解為Airflow的批次, 更準確的說法是同一批被調用task或dag的統一代號. 有三類Job, 分別SchedulerJob/BackfillJob/LocalTaskJob, 對于SchedulerJob和BackfillJob, job指的是指定dag這次被調用的運行時代號, LocalTaskJob是指定task的運行時代號.
---------------
Connections:
---------------
我們的Task需要通過Hook訪問其他資源, Hook僅僅是一種訪問方式, 就像是JDBC driver一樣, 要連接DB, 我們還需要DB的IP/Port/User/Pwd等信息. 這些信息不太適合hard code在每個task中, 可以把它們定義成Connection, airflow將這些connection信息存放在后臺的connection表中. 我們可以在WebUI的Admin->Connections管理這些連接.
---------------
Variables:
---------------
Variable 沒有task_id/dag_id屬性, 往往用來定義一些系統級的常量或變量, 我們可以在WebUI或代碼中新建/更新/刪除Variable. 也可以在WebUI上維護變量.
Variable 的另一個重要的用途是, 我們為Prod/Dev環境做不同的設置, 詳見后面的開發小節.
---------------
XComs:
---------------
XCom和Variable類似, 用于Task之間共享一些信息. XCom 包含task_id/dag_id屬性, 適合于Task之間傳遞數據, XCom使用方法比Variables復雜些. 比如有一個dag, 兩個task組成(T1->T2), 可以在T1中使用xcom_push()來推送一個kv, 在T2中使用xcom_pull()來獲取這個kv.
---------------
Trigger Rules:
---------------
可以為dag中的每個task都指定它的觸發條件, 這里的觸發條件有兩個維度, 以T1&T2->T3 這樣的dag為例:
一個維度是: 要根據dag上次運行T3的狀態確定本次T3是否被調用, 由
DAG的default_args.depends_on_past參數控制, 為True時, 只有上次T3運行成功, 這次T3才會被觸發
另一個維度是: 要根據前置T1和T2的狀態確定本次T3是否被調用, 由T3.trigger_rule參數控制, 有下面6種情形, 缺省是all_success.
all_success: (default) all parents have succeeded
all_failed: all parents are in a failed or upstream_failed state
all_done: all parents are done with their execution
one_failed: fires as soon as at least one parent has failed, it does not wait for all parents to be done
one_success: fires as soon as at least one parent succeeds, it does not wait for all parents to be done
dummy: dependencies are just for show, trigger at will
---------------
分支的支持:
---------------
airflow有兩個基于PythonOperator的Operator來支持dag分支功能.
ShortCircuitOperator, 用來實現流程的判斷. Task需要基于ShortCircuitOperator, 如果本Task返回為False的話, 其下游Task將被skip; 如果為True的話, 其下游Task將會被正常執行. 尤其適合用在其下游都是單線節點的場景.
BranchPythonOperator, 用來實現Case分支. Task需要基于BranchPythonOperator, airflow會根據本task的返回值(返回值是某個下游task的id),來確定哪個下游Task將被執行, 其他下游Task將被skip.
---------------
airflow系統表:
---------------
connection 表:
我們的Task往往需要通過jdbc/ftp/http/webhdfs方式訪問其他資源, 一般地訪問資源時候都需要一些簽證, airflow允許我們將這些connection以及鑒證存放在connection表中. 可以現在WebUI的Admin->Connections管理這些連接, 在代碼中使用這些連接.
需要說明的是, connection表有2個id欄位, 一個是id, 一個是conn_id, id欄位是該表的PK, conn_id欄位是connection的名義id, 也就是說我們可以定義多個同名的conn_id, 當使用使用時airflow將會從同名的conn_id的列表中隨機選一個, 有點基本的load balance的意思.
user 表 :
包含user的username和email信息. 我們可以在WebUI的Admin->Users管理.
variable 表 :
包含定義variable
xcom 表:
包含xcom的數據
dag 表:
包含dag的定義信息, dag_id是PK(字符型)
dag_run 表:
包含dag的運行歷史記錄, 該表也有兩個id欄位, 一個是id, 一個是run_id, id欄位是該表的PK, run_id欄位是這次運行的一個名字(字符型), 同一個dag, 它的run_id 不能重復.
物理PK: 即id欄位
邏輯PK: dag_id + execution_date 組合
execution_date 欄位, 表示觸發dag的準確時間
注意: 沒有 task 表:
airflow的task定義在python源碼中, 不在DB中存放注冊信息.
task_instance 表:
物理PK: 該表沒有物理PK
邏輯PK: dag_id + task_id + execution_date 組合.
execution_date 欄位, 表示觸發dag的準確時間,是datetime類型字段
start_date/end_date 欄位,表示執行task的起始/終止時間, 也是datetime類型字段
job 表:
包含job(這里可以理解為批次)的運行狀態信息
========================
操作
========================
---------------
安裝和配置
---------------
1. 操作系統:
Airflow不能在Windows上部署, 原因是使用了 gunicorn 作為其web server(目前gunicorn還不支持Windows), 另外我也在代碼中看到一些hard code了一些bash命令.
2. shell 解釋器:
操作系統應該安裝bash shell, 運行airflow服務的賬號, 最好默認使用bash shell.
3. Python版本:
目前Airflow只是實驗性地支持Python3, 推薦使用Python2.7
4. Backend 數據庫:
SqlAlchemy 支持的數據庫都可以作為Backend, 甚至Sqlite(非常適合做Demo或臨時體驗一下), 官方推薦采用 MySQL 或 Postgres. 我試了Oracle, 但最終還是以失敗告終. MySQL 應該使用 mysqlclient 包, 我簡單試了mysql-connector-python 有報錯.
5. Executor的選擇:
有三個 Executor 可供選擇, 分別是: SequentialExecutor 和 LocalExecutor 和 CeleryExecutor, SequentialExecutor僅僅適合做Demo(搭配Sqlite backend), LocalExecutor 和 CeleryExecutor 都可用于生產環境, CeleryExecutor 將使用 Celery 作為Task執行的引擎, 擴展性很好, 當然配置也更復雜, 需要先setup Celery的backend(包括RabbitMQ, Redis)等. 其實真正要求擴展性的場景并不多, 所以LocalExecutor 是一個很不錯的選擇了.
---------------
初始化配置:
---------------
1. 配置OS環境變量 AIRFLOW_HOME, AIRFLOW_HOME缺省為 ~/airflow
2. 運行下面命令初始化一個Sqlite backend DB, 并生成airflow.cfg文件
your_python ${AIRFLOW_HOME}\bin\airflow initdb
3. 如果需要修改backend DB類型, 修改$AIRFLOW_HOME/airflow.cfg文件 sql_alchemy_conn后, 然后重新運行 airflow initdb .
官方推薦使用MySQL/PostgreSQL做DB Server.
MySQL 應該使用 mysqlclient 驅動, 我試驗了mysql-connector-python 驅動, 結果airflow 網頁端報錯
我試著用Oracle 做DB, 解決了很多問題, 但終究還是不能完全運行起來.
4. 修改$AIRFLOW_HOME/airflow.cfg文件
重新設置Backend/Executor, 以及webserver端口, 設置dags_folder目錄和base_log_folder目錄. 有下面3個參數用于控制Task的并發度,
parallelism, 一個Executor同時運行task實例的個數
dag_concurrency, 一個dag中某個task同時運行的實例個數
max_active_runs_per_dag: 一個dag同時啟動的實例個數
---------------
了解幾種作業運行模式
---------------
test 作業運行模式:
該task是在本地運行, 不會發送到遠端celery worker, 也不檢查依賴狀態, 也不將結果記錄到airflow DB中, log也僅僅會在屏幕輸出, 不記錄到log文件.
使用場景: 多用于測試單個作業的code的邏輯. 可以通過test 命令進入test 模式.
mark_success 作業運行模式:
僅僅將作業在DB中Mark為success, 但并不真正執行作業
使用場景: 多用于測試整個dag流程控制, 或者為某個task在DB中補一些狀態. 可以在backfill命令和 run命令中啟用.
dry_run 作業運行模式:
airflow不檢查作業的上下游依賴, 也不會將運行結果記錄不到airflow DB中. 具體作業的運行內容分情況:
如果你的Operator沒有重載 dry_run()方法的話, 運行作業也僅打印一點作業執行log
如果重載BaseOperator的dry_run()方法的話, 運行作業即是執行你的dry_run()
使用場景: 個人覺得 dry_run 模式意義并不大, 可以在backfill命令和 test 命令中啟用
---------------
命令行工具
---------------
airflow 包安裝后, 會往我們your_python\bin目錄復制一個名為 airflow 的文件, 可以直接運行.
下面是該命令行工具支持的命令
1. 初始化airflow meta db
airflow initdb [-h]
2. 升級airflow meta db
airflow upgradedb [-h]
3. 開啟web server
airflow webserver --debug=False
開啟airflow webserver, 但不進入flask的debug模式
4. 顯示task清單
airflow list_tasks --tree=True -sd=/home/docs/airflow/dags
以Tree形式, 顯示/home/docs/airflow/dags下的task 清單
5. 檢查Task狀態
airflow task_state -sd=/home/docs/airflow/dags dag_id task_id execution_date
這里的 execution_date 是觸發dag的準確時間, 是DB的datetime類型, 而不是Date類型
6. 開啟一個dag調度器
airflow scheduler [-d DAG_ID] -sd=/home/docs/airflow/dags [-n NUM_RUNS]
啟動dag調度器, 注意啟動調度器, 并不意味著dag會被馬上觸發, dag觸發需要符合它自己的schedule規則.
參數NUM_RUNS, 如果指定的話, dag將在運行NUM_RUNS次后退出. 沒有指定時, scheduler將一直運行.
參數DAG_ID可以設定, 也可以缺省, 含義分別是:
如果設定了DAG_ID, 則為該DAG_ID專門啟動一個scheduler;
如果缺省DAG_ID, airflow會為每個dag(subdag除外)都啟動一個scheduler.
7. 立即觸發一個dag, 可以為dag指定一個run id, 即dag的運行實例id.
airflow trigger_dag [-h] [-r RUN_ID] dag_id
立即觸發運行一個dag, 如果該dag的scheduler沒有運行的話, 將在scheduler啟動后立即執行dag
8. 批量回溯觸發一個dag
airflow backfill [-s START_DATE] [-e END_DATE] [-sd SUBDIR] --mark_success=False --dry_run=False dag_id
有時候我們需要**立即**批量補跑一批dag, 比如為demo準備點執行歷史, 比如補跑錯過的運行機會. DB中dag execute_date記錄不是當下時間, 而是按照 START_DATE 和 scheduler_interval 推算出的時間.
如果缺省了END_DATE參數, END_DATE等同于START_DATE.
9. 手工調用一個Task
airflow run [-sd SUBDIR] [-s TASK_START_DATE] --mark_success=False dag_id task_id execution_date
該命令參數很多, 如果僅僅是測試運行, 建議使用test命令代替.
10. 測試一個Task
airflow test -sd=/home/docs/airflow/dags --dry_run=False dag_id task_id execution_date
airflow test -sd=/home/docs/airflow/dags --dry_run=False dag_id task_id 2015-12-31
以 test 或 dry_run 模式 運行作業.
11. 清空dag下的Task運行實例
airflow clear [-s START_DATE] [-e END_DATE] [-sd SUBDIR] dag_id
12. 顯示airflow的版本號
airflow version
========================
Airflow 開發
========================
---------------
dag腳本開發
---------------
dag腳本可參考example_dags目錄中的sample, 然后將腳本存放到airflow.cfg指定的dags_folder下.
airflow 已經包含實現很多常用的 operator, 包括 BashOperator/EmailOperator/JdbcOperator/PythonOperator/ShortCircuitOperator/BranchPythonOperator/TriggerDagRunOperator等, 基本上夠用了, 如果要實現自己的Operator, 繼承BaseOperator, 一般只需要實現execute()方法即可.
pre_execute()/post_execute()用處不大, 不用特別關注, 另外post_execute()是在on_failure_callback/on_success_callback回調函數之前執行的, 所以, 也不適合回寫作業狀態.
作業流程串接的幾個小貼士:
使用 DummyOperator 來匯聚分支
使用 ShortCircuitOperator/BranchPythonOperator 做分支
使用 SubDagOperator 嵌入一個子dag
使用 TriggerDagRunOperator 直接trigger 另一個dag
T_B.set_upstream(T_A), T_A->T_B, 通過task對象設置它的上游
T_1.set_downstream(T_2), T_1->T_2 , 通過task對象設置它的下游
airflow.utils.chain(T_1, T_2, T_3), 通過task對象設置依賴關系, 這個方法就能一次設置長的執行流程, T_1->T_2->T_3
dag.set_dependency('T_1_id', 'T_2_id'), 通過id設置依賴關系
---------------
擴展airflow界面
---------------
擴展airflow, 比如WebUI上增加一個菜單項, 可以按照plugin形式實現.
---------------
自己的表如何關聯airflow的表
---------------
很多時候airflow DB的各個表不夠用, 我們需要增加自己的表. 比如增加一個my_batch_instance表, 一個my_task_instance表, my_batch_instance需要關聯airflow dag_run表, my_task_instance需要關聯airflow task_instance表.
my_batch_instance表中, 增加airflow_dag_id和airflow_execute_date, 來對應airflow dag_run表的邏輯PK; my_task_instance表增加airflow_dag_id和airflow_task_id和airflow_execute_date, 對應airflow task_instance表的邏輯PK.
接下來的問題是, 如何在task的python代碼中, 獲取這些邏輯PK值? 其實也很簡答, 我們的task都繼承于BaseOperation類, BaseOperation.execute(self, context)方法, 有一個context參數, 它包含很豐富的信息, 有:
dag定義對象, dag.dag_id 即是 dag_id 值
task定義對象,task.task_id 即是 task_id 值
execution_date相關的幾個屬性(包括datetime類型的execution_date, 字符類型的ds, 更短字符型的ds_nodash)
context是一個dict,完整的內容是
{
'dag': task.dag,
'ds': ds,
'yesterday_ds': yesterday_ds,
'tomorrow_ds': tomorrow_ds,
'END_DATE': ds,
'ds_nodash': ds_nodash,
'end_date': ds,
'dag_run': dag_run,
'run_id': run_id,
'execution_date': task_instance.execution_date,
'latest_date': ds,
'macros': macros,
'params': params,
'tables': tables,
'task': task,
'task_instance': task_instance,
'ti': task_instance,
'task_instance_key_str': task_instance_key_str,
'conf': configuration,
}
execution_date相關的幾個屬性具體取值是:
ds=execution_date.isoformat()[:10]
ds_nodash = ds.replace('-', '')
ti_key_str_fmt = "{task.dag_id}__{task.task_id}__{ds_nodash}"
task_instance_key_str = ti_key_str_fmt.format(task,ds_nodash)
task_instance_key_str 值可以看做是Task instance表的單一的邏輯PK, 很可惜的是Task instance沒有這個字段.
---------------
如何及時拿到airflow task的狀態
---------------
舉例說明, 比如我的task是執行一個bash shell, 為了能將task的信息及時更新到自己的表中, 需要基于BashOperator的實現一個子類MyBashOperator, 在execute(context)方法中, 將running狀態記錄到自己的表中.
另外, 在創建MyBashOperator的實例時候, 為on_failure_callback和on_success_callback參數設置兩個回調函數, 我們在回調函數中, 將success或failed狀態記錄到自己的表中.
on_failure_callback/on_success_callback回調函數簽名同execute(), 都有一個context參數.
---------------
為生產環境和測試環境提供不同的設置
---------------
系統級的設置, 見airflow.cfg文檔
DAG級別的設置, 我們可為Prod/Dev環境準備不同的default_args,
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2015, 6, 1),
'email': ['airflow@airflow.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}
dag = DAG('tutorial', default_args=default_args)
通過Variable, 加載不同環境的配置. 詳細思路如下:
比如我們有一個My_Cfg參數, 在Prod和Dev取值有可能不同.
首先設置一個 Environment_Flag variable, 其取值是Prod或Dev.
然后, 定義為My_Cfg參數設定兩個變量, My_Cfg_For_Prod 和 My_Cfg_For_Dev, 并賦值, 分別對應Prod/Dev環境下My_Cfg的取值.
在代碼中, 我們就可以通過Environment_Flag的取值, 就知道是該訪問 My_Cfg_For_Prod 變量還是 My_Cfg_For_Dev 變量, 進而得到My_Cfg的取值.
---------------
Regular的External trigger觸發dag的推薦用法
---------------
外部觸發需要trigger_dag命令行, 命令行最好要加上run_id參數;
同時DAG的schedule_interval參數最好設置成None, 表明這個DAG始終是由外部觸發
---------------
測試運行步驟:
---------------
1. 先測試pytnon代碼正確性
python ~/airflow/dags/tutorial.py
2. 通過命令行驗證DAG/task設置
# print the list of active DAGs
airflow list_dags
# prints the list of tasks the "tutorial" dag_id
airflow list_tasks tutorial
# prints the hierarchy of tasks in the tutorial DAG
airflow list_tasks tutorial --tree
3. 通過test命令行試跑一下, 測試一下code邏輯
airflow test tutorial my_task_id 2015-06-01
4. 通過 backfill --mark_success=True
airflow backfill tutorial -s 2015-06-01 -e 2015-06-07