🙂 Dag Dependencies
✔ Dag 실행 방법
- 주기적 실행: schedule로 지정
- 다른 Dag에 의해 트리거
- Explicit Trigger: Dag A가 분명하게 Dag B를 트리거 (TriggerDagOperator)
- Reactive Trigger: Dag B가 Dag A가 끝나기를 대기 (ExternalTaskSensor)
Dag A는 이 사실을 모름
- Explicit Trigger: Dag A가 분명하게 Dag B를 트리거 (TriggerDagOperator)
- 알아두면 좋은 상황에 따라 다른 Task 실행 방식
- 조건에 따라 다른 Task로 분기 (BranchPythonOperator)
- 과거 데이터 Backfill 시에는 불필요한 Task 처리 (LatestOnlyOperator)
- 앞단 Task의 실행상황
- 어떤 경우에는 앞단이 실패해도 동작해야하는 경우가 있을 수 있음
🍦 Jinja Template
- Jinja 템플릿은 Python에서 널리 사용되는 템플릿 엔진
- Django 템플릿 엔진에서 영감을 받아 개발
- Jinja를 사용하면 프레젠테이션 로직과 애플리케이션 로직을 분리하여 동적으로 HTML 생성
- Flask에서 사용됨
- 변수는 이중 중괄호 {{}}로 감싸서 사용
- <h1>안녕하세요, {{ name }}님!</h1>
- 제어문은 퍼센트 기호 {% %}로 표시
- {% for item in items %}
<li>{{ item }}</li>
{% endfor %}
- {% for item in items %}
Airflow에서 사용 가능한 Jinja 변수
- {{ ds }}
- {{ ds_nodash }}
- {{ ts }}
- {{ dag }}
- {{ task }}
- {{ dag_run }}
- {{ var.value }}: {{ var.value.get('my.var', 'fallback') }}
- {{ var.json }}: {{ var.json.my_dict_var.key1 }}
- {{ conn }}: {{ conn.my_conn_id.login }}, {{ conn.my_conn_id.password }}
🍦 Sensor
- Sensor는 특정 조건이 충족될 때까지 대기하는 Operator
- 외부 리소스의 가용성이나 특정 조건의 완료와 같은 상황 동기화에 유용
- Airflow는 몇 가지 내장 Sensor를 제공
- FileSensor: 지정된 위치에 파일이 생길 때까지 대기
- HttpSensor: HTTP 요청을 수행하고 지정된 응답 대기
- SqlSensor: SQL 데이터베이스에서 특정 조건을 충족할 때까지 대기
- TimeSensor: 특정 시간에 도달할 때까지 Workflow를 일시 중지
- ExternalTaskSensor: 다른 DAG의 특정 작업 완료를 대기
- Dag B의 ExternalTaskSensor Task가 Dag A의 특정 Task가 끝났는지 체크
- 동일한 schedule_interval 사용 (동일한 frequency)
- 이 경우 두 태스크의 Execution Date가 동일해야함
- Dag B의 ExternalTaskSensor Task가 Dag A의 특정 Task가 끝났는지 체크
- 주기적으로 poke
- worker를 하나 붙잡고 poke 간에 sleep을 할지 아니면 worker를 릴리스하고 다시 잡아서 poke를 할지 결정해주는 파라미터가 존재: mode
- mode의 값은 reschedule 혹은 poke가 됨
- worker를 하나 붙잡고 poke 간에 sleep을 할지 아니면 worker를 릴리스하고 다시 잡아서 poke를 할지 결정해주는 파라미터가 존재: mode
🍦 Trigger Rules
- Upstream Task의 성공실패 상황에 따라 뒷단 Task의 실행여부를 결정
- 보통 앞단이 하나라도 실패하면 뒷단의 Task는 실행 불가
- Operator의 trigger_rule 파라미터로 결정 가능
- trigger_rule은 task에 주어지는 파라미터로 다음과 같은 값이 가능
- all_success (default), all_failed, all_done, one_failed, one_success, none_failed, none_failed_min_one_success
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.trigger_rule import TriggerRule
from datetime import datetime, timedelta
default_args = {
'start_date': datetime(2023, 6, 15)
}
with DAG("Learn_TriggerRule", default_args=default_args, schedule=timedelta(1)) as dag:
t1 = BashOperator(task_id="print_date", bash_command="date")
t2 = BashOperator(task_id="sleep", bash_command="sleep 5")
t3 = BashOperator(task_id="exit", bash_command="exit 1")
t4 = BashOperator(
task_id='final_task',
bash_command='echo DONE!',
trigger_rule=TriggerRule.ALL_DONE
)
[t1, t2, t3] >> t4
🙂 Task Grouping
✔ 필요성
- Task 수가 많은 DAG라면 Task를 성격에 따라 관리하고 싶은 니즈 존재
- SubDAG이 사용되다가 Airflow 2.0에서 나온 Task Grouping으로 넘어가는 추세
- SubDAG를 비슷한 일을 하는 Task를 SubDAG라는 Child Dag로 만들어서 관리
- SubDAG이 사용되다가 Airflow 2.0에서 나온 Task Grouping으로 넘어가는 추세
- 다수의 파일 처리를 하는 DAG라면
- 파일 다운로드 Task / 파일 체크 Task / 데이터 처리 Task 로 구성

from airflow.models.dag import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.bash import BashOperator
from airflow.utils.task_group import TaskGroup
import pendulum
with DAG(dag_id="Learn_Task_Group", start_date=pendulum.today('UTC').add(days=-2), tags=["example"]) as dag:
start = EmptyOperator(task_id="start")
# Task Group #1
with TaskGroup("Download", tooltip="Tasks for downloading data") as section_1:
task_1 = EmptyOperator(task_id="task_1")
task_2 = BashOperator(task_id="task_2", bash_command='echo 1')
task_3 = EmptyOperator(task_id="task_3")
task_1 >> [task_2, task_3]
# Task Group #2
with TaskGroup("Process", tooltip="Tasks for processing data") as section_2:
task_1 = EmptyOperator(task_id="task_1")
with TaskGroup("inner_section_2", tooltip="Tasks for inner_section2") as inner_section_2:
task_2 = BashOperator(task_id="task_2", bash_command='echo 1')
task_3 = EmptyOperator(task_id="task_3")
task_4 = EmptyOperator(task_id="task_4")
[task_2, task_3] >> task_4
end = EmptyOperator(task_id='end')
start >> section_1 >> section_2 >> end

🙂 Dynamic Dags
- 템플릿과 YAML을 기반으로 DAG를 동적으로 만듦
- Jinja를 기반으로 DAG 자체의 템플릿을 디자인하고 YAML을 통해 앞서 만든 템플릿에 파라미터를 제공
- 이를 통해 비슷한 DAG를 계속해서 매뉴얼하게 개발하는 것을 방지
- DAG를 계속해서 만드는 것과 한 DAG안에서 Task를 늘리는 것 사이의 밸런스 필요
- 오너가 다르거나 Task의 수가 너무 커지는 경우 DAG를 복제해나가는 것이 더 좋음
'데이터엔지니어링' 카테고리의 다른 글
[11주차] 하둡과 Spark (1) (0) | 2024.06.17 |
---|---|
[10주차] Airflow 고급 기능, DBT와 Data Catalog (4) (1) | 2024.06.07 |
[9주차] Docker & K8s (5) (0) | 2024.05.31 |
[9주차] Docker & K8s (4) (0) | 2024.05.30 |
[9주차] Docker & K8s (3) (0) | 2024.05.29 |