[1] dags 폴더에 예제 파일을 작성한다. 파일 제목은 print_data.py로 한다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime,timedelta

dag = DAG('hello-airflow',description='Hello airflow DAG',
          schedule_interval = '*/5 0 * * *',
          start_date=datetime(2021,7,1),catchup=False)

def print_hello():
    return 'Hello Airflow'

python_task = PythonOperator(
                    task_id='python_operator',
                    python_callable = print_hello,
                    dag = dag)

bash_task = BashOperator(
        task_id='print_date',
        bash_command='date',
        dag=dag)

bash_task.set_downstream(python_task)

[2] Airflow DB를 초기화한다.

1
airflow db init

[3] DB를 초기화 하고 나면 dag list 명령어로 dags 폴더에 넣어두었던 DAG가 DAG 리스트에 등록된 것을 확인할 수 있다.

1
airflow dags list

[3] 아래 명령어로 특정 DAG 안에 포함된 Task들도 확인할 수 있다.

1
airflow tasks list hello-airflow

[4] 테스트는 airflow tasks test {DAG ID} {TASK_ID} {기준 날짜} 명령어를 사용하여 Task 별로 시행할 수 있다.

1
airflow tasks test hello-airflow print_date 2021/7/1

[5] DAG 등록과 테스트가 완료되었다면 airflow scheduler를 실행시키자. 그러면 등록된 DAG가 자동으로 실행된다.

1
airflow scheduler

[6] 웹 UI에 접속하면 아래와 같이 등록된 DAG와 그 상태를 확인할 수 있다.