
[Airflow] Creating DAGs

상쾌한기분 2021. 11. 4. 23:13

Writing a Pipeline

Before Testing

# check ~/airflow/airflow.cfg

# default DAGs folder
# dags_folder = /root/airflow/dags
nano ~/airflow/dags/sample.py
from datetime import timedelta, datetime
from textwrap import dedent

from airflow import DAG
from airflow.operators.bash import BashOperator

# default_args are applied to every operator in the DAG
# and can be overridden per task (see retries on task2 below)
default_args = {
    'owner'           : 'WMS',
    'depends_on_past' : False,
    'email'           : ['jay_g@kr.accommate.com'],
    'email_on_failure': True,
    'email_on_retry'  : False,
    'retries'         : 3,
    'retry_delay'     : timedelta(minutes = 2),
}

with DAG(
    dag_id = 'SampleDAG',
    default_args = default_args,
    description = 'Sample DAG',
    schedule_interval = timedelta(minutes = 5),
    start_date = datetime(2021, 10, 22),
    catchup = False,
    tags = ['Sample'],
) as dag:
    task1 = BashOperator(
        task_id = 'bash_print_date',
        bash_command = 'date'
    )
    task2 = BashOperator(
        task_id = 'bash_sleep',
        depends_on_past = False,
        bash_command = 'sleep 5',
        retries = 3
    )

    task1.doc_md = dedent(
        """
        #### Task Documentation
        You can document your task using the attributes `doc_md` (markdown),
        `doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
        rendered in the UI's Task Instance Details page.
        ![img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png)
        """
    )
    dag.doc_md = __doc__                  # use the module docstring, if the file has one
    dag.doc_md = "documentation placed"   # or assign a string directly
    templated_command = dedent(
        """
        {% for i in range(5) %}
            echo "{{ ds }}"
            echo "{{ macros.ds_add(ds, 7)}}"
            echo "{{ params.foo }}"
        {% endfor %}
        """
    )

    task3 = BashOperator(
        task_id = 'bash_template',
        depends_on_past = False,
        bash_command = templated_command,
        params = { 'foo': 'var' },
    )

    task1 >> [task2, task3]
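
To see what the Jinja template above expands to, here is a minimal sketch that renders one loop iteration outside Airflow. It assumes the jinja2 package (which Airflow itself uses); at runtime Airflow injects ds, macros, and params for you, so the _Macros stand-in below only mirrors macros.ds_add for illustration.

from datetime import datetime, timedelta
from jinja2 import Template

class _Macros:
    @staticmethod
    def ds_add(ds, days):
        # mirrors airflow.macros.ds_add: shift a YYYY-MM-DD date string by N days
        return (datetime.strptime(ds, '%Y-%m-%d') + timedelta(days = days)).strftime('%Y-%m-%d')

template = Template('echo "{{ ds }}"\necho "{{ macros.ds_add(ds, 7) }}"\necho "{{ params.foo }}"')
print(template.render(ds = '2021-10-22', macros = _Macros, params = {'foo': 'var'}))
# echo "2021-10-22"
# echo "2021-10-29"
# echo "var"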

Testing

python3 ~/airflow/dags/sample.py
# If the script does not raise an exception 
# it means that you have not done anything horribly wrong, 
# and that your Airflow environment is somewhat sound.
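
The same sanity check can be done programmatically; a minimal sketch using Airflow's DagBag, which parses every file under dags_folder and collects import errors per file instead of stopping at the first exception:

from airflow.models import DagBag

dag_bag = DagBag()                       # parses every file under dags_folder
print(dag_bag.import_errors or 'no import errors')
print(list(dag_bag.dags))                # should include 'SampleDAG'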

Airflow metadata validation

# initialize the database tables
airflow db init

# DB: sqlite:////root/airflow/airflow.db
# [2021-10-25 08:18:50,515] {db.py:823} INFO - Creating tables
# INFO  [alembic.runtime.migration] Context impl SQLiteImpl.
# INFO  [alembic.runtime.migration] Will assume non-transactional DDL.
# WARNI [airflow.models.crypto] empty cryptography key - values will not be stored encrypted.
# Initialization done
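
The "empty cryptography key" warning above means sensitive values (e.g. connection passwords) are stored unencrypted. As the Airflow docs suggest, you can generate a Fernet key with the cryptography package and set it as fernet_key under [core] in airflow.cfg; a minimal sketch:

from cryptography.fernet import Fernet

# paste the printed value into airflow.cfg as: fernet_key = <value>
print(Fernet.generate_key().decode())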

# print the list of active DAGs
airflow dags list

# dag_id    | filepath  | owner | paused
# ==========+===========+=======+=======
# SampleDAG | sample.py | WMS   | True
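
Note the paused column: a newly discovered DAG starts paused, so the scheduler will not run it until you unpause it via the web UI toggle, `airflow dags unpause SampleDAG`, or programmatically. A minimal sketch of the latter, assuming Airflow's DagModel helper:

from airflow.models import DagModel

dag_model = DagModel.get_dagmodel('SampleDAG')
dag_model.set_is_paused(is_paused = False)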

# prints the list of tasks in the "SampleDAG" DAG
airflow tasks list SampleDAG

# prints the hierarchy of tasks in the "SampleDAG" DAG
airflow tasks list SampleDAG --tree
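
The same hierarchy can be inspected from Python; a minimal sketch that loads the DAG through DagBag and walks each task's downstream IDs:

from airflow.models import DagBag

dag = DagBag().get_dag('SampleDAG')
for task in dag.tasks:
    print(task.task_id, '->', sorted(task.downstream_task_ids))
# bash_print_date -> ['bash_sleep', 'bash_template']
# bash_sleep -> []
# bash_template -> []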

# airflow tasks test [dag_id] [task_id] [logical_date]
airflow tasks test SampleDAG bash_print_date 2015-06-01
airflow tasks test SampleDAG bash_sleep 2015-06-01
airflow tasks test SampleDAG bash_template 2015-06-01

# airflow dags test [dag_id] [logical_date]
# While it does take task dependencies into account, no state is registered in the database.
# It is convenient for locally testing a full run of your DAG; note that if one of your
# tasks expects data at some location, that data must already be available.
airflow dags test SampleDAG 2015-06-01
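
To verify the "no state is registered" claim above, a minimal sketch that queries the metadata database through Airflow's session helper; it assumes the default SQLite setup initialized earlier:

from airflow.models import DagRun
from airflow.utils.session import create_session

with create_session() as session:
    runs = session.query(DagRun).filter(DagRun.dag_id == 'SampleDAG').all()
    print(runs)   # expect an empty list after `dags test`, unlike `dags backfill`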

Backfill

airflow dags backfill SampleDAG --start-date 2021-10-24
# optionally --end-date 2021-10-31

Usage

# /home/airflow is the docker-compose project path
nano /home/airflow/dags/sample.py

# open http://[Airflow IP]:8080/ in a browser