본문 바로가기

데이터엔지니어링

[10주차] Airflow 고급 기능, DBT와 Data Catalog (3)

🙂 Dag Dependencies

✔ Dag 실행 방법

  • 주기적 실행: schedule로 지정
  • 다른 Dag에 의해 트리거
    • Explicit Trigger: Dag A가 분명하게 Dag B를 트리거 (TriggerDagOperator)
    • Reactive Trigger: Dag B가 Dag A가 끝나기를 대기 (ExternalTaskSensor)
      Dag A는 이 사실을 모름
  • 알아두면 좋은 상황에 따라 다른 Task 실행 방식
    • 조건에 따라 다른 Task로 분기 (BranchPythonOperator)
    • 과거 데이터 Backfill 시에는 불필요한 Task 처리 (LatestOnlyOperator)
    • 앞단 Task의 실행상황
      • 어떤 경우에는 앞단이 실패해도 동작해야하는 경우가 있을 수 있음

🍦 Jinja Template

  • Jinja 템플릿은 Python에서 널리 사용되는 템플릿 엔진
    • Django 템플릿 엔진에서 영감을 받아 개발
    • Jinja를 사용하면 프레젠테이션 로직과 애플리케이션 로직을 분리하여 동적으로 HTML 생성
    • Flask에서 사용됨
  • 변수는 이중 중괄호 {{}}로 감싸서 사용
    • <h1>안녕하세요, {{ name }}님!</h1>
  • 제어문은 퍼센트 기호 {% %}로 표시
    • {% for item in items %}
      <li>{{ item }}</li>
      {% endfor %}

Airflow에서 사용 가능한 Jinja 변수

  • {{ ds }}
  • {{ ds_nodash }}
  • {{ ts }}
  • {{ dag }}
  • {{ task }}
  • {{ dag_run }}
  • {{ var.value }}: {{ var.value.get('my.var', 'fallback') }}
  • {{ var.json }}: {{ var.json.my_dict_var.key1 }}
  • {{ conn }}: {{ conn.my_conn_id.login }}, {{ conn.my_conn_id.password }}

🍦 Sensor

  • Sensor는 특정 조건이 충족될 때까지 대기하는 Operator
  • 외부 리소스의 가용성이나 특정 조건의 완료와 같은 상황 동기화에 유용
  • Airflow는 몇 가지 내장 Sensor를 제공
    • FileSensor: 지정된 위치에 파일이 생길 때까지 대기
    • HttpSensor: HTTP 요청을 수행하고 지정된 응답 대기
    • SqlSensor: SQL 데이터베이스에서 특정 조건을 충족할 때까지 대기
    • TimeSensor: 특정 시간에 도달할 때까지 Workflow를 일시 중지
    • ExternalTaskSensor: 다른 DAG의 특정 작업 완료를 대기
      • Dag B의 ExternalTaskSensor Task가 Dag A의 특정 Task가 끝났는지 체크
        • 동일한 schedule_interval 사용 (동일한 frequency)
        • 이 경우 두 태스크의 Execution Date가 동일해야함
  • 주기적으로 poke
    • worker를 하나 붙잡고 poke 간에 sleep을 할지 아니면 worker를 릴리스하고 다시 잡아서 poke를 할지 결정해주는 파라미터가 존재: mode
      • mode의 값은 reschedule 혹은 poke가 됨

🍦 Trigger Rules

  • Upstream Task의 성공실패 상황에 따라 뒷단 Task의 실행여부를 결정
    • 보통 앞단이 하나라도 실패하면 뒷단의 Task는 실행 불가
  • Operator의 trigger_rule 파라미터로 결정 가능
    • trigger_rule은 task에 주어지는 파라미터로 다음과 같은 값이 가능
    • all_success (default), all_failed, all_done, one_failed, one_success, none_failed, none_failed_min_one_success
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.trigger_rule import TriggerRule
from datetime import datetime, timedelta

default_args = {
    'start_date': datetime(2023, 6, 15)
}

with DAG("Learn_TriggerRule", default_args=default_args, schedule=timedelta(1)) as dag:
   t1 = BashOperator(task_id="print_date", bash_command="date")
   t2 = BashOperator(task_id="sleep", bash_command="sleep 5")
   t3 = BashOperator(task_id="exit", bash_command="exit 1")
   t4 = BashOperator(
       task_id='final_task',
       bash_command='echo DONE!',
       trigger_rule=TriggerRule.ALL_DONE
   )
   [t1, t2, t3] >> t4

🙂 Task Grouping

✔ 필요성

  • Task 수가 많은 DAG라면 Task를 성격에 따라 관리하고 싶은 니즈 존재
    • SubDAG이 사용되다가 Airflow 2.0에서 나온 Task Grouping으로 넘어가는 추세
      • SubDAG를 비슷한 일을 하는 Task를 SubDAG라는 Child Dag로 만들어서 관리
  • 다수의 파일 처리를 하는 DAG라면
    • 파일 다운로드 Task / 파일 체크 Task / 데이터 처리 Task 로 구성

 

from airflow.models.dag import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.bash import BashOperator
from airflow.utils.task_group import TaskGroup
import pendulum

with DAG(dag_id="Learn_Task_Group", start_date=pendulum.today('UTC').add(days=-2), tags=["example"]) as dag:
    start = EmptyOperator(task_id="start")

    # Task Group #1
    with TaskGroup("Download", tooltip="Tasks for downloading data") as section_1:
        task_1 = EmptyOperator(task_id="task_1")
        task_2 = BashOperator(task_id="task_2", bash_command='echo 1')
        task_3 = EmptyOperator(task_id="task_3")

        task_1 >> [task_2, task_3]

    # Task Group #2
    with TaskGroup("Process", tooltip="Tasks for processing data") as section_2:
        task_1 = EmptyOperator(task_id="task_1")

        with TaskGroup("inner_section_2", tooltip="Tasks for inner_section2") as inner_section_2:
            task_2 = BashOperator(task_id="task_2", bash_command='echo 1')
            task_3 = EmptyOperator(task_id="task_3")
            task_4 = EmptyOperator(task_id="task_4")

            [task_2, task_3] >> task_4

    end = EmptyOperator(task_id='end')

    start >> section_1 >> section_2 >> end

 

코드 그래프 표현


🙂 Dynamic Dags

  • 템플릿과 YAML을 기반으로 DAG를 동적으로 만듦
    • Jinja를 기반으로 DAG 자체의 템플릿을 디자인하고 YAML을 통해 앞서 만든 템플릿에 파라미터를 제공
  • 이를 통해 비슷한 DAG를 계속해서 매뉴얼하게 개발하는 것을 방지
  • DAG를 계속해서 만드는 것과 한 DAG안에서 Task를 늘리는 것 사이의 밸런스 필요
    • 오너가 다르거나 Task의 수가 너무 커지는 경우 DAG를 복제해나가는 것이 더 좋음

'데이터엔지니어링' 카테고리의 다른 글