🙂 Hello World 예제
✔ Operators - PythonOperator
from airflow.operators.python import PythonOperator
load_nps = PythonOperator(
dag=dag,
task_id="task_id",
python_callable=python_func,
params={
'table': 'delighted_nps',
'schema': 'raw_data'
},
)
def python_func(**cxt):
table = cxt['params']['table']
schema = cxt['params']['schema']
ex_date = cxt['execution_date']
...
🍦 2개의 태스크로 구성된 데이터 파이프라인 (DAG)
- print_hello: PythonOperator. 먼저 실행
- print_goodbye: PythonOpertor. 두번째로 실행
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
dag = DAG(
dag_id='HelloWorld',
start_date=datetime(2022,5,5),
catchup=False,
tags=['example'],
schedule='0 2 * * *')
def print_hello():
print("hello!")
return "hello!"
def print_goodbye():
print("goodbye!")
return "goodbye!"
print_hello = PythonOperator(
task_id = 'print_hello',
#python_callable param points to the function you want to run
python_callable = print_hello,
#dag param points to the DAG that this task is a part of
dag = dag)
print_goodbye = PythonOperator(
task_id = 'print_goodbye',
python_callable = print_goodbye,
dag = dag)
#Assign the order of the tasks in our DAG
print_hello >> print_goodbye
Airflow Decorators: 프로그래밍이 단순해짐
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.decorators import task
from datetime import datetime
dag = DAG(
dag_id='HelloWorld',
start_date=datetime(2022,5,5),
catchup=False,
tags=['example'],
schedule='0 2 * * *')
@task
def print_hello():
print("hello!")
return "hello!"
@task
def print_goodbye():
print("goodbye!")
return "goodbye!"
# Assign the order of the tasks in our DAG
# 함수 이름이 task ID
print_hello() >> print_goodbye()
DAG 파라미터
- max_active_runs: # of DAGs instance
- max_active_tasks: # of tasks that can run in parallel
- catchup: whether to backfill past runs
- DAG parameters vs. Task parameters: DAG 객체를 만들 때 지정
🙂 Name Gender 예제 프로그램 포팅
Colab Python 코드를 Airflow로 포팅
Connections
- 호스트이름, 포트번호, 접근 자격증명 등
- 환경 변수 형태로 코드 밖으로 빼내 주는 역할
Variables
- API키나 여러 구성 정보를 저장
✔ Xcom
- Task(Operator) 같에 데이터를 주고 받기 위한 방식
- 보통 한 Operator의 리턴 값을 다른 Operator에서 읽어가는 형태가 됨
- 이 값들은 Airflow 메타 데이터 DB에 저장이 되기에 큰 데이터를 주고받는데는 사용불가
- 보통 큰 데이터는 S3에 로드하고 그 위치를 넘기는 것이 일반적
🙂 airflow.cfg
windows docker로 실행 시
- airflow-setup-airflow-webserver-1 > Files 탭
- /opt/airflow/airflow.cfg 경로에 위치

🍦 DAGs 폴더는 어디에 지정되는가?

/opt/airflow/dags에 지정된다. 보통 코드 레파지토리의 하위폴더 경로로 설정한다고 한다.
🍦 DAGs 폴더에 새로운 Dag를 만들면 언제 실제로 Airflow 시스템에서 이를 알게 되나?
이 스캔 주기를 결정해주는 키의 이름은 무엇인가?

Default 5분 (300초)
🍦 이 파일에서 Airflow를 API 형태로 외부에서 조작하고 싶다면 어느 섹션을 변경해야하는가?

auth_backends: API 사용자 인증에 사용할 인증 백엔드를 지정.
기본값인 session에서는 외부에서 API 형태로 제어가 불가능.
airflow.api.auth.backend.basic_auth 로 변경하면 API 형태로 외부 조작이 가능.
🍦 Variable에서 변수의 값이 encrypted가 되려면 변수의 이름에 어떤 단어들이 들어가야 하는데
이 단어들은 무엇일까?
secret, password, passwd, authorization, api_key, apikey, access_token
🍦 이 환경 설정 파일이 수정되었다면 이를 실제로 반영하기 위해서 해야 하는 일은?
sudo systemctl restart airflow-webserver
sudo systemctl restart airflow-scheduler
Docker의 경우 Docker 컨테이너를 재실행.
🍦 Metadata DB의 내용을 암호화하는데 사용되는 키는 무엇인가?

fernet_key (대칭키)를 사용하여 Metadata DB의 내용 암호화 가능.
'데이터엔지니어링' 카테고리의 다른 글
[9주차] Docker & K8s (1) (0) | 2024.05.27 |
---|---|
[8주차] 데이터 파이프라인과 Airflow (4) (0) | 2024.05.27 |
[8주차] 데이터 파이프라인과 Airflow (2) (0) | 2024.05.21 |
[8주차] 데이터 파이프라인과 Airflow (1) (0) | 2024.05.20 |
[7주차] 데이터 웨어하우스 관리와 고급 SQL과 BI 대시보드 (5) (0) | 2024.05.10 |