在本地环境中尝试Apache Airflow的教程

Apache Airflow 2.2.3 是一个开源的工作流管理平台。

undefined

Airflow是什么?

Apache Airflow是由Airbnb的Maxime Beauchemin开始开发的基于Python的作业管理工具,用于通过编程创建/调度/监视工作流的平台。
在Airflow中,作业的执行顺序和依赖关系的工作流被定义为有向无环图(DAG)。

建筑设计

undefined

空气流动由以下组件组成。

    • スケジュールされたワークフローのトリガーと、実行タスクをExecuterの送信するためのScheduler。

 

    • 実行中のタスクを処理するExecuter。デフォルトではスケジューラ内ですべてを実行するが、本番環境に適したExecuterはタスクの実行をWorkersにプッシュする。

 

    • DAGおよびタスクの動作を検査、トリガー、およびデバッグするための便利なユーザーインターフェイスを提供するWebサーバー。

 

    • Scheduler/ Executorに読み取られるDAGファイルのフォルダ。

 

    状態保持のために Scheduler/ Executor/ Webサーバー に使用されるMeta DB。

准备执行

设置Airflow的路径。

$ export AIRFLOW_HOME=~/airflow

Airflow安装命令。

$ pip install "apache-airflow[celery]==2.2.3" --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.2.3/constraints-3.6.txt"

在这里,我们建议使用 PostgreSQL 来准备数据库。

CREATE DATABASE airflow_db;
CREATE USER airflow_user WITH PASSWORD 'airflow_pass';
GRANT ALL PRIVILEGES ON DATABASE airflow_db TO airflow_user;

据说在Airflow中,初始数据库设置为SQLite,因此需要编辑airflow/airflow.cfg文件。

将sql_alchemy_conn的值更改为sqlite:////home/user/airflow/airflow.db的路径,更改为postgresql://airflow_user:airflow_pass@localhost/airflow_db。

执行数据库的初始化和用户创建。

$ airflow standalone

准备代码

创建一个名为airflow/dags的文件夹,在其中创建test-tutorial.py,并复制教程页面的tutorial.py内容以创建DAG。

from datetime import datetime, timedelta
from textwrap import dedent

# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG

# Operators; we need this to operate!
from airflow.operators.bash import BashOperator
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
    # 'wait_for_downstream': False,
    # 'dag': dag,
    # 'sla': timedelta(hours=2),
    # 'execution_timeout': timedelta(seconds=300),
    # 'on_failure_callback': some_function,
    # 'on_success_callback': some_other_function,
    # 'on_retry_callback': another_function,
    # 'sla_miss_callback': yet_another_function,
    # 'trigger_rule': 'all_success'
}
with DAG(
    'test-tutorial',
    default_args=default_args,
    description='A simple test-tutorial DAG',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=['example'],
) as dag:

    # t1, t2 and t3 are examples of tasks created by instantiating operators
    t1 = BashOperator(
        task_id='print_date',
        bash_command='date',
    )

    t2 = BashOperator(
        task_id='sleep',
        depends_on_past=False,
        bash_command='sleep 5',
        retries=3,
    )
    t1.doc_md = dedent(
        """\
    #### Task Documentation
    You can document your task using the attributes `doc_md` (markdown),
    `doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
    rendered in the UI's Task Instance Details page.
    ![img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png)

    """
    )

    dag.doc_md = __doc__  # providing that you have a docstring at the beginning of the DAG
    dag.doc_md = """
    This is a documentation placed anywhere
    """  # otherwise, type it like this
    templated_command = dedent(
        """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7)}}"
        echo "{{ params.my_param }}"
    {% endfor %}
    """
    )

    t3 = BashOperator(
        task_id='templated',
        depends_on_past=False,
        bash_command=templated_command,
        params={'my_param': 'Parameter I passed in'},
    )

    t1 >> [t2, t3]

解读代码

パラメータ型説明dag_idstrDAG単位で与える一意な識別子。ここでは’test-tutorial’が該当。default_argsdictオペレータに渡す引数。引数の中には優先順位があり、優先順から
1. 明示した引数
2. default_args
3. オペレータのデフォルト値(存在する場合)
となる。

关于执行者

修改DAG的时候发现无法加载,但通过将airflow/airflow.cfg内的executor从SequentialExecutor更改为CeleryExecutor,成功加载了(可能是因为SequentialExecutor只能顺序执行任务?)。

关于执行者(executer)的总结如下。

SequentialExecuterCeleryExecutorLocalExecutorデフォルトのExecutorで、直列にしかタスクを実行できない。お試し版的存在キューを利用したExecutorでスケジューラとworkerが分離される。schedulerとworkerを同じノードで構成する場合に使用するExecutorスケールは一切しないworkerを並列実行でき、スケールアップおよびスケールアウト でスケール可能になるため本番環境向き。マルチプロセスで動作し、スケールアップ によってスケール可能

执行

使用命令

$ airflow webserver

当你执行时,会在localhost:8080上打开以下的页面

image.png

如果无法使用admin/admin登录,则创建一个用于登录的用户并登录。

$ airflow users  create --role Admin --username admin --email admin --firstname admin --lastname admin --password admin
image.png
image.png
image.png
image.png

文化语言

任务 wù)

在生成操作符对象的实例化时,任务会被创建。由操作符实例化的对象被称为任务。在源代码中可以表示为如下。

t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
)

第一个参数task_id作为任务的唯一标识符起作用。

参考

    • 2018 builderscon airflowを用いて、 複雑大規模なジョブフロー管理 に立ち向かう

 

    • Apache Airflowの並列性と並行性について理解する

 

    Airflow を用いたデータフロー分散処理
bannerAds