Creating Workflows with Apache Airflow (2)

Creating a DAG

Create a file in the "dags/" folder of the directory where Airflow is installed.

$ touch dags/sample_dag.py

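Create a DAG that performs the following ETL process:

    1. Parse a JSON string and return it as a dictionary
    2. Extract the values from the dictionary, add them up, and return the total
    3. Print the total to standard output
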
import json
import pendulum
from airflow.decorators import dag, task


@dag(
    schedule=None,
    start_date=pendulum.datetime(2023, 6, 25, tz="UTC"),
    catchup=False,
    tags=["ETL"],
)
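def sample_dag():

    @task()
    def extract():
        # Extract: parse a hard-coded JSON string of order IDs and amounts into a dict
        data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'

        order_data_dict = json.loads(data_string)
        return order_data_dict

    # multiple_outputs=True stores each key of the returned dict as its own XCom,
    # so the DAG body below can index the result with ["total_order_value"]
    @task(multiple_outputs=True)
    def transform(order_data_dict: dict):
        # Transform: add up all order amounts
        total_order_value = 0

        for value in order_data_dict.values():
            total_order_value += value

        return {"total_order_value": total_order_value}

    @task()
    def load(total_order_value: float):
        # Load: print the total to stdout (Airflow captures it in the task log)
        print(f"Total order value is: {total_order_value:.2f}")

    # Wire the tasks together: extract -> transform -> load
    order_data = extract()
    order_summary = transform(order_data)
    load(order_summary["total_order_value"])


# Calling the decorated function creates the DAG so the scheduler can pick it up
sample_dag()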
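Because the task bodies are ordinary Python, their logic can be checked without a running Airflow instance. The sketch below copies the three function bodies into plain functions (no @task decorators, so nothing here touches Airflow) and chains them by hand. It is an illustrative test harness under that assumption, not part of the original DAG; load() additionally returns its message so it can be asserted on.

```python
import json


# Plain-Python copies of the three task bodies from sample_dag.py,
# without the @task decorators, so the logic runs outside Airflow.
def extract() -> dict:
    data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
    return json.loads(data_string)


def transform(order_data_dict: dict) -> dict:
    # sum() starts from 0 and adds the values in dict order,
    # which matches the original for-loop
    return {"total_order_value": sum(order_data_dict.values())}


def load(total_order_value: float) -> str:
    # Print the message as the task does, and also return it for testing
    message = f"Total order value is: {total_order_value:.2f}"
    print(message)
    return message


if __name__ == "__main__":
    order_summary = transform(extract())
    load(order_summary["total_order_value"])  # Total order value is: 1236.70
```

Running the file with plain python prints the same line that appears in the load task log.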


Extract log

ac67b37461ee
*** Found local files:
***   * /opt/airflow/logs/dag_id=sample_dag/run_id=manual__2023-06-25T13:38:09.170374+00:00/task_id=extract/attempt=1.log
[2023-06-25, 22:38:10 JST] {taskinstance.py:1103} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: sample_dag.extract manual__2023-06-25T13:38:09.170374+00:00 [queued]>
[2023-06-25, 22:38:10 JST] {taskinstance.py:1103} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: sample_dag.extract manual__2023-06-25T13:38:09.170374+00:00 [queued]>
[2023-06-25, 22:38:10 JST] {taskinstance.py:1308} INFO - Starting attempt 1 of 1
[2023-06-25, 22:38:10 JST] {taskinstance.py:1327} INFO - Executing <Task(_PythonDecoratedOperator): extract> on 2023-06-25 13:38:09.170374+00:00
[2023-06-25, 22:38:10 JST] {standard_task_runner.py:57} INFO - Started process 584 to run task
[2023-06-25, 22:38:10 JST] {standard_task_runner.py:84} INFO - Running: ['***', 'tasks', 'run', 'sample_dag', 'extract', 'manual__2023-06-25T13:38:09.170374+00:00', '--job-id', '419', '--raw', '--subdir', 'DAGS_FOLDER/sample_dag.py', '--cfg-path', '/tmp/tmp2ctqlv0m']
[2023-06-25, 22:38:10 JST] {standard_task_runner.py:85} INFO - Job 419: Subtask extract
[2023-06-25, 22:38:10 JST] {task_command.py:410} INFO - Running <TaskInstance: sample_dag.extract manual__2023-06-25T13:38:09.170374+00:00 [running]> on host ac67b37461ee
[2023-06-25, 22:38:10 JST] {taskinstance.py:1547} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='***' AIRFLOW_CTX_DAG_ID='sample_dag' AIRFLOW_CTX_TASK_ID='extract' AIRFLOW_CTX_EXECUTION_DATE='2023-06-25T13:38:09.170374+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2023-06-25T13:38:09.170374+00:00'
[2023-06-25, 22:38:10 JST] {python.py:183} INFO - Done. Returned value was: {'1001': 301.27, '1002': 433.21, '1003': 502.22}
[2023-06-25, 22:38:10 JST] {taskinstance.py:1350} INFO - Marking task as SUCCESS. dag_id=sample_dag, task_id=extract, execution_date=20230625T133809, start_date=20230625T133810, end_date=20230625T133810
[2023-06-25, 22:38:10 JST] {local_task_job_runner.py:225} INFO - Task exited with return code 0
[2023-06-25, 22:38:10 JST] {taskinstance.py:2653} INFO - 1 downstream tasks scheduled from follow-on schedule check
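The dict returned by extract is handed to transform through XCom, Airflow's mechanism for passing small values between tasks, so it has to be serializable. A conservative check, assuming you only rely on plain JSON types (Airflow's own XCom serializer may accept more types, depending on the version), is to confirm that the value survives a JSON round trip. The helper survives_json_round_trip is hypothetical and not part of Airflow.

```python
import json


def survives_json_round_trip(value) -> bool:
    """Return True if value is unchanged after json.dumps -> json.loads.

    Hypothetical helper for illustration; not part of Airflow.
    """
    try:
        return json.loads(json.dumps(value)) == value
    except (TypeError, ValueError):
        # json.dumps raises TypeError for unsupported types such as set
        return False


# The value shown in the extract log
extract_result = {"1001": 301.27, "1002": 433.21, "1003": 502.22}
print(survives_json_round_trip(extract_result))  # True
print(survives_json_round_trip({"ids": {1001, 1002}}))  # False: sets are not JSON
print(survives_json_round_trip({1001: 301.27}))  # False: int keys come back as strings
```

The last case is a common surprise: JSON object keys are always strings, which is one reason the sample data uses "1001" rather than 1001 as keys.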

Transform log

ac67b37461ee
*** Found local files:
***   * /opt/airflow/logs/dag_id=sample_dag/run_id=manual__2023-06-25T13:38:09.170374+00:00/task_id=transform/attempt=1.log
[2023-06-25, 22:38:11 JST] {taskinstance.py:1103} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: sample_dag.transform manual__2023-06-25T13:38:09.170374+00:00 [queued]>
[2023-06-25, 22:38:11 JST] {taskinstance.py:1103} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: sample_dag.transform manual__2023-06-25T13:38:09.170374+00:00 [queued]>
[2023-06-25, 22:38:11 JST] {taskinstance.py:1308} INFO - Starting attempt 1 of 1
[2023-06-25, 22:38:11 JST] {taskinstance.py:1327} INFO - Executing <Task(_PythonDecoratedOperator): transform> on 2023-06-25 13:38:09.170374+00:00
[2023-06-25, 22:38:11 JST] {standard_task_runner.py:57} INFO - Started process 587 to run task
[2023-06-25, 22:38:11 JST] {standard_task_runner.py:84} INFO - Running: ['***', 'tasks', 'run', 'sample_dag', 'transform', 'manual__2023-06-25T13:38:09.170374+00:00', '--job-id', '420', '--raw', '--subdir', 'DAGS_FOLDER/sample_dag.py', '--cfg-path', '/tmp/tmp8cf_c2w4']
[2023-06-25, 22:38:11 JST] {standard_task_runner.py:85} INFO - Job 420: Subtask transform
[2023-06-25, 22:38:11 JST] {task_command.py:410} INFO - Running <TaskInstance: sample_dag.transform manual__2023-06-25T13:38:09.170374+00:00 [running]> on host ac67b37461ee
[2023-06-25, 22:38:11 JST] {taskinstance.py:1547} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='***' AIRFLOW_CTX_DAG_ID='sample_dag' AIRFLOW_CTX_TASK_ID='transform' AIRFLOW_CTX_EXECUTION_DATE='2023-06-25T13:38:09.170374+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2023-06-25T13:38:09.170374+00:00'
[2023-06-25, 22:38:11 JST] {python.py:183} INFO - Done. Returned value was: {'total_order_value': 1236.7}
[2023-06-25, 22:38:11 JST] {taskinstance.py:1350} INFO - Marking task as SUCCESS. dag_id=sample_dag, task_id=transform, execution_date=20230625T133809, start_date=20230625T133811, end_date=20230625T133811
[2023-06-25, 22:38:11 JST] {local_task_job_runner.py:225} INFO - Task exited with return code 0
[2023-06-25, 22:38:11 JST] {taskinstance.py:2653} INFO - 1 downstream tasks scheduled from follow-on schedule check
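transform is declared with multiple_outputs=True, which is why the DAG body can write order_summary["total_order_value"]: Airflow stores each key of the returned dict as a separate XCom entry, in addition to the whole return value. The toy below only roughly mimics that idea with a plain dict keyed by (task_id, key). xcom_store and push_result are hypothetical names and are not Airflow's XCom API.

```python
from typing import Any

# Toy stand-in for the XCom table: (task_id, key) -> value.
# Hypothetical illustration only; real XComs live in the Airflow metadata database.
xcom_store: dict[tuple[str, str], Any] = {}


def push_result(task_id: str, result: Any, multiple_outputs: bool) -> None:
    # Store the whole return value under "return_value"
    xcom_store[(task_id, "return_value")] = result
    if multiple_outputs:
        # With multiple_outputs, each dict key also gets its own entry
        for key, value in result.items():
            xcom_store[(task_id, key)] = value


# Values taken from the extract and transform logs
push_result("extract", {"1001": 301.27, "1002": 433.21, "1003": 502.22}, multiple_outputs=False)
push_result("transform", {"total_order_value": 1236.7}, multiple_outputs=True)

# What load(order_summary["total_order_value"]) receives in this toy model
print(xcom_store[("transform", "total_order_value")])  # 1236.7
print(("extract", "1001") in xcom_store)  # False: extract did not split its dict
```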

Load log

ac67b37461ee
*** Found local files:
***   * /opt/airflow/logs/dag_id=sample_dag/run_id=manual__2023-06-25T13:38:09.170374+00:00/task_id=load/attempt=1.log
[2023-06-25, 22:38:13 JST] {taskinstance.py:1103} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: sample_dag.load manual__2023-06-25T13:38:09.170374+00:00 [queued]>
[2023-06-25, 22:38:13 JST] {taskinstance.py:1103} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: sample_dag.load manual__2023-06-25T13:38:09.170374+00:00 [queued]>
[2023-06-25, 22:38:13 JST] {taskinstance.py:1308} INFO - Starting attempt 1 of 1
[2023-06-25, 22:38:13 JST] {taskinstance.py:1327} INFO - Executing <Task(_PythonDecoratedOperator): load> on 2023-06-25 13:38:09.170374+00:00
[2023-06-25, 22:38:13 JST] {standard_task_runner.py:57} INFO - Started process 590 to run task
[2023-06-25, 22:38:13 JST] {standard_task_runner.py:84} INFO - Running: ['***', 'tasks', 'run', 'sample_dag', 'load', 'manual__2023-06-25T13:38:09.170374+00:00', '--job-id', '421', '--raw', '--subdir', 'DAGS_FOLDER/sample_dag.py', '--cfg-path', '/tmp/tmpfrt4zy13']
[2023-06-25, 22:38:13 JST] {standard_task_runner.py:85} INFO - Job 421: Subtask load
[2023-06-25, 22:38:13 JST] {task_command.py:410} INFO - Running <TaskInstance: sample_dag.load manual__2023-06-25T13:38:09.170374+00:00 [running]> on host ac67b37461ee
[2023-06-25, 22:38:13 JST] {taskinstance.py:1547} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='***' AIRFLOW_CTX_DAG_ID='sample_dag' AIRFLOW_CTX_TASK_ID='load' AIRFLOW_CTX_EXECUTION_DATE='2023-06-25T13:38:09.170374+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2023-06-25T13:38:09.170374+00:00'
[2023-06-25, 22:38:13 JST] {logging_mixin.py:149} INFO - Total order value is: 1236.70
[2023-06-25, 22:38:13 JST] {python.py:183} INFO - Done. Returned value was: None
[2023-06-25, 22:38:13 JST] {taskinstance.py:1350} INFO - Marking task as SUCCESS. dag_id=sample_dag, task_id=load, execution_date=20230625T133809, start_date=20230625T133813, end_date=20230625T133813
[2023-06-25, 22:38:13 JST] {local_task_job_runner.py:225} INFO - Task exited with return code 0
[2023-06-25, 22:38:13 JST] {taskinstance.py:2653} INFO - 0 downstream tasks scheduled from follow-on schedule check

The load log shows that "Total order value is: 1236.70" was written to standard output and captured in the task log.
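The transform log shows the total as 1236.7, while the load log shows 1236.70. The value is the same; only the formatting differs. The transform log prints the returned dict, which uses Python's default float representation, whereas load formats the number with the :.2f format spec, which always prints two decimal places. A minimal sketch reproducing both:

```python
# The three order amounts from the extract step
amounts = [301.27, 433.21, 502.22]
total = sum(amounts)

# Default float representation, as in the transform log's returned dict
print({"total_order_value": total})

# The :.2f format spec used in load() always keeps two decimal places
print(f"Total order value is: {total:.2f}")  # Total order value is: 1236.70
```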
