본문 바로가기
CS/DataEngineering

Apache Airflow 기반의 데이터 파이프라인

by Diligejy 2024. 1. 9.

 

p.xvii

파이프라인이 동적으로 바뀌거나 실시간 스트림 처리 환경에선 적합하지 않다고 하니 참고해 주세요.

 

p.9

잘 알려진 몇 가지 워크플로 관리자와 이들의 주요 특성에 대한 개요

 

이름 시작회사 워크플로 정의 개발 언어 스케쥴 관리 백필 사용자 인터페이스 플랫폼 설치 수평 확장
Airflow Arirbnb 파이썬 파이썬 Anywhere
Argo Applatix YAML Go 3rd party   Kubernetes
Azkaban LinkedIn YAML Java 아니오 Anywhere  
Conductor Netflix JSON Java 아니오   Anywhere
Luigi Spotify 파이썬 파이썬 아니오 Anywhere
Make   Custom DSL C 아니오 아니오 아니오 Anywhere 아니오
Metaflow Netflix 파이썬 파이썬 아니오   아니오 Anywhere
Nifi NSA UI Java 아니오 Anywhere
Oozie   XML Java Hadoop

 

p.27

태스크와 오퍼레이터의 차이점이 무엇일까요? 물론, 둘 다 코드로 실행합니다. Airflow에서 Operator는 단일 작업 수행 역할을 합니다. 몇몇 오퍼레이터는 BashOperator 및 PythonOperator와 같이 일반적인 작업을 수행하며, EmailOperator 또는 SimpleHTTPOperator와 같이 좀 더 특수한 목적을 위해 사용됩니다. 모두 단일 작업을 수행합니다.

 

DAG는 오퍼레이터 집합에 대한 실행을 오케스트레이션하는 역할을 합니다. 여기에는 오퍼레이터의 시작과 정지, 오퍼레이터가 완료되면 연속된 다음 태스크의 시작, 그리고 오퍼레이터 간의 의존성 보장이 포함됩니다.

 

이 책과 Airflow 문서 전반에 걸쳐 Operator와 Task라는 용어를 같은 의미로 사용하고 있습니다. 사용자 관점에서 모두 같은 의미이며, 종종 두 용어를 혼용해서 사용합니다. Operator는 단일 작업을 수행할 수 있는 기능을 제공합니다. Airflow는 BaseOperator와 BaseOperator로부터 상속된 PythonOperator, EmailOperator, OracleOperator와 같이 다양한 서브 클래스를 제공합니다.

 

하지만 두 가지 용어에 차이점이 있습니다. Airflow에서 Task는 작업의 올바른 실행을 보장하기 위한 오퍼레이터의 'wrapper' 또는 'manager'로 생각해볼 수 있습니다. 사용자는 오퍼레이터를 활용해 수행할 작업에 집중할 수 있으며, Airflow는 태스크를 통해 작업을 올바르게 실행할 수 있습니다. 

 

p.28~29

Airflow의 PythonOperator는 파이썬 코드 실행을 담당합니다. 앞에서 사용된 BashOperator와 마찬가지로 모든 오퍼레이터에는 task_id가 필요합니다. task_id는 태스크 실행 시에 참조되며, Airflow UI에도 표시됩니다. PythonOperator의 사용 시 다음 두 가지 사항을 항상 적용해야 합니다.

 

1. 오퍼레이터 자신(get_pictures)을 정의해야 합니다.

2. python_callable은 인수에 호출이 가능한 일반 함수(_get_pictures)를 가리킵니다.

 

오퍼레이터를 실행하면 파이썬 함수가 호출되고 함수를 실행합니다. 좀 더 자세히 살펴보면 PythonOperator의 기본 사용법은 다음과 같습니다.

def _get_pictures():
	# PythonOperator callable
    # 여기서 작업 실행
    
get_pictures = PythonOperator(
	task_id="get_pictures",
    python_callable=_get_pictures,
    dag=dag, 
)

꼭 필요한 것은 아니지만, 편의를 위해 변수 이름을 get_pictures와 task_id를 동일하게 합니다.

 

p.30

pip install apache-airflow

 

airflow가 아닌 apache-airflow를 설치해야 함에 주의하십시오. Airflow가 2016년에 아파치 재단에 가입하면서, Pypi의 airflow 저장소가 apache-airflow로 이름이 변경되었습니다. 여전히 많은 사람이 airflow로 설치하고 있기 때문에 해당 리포지터리를 여전히 유지하고 있지만, 올바른 리포지터리를 알리기 위한 용도로 활용하고 있습니다.

 

p.44

Airflow는 정의된 간격 후에 태스크가 시작된다는 것을 유념하기 바랍니다. 만약 독자가 start_date를 01-01-2019라고 값을 주고 간격을 @daily로 2019년 1월 1일 13시에 DAG을 개발한 후 실행했다면 자정이 오기 전까지 DAG는 어떤 작업도 하지 않습니다.

 

p.46

자주 사용되는 스케쥴 간격에 대한 Airflow 프리셋

 

프리셋 이름 의미
@once 1회만 실행하도록 스케쥴
@hourly 매시간 변경 시 1회 실행
@daily 매일 자정에 1회 실행
@weekly 매주 일요일 자정에 1회 실행
@monthly 매월 1일 자정에 1회 실행
@yearly 매년 1월 1일 자정에 1회 실행

 

p.47

cron 식은 작업의 실행 여부를 결정하기 위해 패턴과 일치하는 현재 시간을 지속적으로 확인하기 위한 정의입니다. 이런 특성으로 인해 다음 실행 간격을 계산하기 위해 이전 작업이 실행된 시점을 기억할 필요가 없습니다. 하지만 이로 인해 제약이 따르게 됩니다.

 

그렇다면 DAG를 3일마다 실행하려면 어떻게 해야 할까요?

 

Airflow는 이런 상대적인 시간 간격으로 스케쥴 간격을 정의할 수 있도록 지원합니다. 빈도 기반 스케쥴을 사용하려면 timedelta(표준 라이브러리인 datetime 모듈에 포함된) 인스턴스를 사용하면 됩니다. 

 

dag = DAG(
    dag_id="04_time_delta",
    schedule_interval=dt.timedelta(days=3), # timedelta -> 빈도 기반 스케쥴 사용
    start_date=dt.datetime(year=2019, month=1, day=1),
    end_date=dt.datetime(year=2019, month=1, day=5),
)

 

 

이렇게 설정하면 DAG가 시작 시간으로부터 3일마다 실행됩니다(2019년 1월 4일, 7일, 10일, ...) 물론 DAG을 10분마다(timedelta(minutes=10)) 또는 2시간마다(timedelta(hours=2)) 실행할 수도 있습니다.

 

댓글