본문 바로가기

데이터엔지니어링

[8주차] 데이터 파이프라인과 Airflow (3)

🙂 Hello World 예제

✔ Operators - PythonOperator

from airflow.operators.python import PythonOperator

load_nps = PythonOperator(
    dag=dag,
    task_id="task_id",
    python_callable=python_func,
    params={
        'table': 'delighted_nps',
        'schema': 'raw_data'
    },
)

def python_func(**cxt):
    table = cxt['params']['table']
    schema = cxt['params']['schema']
    
    ex_date = cxt['execution_date']
    
    ...

 

🍦 2개의 태스크로 구성된 데이터 파이프라인 (DAG)

  • print_hello: PythonOperator. 먼저 실행
  • print_goodbye: PythonOpertor. 두번째로 실행
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

dag = DAG(
    dag_id='HelloWorld',
    start_date=datetime(2022,5,5),
    catchup=False,
    tags=['example'],
    schedule='0 2 * * *')

def print_hello():
    print("hello!")
    return "hello!"

def print_goodbye():
    print("goodbye!")
    return "goodbye!"

print_hello = PythonOperator(
    task_id = 'print_hello',
    #python_callable param points to the function you want to run 
    python_callable = print_hello,
    #dag param points to the DAG that this task is a part of
    dag = dag)

print_goodbye = PythonOperator(
    task_id = 'print_goodbye',
    python_callable = print_goodbye,
    dag = dag)

#Assign the order of the tasks in our DAG
print_hello >> print_goodbye

 

Airflow Decorators: 프로그래밍이 단순해짐

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.decorators import task
from datetime import datetime

dag = DAG(
    dag_id='HelloWorld',
    start_date=datetime(2022,5,5),
    catchup=False,
    tags=['example'],
    schedule='0 2 * * *')

@task
def print_hello():
    print("hello!")
    return "hello!"

@task
def print_goodbye():
    print("goodbye!")
    return "goodbye!"

# Assign the order of the tasks in our DAG
# 함수 이름이 task ID
print_hello() >> print_goodbye()

 

DAG 파라미터

  • max_active_runs: # of DAGs instance
  • max_active_tasks: # of tasks that can run in parallel
  • catchup: whether to backfill past runs
  • DAG parameters vs. Task parameters: DAG 객체를 만들 때 지정

🙂 Name Gender 예제 프로그램 포팅

Colab Python 코드를 Airflow로 포팅

 

Connections

  • 호스트이름, 포트번호, 접근 자격증명 등
  • 환경 변수 형태로 코드 밖으로 빼내 주는 역할

Variables

  • API키나 여러 구성 정보를 저장

✔ Xcom

  • Task(Operator) 같에 데이터를 주고 받기 위한 방식
  • 보통 한 Operator의 리턴 값을 다른 Operator에서 읽어가는 형태가 됨
  • 이 값들은 Airflow 메타 데이터 DB에 저장이 되기에 큰 데이터를 주고받는데는 사용불가
    • 보통 큰 데이터는 S3에 로드하고 그 위치를 넘기는 것이 일반적

 


🙂 airflow.cfg

windows docker로 실행 시

  • airflow-setup-airflow-webserver-1 > Files 탭
  • /opt/airflow/airflow.cfg 경로에 위치

 

🍦 DAGs 폴더는 어디에 지정되는가?

/opt/airflow/dags에 지정된다. 보통 코드 레파지토리의 하위폴더 경로로 설정한다고 한다.

 

🍦 DAGs 폴더에 새로운 Dag를 만들면 언제 실제로 Airflow 시스템에서 이를 알게 되나?

     이 스캔 주기를 결정해주는 키의 이름은 무엇인가?

Default 5분 (300초)

 

🍦 이 파일에서 Airflow를 API 형태로 외부에서 조작하고 싶다면 어느 섹션을 변경해야하는가?

 

auth_backends: API 사용자 인증에 사용할 인증 백엔드를 지정.

기본값인 session에서는 외부에서 API 형태로 제어가 불가능.

airflow.api.auth.backend.basic_auth 로 변경하면 API 형태로 외부 조작이 가능.

 

🍦 Variable에서 변수의 값이 encrypted가 되려면 변수의 이름에 어떤 단어들이 들어가야 하는데

     이 단어들은 무엇일까?

 

secret, password, passwd, authorization, api_key, apikey, access_token

 

🍦 이 환경 설정 파일이 수정되었다면 이를 실제로 반영하기 위해서 해야 하는 일은?

sudo systemctl restart airflow-webserver
sudo systemctl restart airflow-scheduler

Docker의 경우 Docker 컨테이너를 재실행.

 

🍦 Metadata DB의 내용을 암호화하는데 사용되는 키는 무엇인가?

fernet_key (대칭키)를 사용하여 Metadata DB의 내용 암호화 가능.