系統研究Airbnb開源項目airflow

jopen 8年前發布 | 64K 次閱讀 數據庫 Python 開源

調研了一些幾個調度系統, airflow 更滿意一些. 花了些時間寫了這個博文, 這應該是國內技術圈中最早系統性研究airflow的文章了.  轉載請注明出處 http://www.cnblogs.com/harrychinese/ .

========================

airflow概況

========================

文檔:

http://airflow.readthedocs.org/en/latest/

幾個調度系統的比較, 可參考:

http://www.rigongyizu.com/about-workflow-schedule-system/

Principles:

動態性: Airflow pipeline是以python code形式做配置, 靈活性沒得說了.

擴展性: 可以自定義operator(運算子), 有幾個executor(執行器)可供選擇.

優雅:pipeline的定義很簡單明了, 基于jinja模板引擎很容易做到腳本命令參數化.

擴展性:模塊化的結構, 加上使用了message queue來編排多個worker(啟用CeleryExcutor), airflow可以做到無限擴展.

競品:

airflow并不是data streaming方案, 所以不是Spark Streaming/Storm的競品. 和airflow類似的有: Apache Oozie, Linkedin Azkaban.

比較優勢:

linkedin的Azkaban很不錯, UI尤其很贊, 使用java properties文件維護上下游關系, 任務資源文件需要打包成zip, 部署不是很方便.

Apache Oozie, 使用XML配置, Oozie任務的資源文件都必須存放在HDFS上. 配置不方便同時也只能用于Hadoop.

Spotify的 Luigi, UI 太爛.

airflow 總體都不錯, 有實用的UI, 豐富的cli工具, Task上下游使用python編碼, 能保證靈活性和適應性.

========================

概念:

========================

不用多說概念自然非常重要, 這是理解airflow的基礎.

---------------

Operators:

---------------  

基本可以理解為一個抽象化的task, Operator加上必要的運行時上下文就是一個task. 有三類Operator:

1. Sensor(傳感監控器), 監控一個事件的發生.

2. Trigger(或者叫做Remote Excution), 執行某個遠端動作, (我在代碼中沒有找到這個類別)

3. Data transfer(數據轉換器), 完成數據轉換

---------------

Hooks:

---------------    

Hook是airflow與外部平臺/數據庫交互的方式, 一個Hook類就像是一個JDBC driver一樣. airflow已經實現了jdbc/ftp/http/webhdfs很多hook. 要訪問RDBMS數據庫 有兩類Hook可供選擇, 基于原生Python DBAPI的Hook和基于JDBC的Hook, 以Oracle為例,

OracleHook, 是通過cx_Oracle 訪問Oracle數據, 即原生Python binding, 有些原生的Hook支持Bulk load.

JdbcHook, 是通過jaydebeapi+Oracle JDBC訪問Oracle數據         

Tasks: task代表DAG中的一個節點, 它其實是一個BaseOperator子類.

Task instances, 即task的運行態實例, 它包含了task的status(成功/失敗/重試中/已啟動)

Job: Airflow中Job很少提及, 但在數據庫中有個job表, 需要說明的是Job和task并不是一回事, Job可以簡單理解為Airflow的批次, 更準確的說法是同一批被調用task或dag的統一代號. 有三類Job, 分別SchedulerJob/BackfillJob/LocalTaskJob, 對于SchedulerJob和BackfillJob, job指的是指定dag這次被調用的運行時代號, LocalTaskJob是指定task的運行時代號.

---------------

Connections:

---------------  

我們的Task需要通過Hook訪問其他資源, Hook僅僅是一種訪問方式, 就像是JDBC driver一樣, 要連接DB, 我們還需要DB的IP/Port/User/Pwd等信息. 這些信息不太適合hard code在每個task中, 可以把它們定義成Connection, airflow將這些connection信息存放在后臺的connection表中. 我們可以在WebUI的Admin->Connections管理這些連接.

---------------

Variables:

---------------   

Variable 沒有task_id/dag_id屬性, 往往用來定義一些系統級的常量或變量,  我們可以在WebUI或代碼中新建/更新/刪除Variable. 也可以在WebUI上維護變量.

Variable 的另一個重要的用途是, 我們為Prod/Dev環境做不同的設置, 詳見后面的開發小節.

---------------

XComs:

---------------  

XCom和Variable類似, 用于Task之間共享一些信息. XCom 包含task_id/dag_id屬性, 適合于Task之間傳遞數據, XCom使用方法比Variables復雜些. 比如有一個dag, 兩個task組成(T1->T2), 可以在T1中使用xcom_push()來推送一個kv, 在T2中使用xcom_pull()來獲取這個kv.

---------------

Trigger Rules:

---------------  

可以為dag中的每個task都指定它的觸發條件, 這里的觸發條件有兩個維度, 以T1&T2->T3 這樣的dag為例:

一個維度是: 要根據dag上次運行T3的狀態確定本次T3是否被調用, 由

DAG的default_args.depends_on_past參數控制, 為True時, 只有上次T3運行成功, 這次T3才會被觸發

另一個維度是: 要根據前置T1和T2的狀態確定本次T3是否被調用, 由T3.trigger_rule參數控制, 有下面6種情形, 缺省是all_success.  

all_success: (default) all parents have succeeded

all_failed: all parents are in a failed or upstream_failed state

all_done: all parents are done with their execution

one_failed: fires as soon as at least one parent has failed, it does not wait for all parents to be done

one_success: fires as soon as at least one parent succeeds, it does not wait for all parents to be done

dummy: dependencies are just for show, trigger at will

---------------

分支的支持:

---------------      

airflow有兩個基于PythonOperator的Operator來支持dag分支功能.

ShortCircuitOperator, 用來實現流程的判斷. Task需要基于ShortCircuitOperator, 如果本Task返回為False的話, 其下游Task將被skip; 如果為True的話, 其下游Task將會被正常執行.  尤其適合用在其下游都是單線節點的場景.  

BranchPythonOperator, 用來實現Case分支. Task需要基于BranchPythonOperator, airflow會根據本task的返回值(返回值是某個下游task的id),來確定哪個下游Task將被執行, 其他下游Task將被skip.

---------------

airflow系統表:

---------------

connection 表:

我們的Task往往需要通過jdbc/ftp/http/webhdfs方式訪問其他資源, 一般地訪問資源時候都需要一些簽證, airflow允許我們將這些connection以及鑒證存放在connection表中.  可以現在WebUI的Admin->Connections管理這些連接, 在代碼中使用這些連接.

需要說明的是, connection表有2個id欄位, 一個是id, 一個是conn_id, id欄位是該表的PK, conn_id欄位是connection的名義id, 也就是說我們可以定義多個同名的conn_id, 當使用使用時airflow將會從同名的conn_id的列表中隨機選一個, 有點基本的load balance的意思.

user 表 :

包含user的username和email信息.  我們可以在WebUI的Admin->Users管理.

variable 表 :

包含定義variable

xcom 表:

包含xcom的數據

dag 表:

包含dag的定義信息, dag_id是PK(字符型)

dag_run 表:

包含dag的運行歷史記錄, 該表也有兩個id欄位, 一個是id, 一個是run_id, id欄位是該表的PK, run_id欄位是這次運行的一個名字(字符型), 同一個dag, 它的run_id 不能重復.

物理PK: 即id欄位

邏輯PK: dag_id + execution_date 組合

execution_date 欄位, 表示觸發dag的準確時間

注意: 沒有 task 表:

airflow的task定義在python源碼中, 不在DB中存放注冊信息.

task_instance 表:

物理PK: 該表沒有物理PK

邏輯PK: dag_id + task_id + execution_date 組合.

execution_date 欄位, 表示觸發dag的準確時間,是datetime類型字段

start_date/end_date 欄位,表示執行task的起始/終止時間, 也是datetime類型字段

job 表:

包含job(這里可以理解為批次)的運行狀態信息

========================

操作

========================

---------------

安裝和配置

---------------

1. 操作系統:

Airflow不能在Windows上部署, 原因是使用了 gunicorn 作為其web server(目前gunicorn還不支持Windows), 另外我也在代碼中看到一些hard code了一些bash命令.

2. shell 解釋器:

操作系統應該安裝bash shell, 運行airflow服務的賬號, 最好默認使用bash shell.

3. Python版本:

目前Airflow只是實驗性地支持Python3, 推薦使用Python2.7

4. Backend 數據庫:

SqlAlchemy 支持的數據庫都可以作為Backend, 甚至Sqlite(非常適合做Demo或臨時體驗一下), 官方推薦采用 MySQL 或 Postgres. 我試了Oracle, 但最終還是以失敗告終. MySQL 應該使用 mysqlclient 包, 我簡單試了mysql-connector-python 有報錯.

5. Executor的選擇:

有三個 Executor 可供選擇, 分別是: SequentialExecutor 和 LocalExecutor 和 CeleryExecutor, SequentialExecutor僅僅適合做Demo(搭配Sqlite backend), LocalExecutor 和 CeleryExecutor 都可用于生產環境, CeleryExecutor 將使用 Celery 作為Task執行的引擎, 擴展性很好, 當然配置也更復雜, 需要先setup Celery的backend(包括RabbitMQ, Redis)等. 其實真正要求擴展性的場景并不多, 所以LocalExecutor 是一個很不錯的選擇了.

---------------

初始化配置:

---------------

1. 配置OS環境變量 AIRFLOW_HOME, AIRFLOW_HOME缺省為 ~/airflow

2. 運行下面命令初始化一個Sqlite backend DB, 并生成airflow.cfg文件

your_python ${AIRFLOW_HOME}\bin\airflow initdb

3. 如果需要修改backend DB類型, 修改$AIRFLOW_HOME/airflow.cfg文件 sql_alchemy_conn后, 然后重新運行 airflow initdb .

官方推薦使用MySQL/PostgreSQL做DB Server.

MySQL 應該使用 mysqlclient 驅動, 我試驗了mysql-connector-python 驅動, 結果airflow 網頁端報錯

我試著用Oracle 做DB, 解決了很多問題, 但終究還是不能完全運行起來.

4. 修改$AIRFLOW_HOME/airflow.cfg文件

重新設置Backend/Executor, 以及webserver端口, 設置dags_folder目錄和base_log_folder目錄. 有下面3個參數用于控制Task的并發度,

parallelism, 一個Executor同時運行task實例的個數

dag_concurrency, 一個dag中某個task同時運行的實例個數

max_active_runs_per_dag: 一個dag同時啟動的實例個數

---------------

了解幾種作業運行模式

---------------

test 作業運行模式:

該task是在本地運行, 不會發送到遠端celery worker, 也不檢查依賴狀態, 也不將結果記錄到airflow DB中, log也僅僅會在屏幕輸出, 不記錄到log文件.

使用場景: 多用于測試單個作業的code的邏輯.  可以通過test 命令進入test 模式.

mark_success 作業運行模式:

僅僅將作業在DB中Mark為success, 但并不真正執行作業

使用場景: 多用于測試整個dag流程控制, 或者為某個task在DB中補一些狀態. 可以在backfill命令和 run命令中啟用.

dry_run 作業運行模式:

airflow不檢查作業的上下游依賴, 也不會將運行結果記錄不到airflow DB中. 具體作業的運行內容分情況:

如果你的Operator沒有重載 dry_run()方法的話,  運行作業也僅打印一點作業執行log

如果重載BaseOperator的dry_run()方法的話,  運行作業即是執行你的dry_run()

使用場景: 個人覺得 dry_run 模式意義并不大, 可以在backfill命令和 test 命令中啟用   

---------------

命令行工具

---------------

airflow 包安裝后, 會往我們your_python\bin目錄復制一個名為 airflow 的文件, 可以直接運行.

下面是該命令行工具支持的命令

1. 初始化airflow meta db  

airflow initdb [-h]

2. 升級airflow meta db

airflow upgradedb [-h]

3. 開啟web server

airflow webserver  --debug=False

開啟airflow webserver, 但不進入flask的debug模式

4. 顯示task清單

airflow list_tasks --tree=True -sd=/home/docs/airflow/dags

以Tree形式, 顯示/home/docs/airflow/dags下的task 清單

5. 檢查Task狀態

airflow task_state  -sd=/home/docs/airflow/dags dag_id task_id execution_date

這里的 execution_date 是觸發dag的準確時間, 是DB的datetime類型, 而不是Date類型

6. 開啟一個dag調度器

airflow scheduler [-d DAG_ID] -sd=/home/docs/airflow/dags  [-n NUM_RUNS]

啟動dag調度器, 注意啟動調度器, 并不意味著dag會被馬上觸發, dag觸發需要符合它自己的schedule規則.

參數NUM_RUNS, 如果指定的話, dag將在運行NUM_RUNS次后退出. 沒有指定時, scheduler將一直運行.

參數DAG_ID可以設定, 也可以缺省, 含義分別是:

如果設定了DAG_ID, 則為該DAG_ID專門啟動一個scheduler;

如果缺省DAG_ID, airflow會為每個dag(subdag除外)都啟動一個scheduler.

7. 立即觸發一個dag, 可以為dag指定一個run id, 即dag的運行實例id.

airflow trigger_dag [-h] [-r RUN_ID] dag_id

立即觸發運行一個dag, 如果該dag的scheduler沒有運行的話, 將在scheduler啟動后立即執行dag

8. 批量回溯觸發一個dag

airflow backfill [-s START_DATE] [-e END_DATE]  [-sd SUBDIR]  --mark_success=False --dry_run=False dag_id

有時候我們需要**立即**批量補跑一批dag, 比如為demo準備點執行歷史, 比如補跑錯過的運行機會. DB中dag execute_date記錄不是當下時間, 而是按照 START_DATE 和 scheduler_interval 推算出的時間.  

如果缺省了END_DATE參數, END_DATE等同于START_DATE.

9. 手工調用一個Task

airflow run [-sd SUBDIR] [-s TASK_START_DATE] --mark_success=False  dag_id task_id execution_date

該命令參數很多, 如果僅僅是測試運行, 建議使用test命令代替.

10. 測試一個Task

airflow test -sd=/home/docs/airflow/dags --dry_run=False dag_id task_id execution_date

airflow test -sd=/home/docs/airflow/dags --dry_run=False dag_id task_id 2015-12-31

以 test 或 dry_run 模式 運行作業.

11. 清空dag下的Task運行實例

airflow clear [-s START_DATE] [-e END_DATE]  [-sd SUBDIR]  dag_id

12. 顯示airflow的版本號

airflow version  

========================

Airflow 開發

========================

---------------

dag腳本開發

---------------

dag腳本可參考example_dags目錄中的sample, 然后將腳本存放到airflow.cfg指定的dags_folder下.

airflow 已經包含實現很多常用的 operator, 包括 BashOperator/EmailOperator/JdbcOperator/PythonOperator/ShortCircuitOperator/BranchPythonOperator/TriggerDagRunOperator等, 基本上夠用了, 如果要實現自己的Operator, 繼承BaseOperator, 一般只需要實現execute()方法即可.

pre_execute()/post_execute()用處不大, 不用特別關注, 另外post_execute()是在on_failure_callback/on_success_callback回調函數之前執行的, 所以, 也不適合回寫作業狀態.

作業流程串接的幾個小貼士:

使用 DummyOperator 來匯聚分支

使用 ShortCircuitOperator/BranchPythonOperator 做分支

使用 SubDagOperator 嵌入一個子dag

使用 TriggerDagRunOperator  直接trigger 另一個dag

T_B.set_upstream(T_A), T_A->T_B, 通過task對象設置它的上游

T_1.set_downstream(T_2), T_1->T_2 , 通過task對象設置它的下游

airflow.utils.chain(T_1, T_2, T_3), 通過task對象設置依賴關系, 這個方法就能一次設置長的執行流程, T_1->T_2->T_3

dag.set_dependency('T_1_id', 'T_2_id'), 通過id設置依賴關系

---------------  

擴展airflow界面

---------------

擴展airflow,  比如WebUI上增加一個菜單項, 可以按照plugin形式實現.

---------------

自己的表如何關聯airflow的表

---------------

很多時候airflow DB的各個表不夠用, 我們需要增加自己的表. 比如增加一個my_batch_instance表, 一個my_task_instance表, my_batch_instance需要關聯airflow dag_run表, my_task_instance需要關聯airflow task_instance表.

my_batch_instance表中, 增加airflow_dag_id和airflow_execute_date, 來對應airflow dag_run表的邏輯PK; my_task_instance表增加airflow_dag_id和airflow_task_id和airflow_execute_date, 對應airflow task_instance表的邏輯PK.  

接下來的問題是, 如何在task的python代碼中, 獲取這些邏輯PK值? 其實也很簡答, 我們的task都繼承于BaseOperation類, BaseOperation.execute(self, context)方法, 有一個context參數, 它包含很豐富的信息, 有:

dag定義對象, dag.dag_id 即是 dag_id 值

task定義對象,task.task_id 即是 task_id 值

execution_date相關的幾個屬性(包括datetime類型的execution_date, 字符類型的ds, 更短字符型的ds_nodash)  

context是一個dict,完整的內容是

{

'dag': task.dag,

'ds': ds,

'yesterday_ds': yesterday_ds,

'tomorrow_ds': tomorrow_ds,

'END_DATE': ds,

'ds_nodash': ds_nodash,

'end_date': ds,

'dag_run': dag_run,

'run_id': run_id,

'execution_date': task_instance.execution_date,

'latest_date': ds,

'macros': macros,

'params': params,

'tables': tables,

'task': task,

'task_instance': task_instance,

'ti': task_instance,

'task_instance_key_str': task_instance_key_str,

'conf': configuration,

}

execution_date相關的幾個屬性具體取值是:

ds=execution_date.isoformat()[:10]       

ds_nodash = ds.replace('-', '')        

ti_key_str_fmt = "{task.dag_id}__{task.task_id}__{ds_nodash}"

task_instance_key_str = ti_key_str_fmt.format(task,ds_nodash)

task_instance_key_str 值可以看做是Task instance表的單一的邏輯PK, 很可惜的是Task instance沒有這個字段.

---------------

如何及時拿到airflow task的狀態

---------------

舉例說明, 比如我的task是執行一個bash shell, 為了能將task的信息及時更新到自己的表中, 需要基于BashOperator的實現一個子類MyBashOperator, 在execute(context)方法中, 將running狀態記錄到自己的表中.

另外, 在創建MyBashOperator的實例時候, 為on_failure_callback和on_success_callback參數設置兩個回調函數, 我們在回調函數中, 將success或failed狀態記錄到自己的表中.

on_failure_callback/on_success_callback回調函數簽名同execute(), 都有一個context參數.

---------------

為生產環境和測試環境提供不同的設置

---------------

系統級的設置, 見airflow.cfg文檔

DAG級別的設置, 我們可為Prod/Dev環境準備不同的default_args,  

default_args = {

'owner': 'airflow',

'depends_on_past': False,

'start_date': datetime(2015, 6, 1),

'email': ['airflow@airflow.com'],

'email_on_failure': False,

'email_on_retry': False,

'retries': 1,

'retry_delay': timedelta(minutes=5),

# 'queue': 'bash_queue',

# 'pool': 'backfill',

# 'priority_weight': 10,

# 'end_date': datetime(2016, 1, 1),

}

dag = DAG('tutorial', default_args=default_args)

通過Variable, 加載不同環境的配置. 詳細思路如下:

比如我們有一個My_Cfg參數, 在Prod和Dev取值有可能不同.

首先設置一個 Environment_Flag variable, 其取值是Prod或Dev.

然后, 定義為My_Cfg參數設定兩個變量, My_Cfg_For_Prod 和 My_Cfg_For_Dev, 并賦值, 分別對應Prod/Dev環境下My_Cfg的取值.  

在代碼中, 我們就可以通過Environment_Flag的取值, 就知道是該訪問 My_Cfg_For_Prod 變量還是 My_Cfg_For_Dev 變量, 進而得到My_Cfg的取值.

---------------

Regular的External trigger觸發dag的推薦用法

---------------

外部觸發需要trigger_dag命令行, 命令行最好要加上run_id參數;

同時DAG的schedule_interval參數最好設置成None, 表明這個DAG始終是由外部觸發

---------------

測試運行步驟:

---------------

1. 先測試pytnon代碼正確性

python ~/airflow/dags/tutorial.py

2. 通過命令行驗證DAG/task設置

# print the list of active DAGs

airflow list_dags

# prints the list of tasks the "tutorial" dag_id

airflow list_tasks tutorial

# prints the hierarchy of tasks in the tutorial DAG

airflow list_tasks tutorial --tree

3. 通過test命令行試跑一下, 測試一下code邏輯

airflow test tutorial my_task_id 2015-06-01   

4. 通過 backfill --mark_success=True   

airflow backfill tutorial -s 2015-06-01 -e 2015-06-07

來自: http://www.cnblogs.com/harrychinese/p/airflow.html

 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!