Writing a Pipeline
Before Testing
# check ~/airflow/airflow.cfg
# default path for DAGs
# dags_folder = /root/airflow/dags
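The same value can also be read through Airflow's configuration API instead of opening the file; a minimal sketch (conf.get for core/dags_folder is part of the stock Airflow 2 API):

from airflow.configuration import conf

# resolves to /root/airflow/dags with the default config above
print(conf.get('core', 'dags_folder'))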
nano ~/airflow/dags/sample.py
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.operators.bash import BashOperator

# arguments applied to every task in the DAG unless overridden per task
default_args = {
    'owner': 'WMS',
    'depends_on_past': False,
    'email': ['jay_g@kr.accommate.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=2),
}

with DAG(
    dag_id='SampleDAG',
    default_args=default_args,
    description='Sample DAG',
    schedule_interval=timedelta(minutes=5),
    start_date=datetime(2021, 10, 22),
    catchup=False,
    tags=['Sample'],
) as dag:
    task1 = BashOperator(
        task_id='bash_print_date',
        bash_command='date',
    )

    task2 = BashOperator(
        task_id='bash_sleep',
        depends_on_past=False,
        bash_command='sleep 5',
        retries=3,
    )

    task1.doc_md = dedent(
        """
        #### Task Documentation
        You can document your task using the attributes `doc_md` (markdown),
        `doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
        rendered in the UI's Task Instance Details page.
        ![img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png)
        """
    )

    # dag.doc_md picks up the module docstring if one exists;
    # assigning a string afterwards overwrites it, so keep only one of the two.
    dag.doc_md = __doc__
    dag.doc_md = "documentation placed"

    # Jinja-templated bash script: {{ ds }} is the logical date (YYYY-MM-DD),
    # macros.ds_add(ds, 7) adds 7 days, and params.foo comes from the params dict below
    templated_command = dedent(
        """
        {% for i in range(5) %}
            echo "{{ ds }}"
            echo "{{ macros.ds_add(ds, 7) }}"
            echo "{{ params.foo }}"
        {% endfor %}
        """
    )

    task3 = BashOperator(
        task_id='bash_template',
        depends_on_past=False,
        bash_command=templated_command,
        params={'foo': 'var'},
    )

    # task1 runs first, then task2 and task3 in parallel
    task1 >> [task2, task3]
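The bitshift line at the end is just one way to declare dependencies. As a sketch of the equivalent alternatives (set_downstream and chain are both part of the stock Airflow 2 API), either of the following, placed inside the with block, produces the same graph:

from airflow.models.baseoperator import chain

# equivalent to: task1 >> [task2, task3]
task1.set_downstream([task2, task3])
# or:
# chain(task1, [task2, task3])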
Testing
python3 ~/airflow/dags/sample.py
# If the script does not raise an exception
# it means that you have not done anything horribly wrong,
# and that your Airflow environment is somewhat sound.
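For a slightly stronger check, the file can be loaded through a DagBag, which surfaces import errors and lets you assert on the parsed DAG. A minimal sketch, assuming the dags folder from above (DagBag and its import_errors attribute are part of the stock Airflow 2 API):

from airflow.models import DagBag

# parse the dags folder without loading Airflow's bundled example DAGs
dag_bag = DagBag(dag_folder='/root/airflow/dags', include_examples=False)

# import_errors maps file path -> traceback for every file that failed to parse
assert not dag_bag.import_errors, dag_bag.import_errors

# the sample DAG should be registered with its three tasks
dag = dag_bag.get_dag('SampleDAG')
assert dag is not None and len(dag.tasks) == 3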
Airflow metadata validation
# initialize the database tables
airflow db init
# DB: sqlite:////root/airflow/airflow.db
# [2021-10-25 08:18:50,515] {db.py:823} INFO - Creating tables
# INFO [alembic.runtime.migration] Context impl SQLiteImpl.
# INFO [alembic.runtime.migration] Will assume non-transactional DDL.
# WARNI [airflow.models.crypto] empty cryptography key - values will not be stored encrypted.
# Initialization done
# print the list of active DAGs
airflow dags list
# dag_id    | filepath  | owner | paused
# ==========+===========+=======+=======
# SampleDAG | sample.py | WMS   | True
# print the list of tasks in the "SampleDAG" DAG
airflow tasks list SampleDAG
# print the hierarchy of tasks in the "SampleDAG" DAG
airflow tasks list SampleDAG --tree
# airflow tasks test [dag_id] [task_id] [logical_date]
airflow tasks test SampleDAG bash_print_date 2015-06-01
airflow tasks test SampleDAG bash_sleep 2015-06-01
airflow tasks test SampleDAG bash_template 2015-06-01
# airflow dags test [dag_id] [logical_date]
# While it does take task dependencies into account, no state is registered in the database.
# This makes it convenient for locally testing a full run of your DAG,
# provided that any data your tasks expect at some location is actually available.
airflow dags test SampleDAG 2015-06-01
Backfill
# Backfill runs the DAG for every schedule interval in the given date range
# and, unlike dags test, records state in the metadata database.
airflow dags backfill SampleDAG --start-date 2021-10-24
# optionally add --end-date 2021-10-31
Usage
# /home/airflow is the docker-compose directory
nano /home/airflow/dags/sample.py
# open http://[Airflow IP]:8080/ in a browser
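Before opening the UI, the webserver's /health endpoint gives a quick sanity check on the metadata database and scheduler. A minimal sketch, assuming the requests package is installed and [Airflow IP] is replaced with the actual host:

import requests

# the Airflow 2 webserver exposes a /health endpoint with component status
resp = requests.get('http://[Airflow IP]:8080/health', timeout=5)  # substitute the real IP
print(resp.json())
# expect something like:
# {"metadatabase": {"status": "healthy"}, "scheduler": {"status": "healthy", ...}}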