본문 바로가기

데이터엔지니어링

[8주차] 데이터 파이프라인과 Airflow (2)

🙂 트랜잭션

✔ 트랜잭션이란?

Atomic하게 실행되어야 하는 SQL을 묶어서 하나의 작업처럼 처리하는 방법

  • BEGIN과 END 혹은 BEGIN과 COMMIT 사이에 해당 SQL을 사용
  • ROLLBACK은 BEGIN 이전의 상태로 돌아가는 SQL 명령


🙂 리눅스

ubuntu: 리눅스 타입 중 하나

ssh: 리눅스 혹은 유닉스 서버에 로그인하는 프로그램 (터미널)

sudo: 보통 슈퍼유저로서 프로그램을 구동할 수 있도록 하는 프로그램

apt-get: 우분투/데비안 계열의 리눅스에서 프로그램 설치/삭제를 관리해주는 프로그램

su: substitute user의 약자로 현재 사용 중인 사용자 계정을 로그아웃하지 않고 다른 사용자의 권한을 얻을 때 사용

vi: 텍스트 에디터

 


🙂 Docker 기반 Airflow 실행

  • 터미널 프로그램 실행 후 적당한 폴더 이동
  • airflow-setup Github repo 클론
    • git clone https://github.com/keeyong/airflow-setup.git
  • airflow-setup 폴더로 이동 후 2.5.1 이미지 관련 yml 파일 다운로드
    • cd airflow-setup
    • curl -Lf0 'https://airflow.apache.org/docs/apache-airflow/2.5.1/docker-compose.yaml'
  • 다음 2개의 명령을 수행 (이미지 다운로드와 컨테이너 실행)
    • docker-compose -f docker-compose.yaml pull
    • docker-compose -f docker-compose.yaml up
  • http://localhost:8080으로 웹 UI 로그인
    • airflow:airflow 사용

🙂 Airflow 기본 프로그램

✔ 기본구조

  • DAG 대표하는 객체를 먼저 만듦
    • DAG 이름, 실행주기, 실행날짜, 오너 등
  • DAG를 구성하는 Task를 만듦
    • Task별로 적합한 오퍼레이터를 선택
    • Task ID를 부여하고 해야할 작업의 세부사항 지정
  • 최종적으로 Task간의 실행 순서를 결정

✔ DAG 설정 예제

from datetime import datetime, timedelta

default_args = {
    'owner': '사용자이름',
    'email': ['메일주소'],
    'retries': 1,
    'retry_delay': timedelta(minutes=3),
}

 

지정되는 인자들은 모든 Task에 공통으로 적용되는 설정

뒤에서 DAG 객체를 만들 때 지정

 

from airflow import DAG

dag = DAG(
    "dag_v1", #DAG name
    start_date = datetime(2020,8,7,hour=0,minute=00),
    schedule="0 * * * *",
    tags=["example"],
    catchup=False,
    # common settings
    default_args=default_args
)

schedule 의미

 

✔ Bash Operator 예제

  • 3개의 Task
  • t1은 현재 시간
  • t2는 5초 대기 후 종료
  • t3은 서버의 /tmp 디렉토리 내용 출력
  • t1이 끝나고 t2와 t3 병렬 실행
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta

default_args = {
    ...
}

test_dag = DAG(
    "dag_v1", # DAG name
    schedule = "0 9 * * *",
    tags=["test"],
    catchUp=False,
    default_args=default_args
)

t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=test_dag
)

t2 = BashOperator(
    task_id='sleep',
    bash_command='sleep 5',
    dag=test_dag
)

t3 = BashOperator(
    task_id='ls',
    bash_command='ls /tmp',
    dag=test_dag
)

t1 >> [t2, t3]