Apache Airflow
Apache Airflow Best Practices (1) Working Level
youngerjesus
2025. 6. 9. 21:08
DAG 설계 패턴의 핵심 원칙
아키텍처 설계의 황금 규칙
멱등성(Idempotency)과 원자성(Atomicity) 이 모든 DAG 설계의 기초입니다. 동일한 입력에 대해 몇 번을 실행하더라도 동일한 결과를 생성해야 하며, 각 태스크는 완전히 성공하거나 완전히 실패해야 합니다.
# 좋은 예: 멱등성을 보장하는 UPSERT 패턴
@task
def process_data(**context):
data_date = context['data_interval_start']
# 특정 파티션에서 읽기 (datetime.now() 사용 금지)
df = read_partition(data_date)
# UPSERT로 멱등성 보장
df.to_sql('target_table', if_exists='replace',
method='upsert', index_col='date')
# 나쁜 예: 비멱등성 코드
expensive_config = fetch_from_database() # 최상위 레벨에서 실행됨
@task
def bad_task():
now = datetime.now() # 실행 시점마다 다른 값
insert_data(now) # 중복 데이터 생성 위험
동적 DAG 생성과 패턴
팩토리 패턴을 사용한 동적 DAG 생성은 2024년 가장 널리 채택된 패턴 중 하나입니다.
- 동적 DAG 생성은 Airflow에서 반복되는 워크플로우를 프로그래밍 방식으로 생성하는 패텅임.
- 이를 통해 유사한 DAG 를 수동으로 작성하는 대신 코드로 자동으로 생성하는게 간으하다.
# 나쁜 예: 각 테이블마다 별도 DAG 파일 작성
# dag_table_users.py
@dag(dag_id='etl_users', schedule='@daily')
def etl_users():
extract = ExtractOperator(table='users')
transform = TransformOperator(table='users')
load = LoadOperator(table='users')
extract >> transform >> load
# dag_table_orders.py
@dag(dag_id='etl_orders', schedule='@daily')
def etl_orders():
extract = ExtractOperator(table='orders')
transform = TransformOperator(table='orders')
load = LoadOperator(table='orders')
extract >> transform >> load
# dag_table_products.py ... (반복)
# 좋은 예: 하나의 파일로 모든 테이블 DAG 생성
tables = ['users', 'orders', 'products', 'inventory', 'shipping']
for table in tables:
dag_id = f'etl_{table}'
@dag(dag_id=dag_id, schedule='@daily', tags=['etl', table])
def create_etl_dag():
extract = ExtractOperator(table=table)
transform = TransformOperator(table=table)
load = LoadOperator(table=table)
extract >> transform >> load
# 전역 네임스페이스에 DAG 등록
globals()[dag_id] = create_etl_dag()
class DAGFactory:
"""더 복잡한 로직을 위한 팩토리 클래스"""
def __init__(self, default_args):
self.default_args = default_args
self.created_dags = []
def create_dag(self, config):
"""설정 딕셔너리로부터 DAG 생성"""
dag = DAG(
dag_id=config['dag_id'],
schedule_interval=config.get('schedule', '@daily'),
default_args=self.default_args,
tags=config.get('tags', []),
description=config.get('description', '')
)
with dag:
# 태스크 체인 동적 생성
previous_task = None
for task_config in config['tasks']:
task = self._create_task(task_config)
if previous_task:
previous_task >> task
previous_task = task
self.created_dags.append(dag)
return dag
def _create_task(self, task_config):
"""태스크 타입별 생성 로직"""
task_type = task_config['type']
if task_type == 'bash':
return BashOperator(
task_id=task_config['id'],
bash_command=task_config['command']
)
elif task_type == 'python':
return PythonOperator(
task_id=task_config['id'],
python_callable=task_config['callable']
)
elif task_type == 'sql':
return SqlOperator(
task_id=task_config['id'],
sql=task_config['query'],
conn_id=task_config['connection']
)
설정 파일 기반 DAG 생성:
dags_config.yaml
dags:
- dag_id: sales_etl
schedule: "@daily"
tags: ["sales", "etl"]
tasks:
- id: extract_sales
type: sql
query: "SELECT * FROM sales WHERE date = '{{ ds }}'"
connection: postgres_default
- id: transform_sales
type: python
callable: transform_sales_data
- id: load_sales
type: sql
query: "INSERT INTO sales_fact VALUES ..."
connection: warehouse
- dag_id: customer_etl
schedule: "@hourly"
tags: ["customer", "etl"]
tasks:
- id: extract_customers
type: api
endpoint: "/customers/updates"
- id: validate_customers
type: python
callable: validate_customer_data
TaskGroup으로 복잡성 관리
TaskGroup은 더 이상 사용되지 않는 SubDAG를 대체 Apache Airflow하는 현대적 접근법입니다.
- TaskGroup은 Airflow 2.0에서 도입된 태스크들을 논리적으로 그룹화하는 기능입니다. 복잡한 DAG를 더 읽기 쉽고 관리하기 쉽게 만들어주는 핵심 도구입니다.
- UI에서는 접고 펼 수 있는 그룹으로 표시되며, 코드에서는 재사용 가능한 컴포넌트로 활용됩니다.
# 나쁜 예: SubDAG (더 이상 권장하지 않음)
def create_subdag(parent_dag_id, child_dag_id, default_args):
subdag = DAG(
dag_id=f"{parent_dag_id}.{child_dag_id}",
default_args=default_args,
schedule_interval="@daily"
)
# SubDAG의 문제점:
# 1. 별도의 DAG 인스턴스 생성 (메모리 오버헤드)
# 2. 독립적인 스케줄러가 필요 (성능 저하)
# 3. 디버깅이 어려움
# 4. 부모 DAG과 다른 실행 날짜를 가질 수 있음
with subdag:
task1 = DummyOperator(task_id='task1')
task2 = DummyOperator(task_id='task2')
task1 >> task2
return subdag
# SubDAG 사용 (권장하지 않음)
subdag_task = SubDagOperator(
task_id='processing_tasks',
subdag=create_subdag('parent_dag', 'processing_tasks', default_args)
)
# 좋은 예: TaskGroup
from airflow.utils.task_group import TaskGroup
with DAG('example_dag', start_date=datetime(2024, 1, 1)) as dag:
# TaskGroup 생성
with TaskGroup(group_id='processing_tasks') as processing_group:
task1 = DummyOperator(task_id='task1')
task2 = DummyOperator(task_id='task2')
task1 >> task2
# TaskGroup의 장점:
# 1. 가볍다 (별도 DAG 인스턴스 없음)
# 2. 부모 DAG와 동일한 실행 컨텍스트
# 3. UI에서 접고 펼 수 있음
# 4. 재사용 가능한 컴포넌트화
start = DummyOperator(task_id='start')
end = DummyOperator(task_id='end')
start >> processing_group >> end
성능 최적화의 실전 전략
스케줄러 성능 튜닝
DAG 파싱이 가장 큰 성능 병목 Apache입니다. 파싱 시간이 5분을 넘으면 문제가 있다고 봐야 합니다.
스케줄러는 Airflow의 심장이라고 함.
DAG 파싱, 태스크 스케줄링, 상태 업데이트 등 모든 핵심 작업을 담당합니다. 스케줄러 튜닝은 전체 Airflow 성능의 80%를 좌우
스케줄러 작동 원리:
# 스케줄러의 주요 작업 사이클
"""
1. DAG 파일 스캔 (dag_dir_list_interval)
↓
2. 변경된 DAG 파일 파싱 (parsing_processes)
↓
3. DAG Run 생성 (max_dagruns_to_create_per_loop)
↓
4. 실행 가능한 태스크 식별
↓
5. 태스크 인스턴스 큐잉 (parallelism, dag_concurrency)
↓
6. Executor로 태스크 전송
↓
7. 태스크 상태 업데이트
↓
(반복)
"""
# airflow.cfg 최적화 설정
[scheduler]
parsing_processes = 8 # DAT 파싱 프로세스 수: CPU 코어 수 * 2 - 1
dag_dir_list_interval = 600 # 스케줄러가 DAG 폴더를 스캔하는 주기(초): 기본 300에서 600정도로 튜닝. 개발 서버라면 60 정도로 작게 주는게 나을 수 있음.
min_file_process_interval = 60 # 일한 DAG 파일을 다시 파싱하기 전 최소 대기 시간(초): 기본 30에서 60정도로 증가
max_dagruns_to_create_per_loop = 5 # 스케줄러 루프 한 번에 생성할 수 있는 최대 DAG Run 수: 기본 10에서 5정도로 감소
[core]
parallelism = 32 # 전체 병렬처리로 인프라 용량에 맞춰 설정하는게 좋음. 동시에 실행할 수 있는 최대 테스크 수를 지정함.
dag_concurrency = 16 # 단일 DAG 내에서 동시에 실행할 수 있는 최대 태스크 수를 말함.
최대 DAG Run 수를 제어하는 이유:
- 메모리 부족
- 데이터베이스 연결 고갈
- Executor 과부하
- UI 응답 속도 저하
Executor 선택 가이드
각 Executor의 특성을 이해하고 워크로드에 맞게 선택해야 합니다. (확장성 측면)
# 워크로드별 Executor 선택 매트릭스
"""
워크로드 타입 | 권장 Executor | 확장 전략
개발/테스트 | LocalExecutor | 수직 확장
고처리량 안정 워크로드 | CeleryExecutor | 수평 확장
리소스 집약적 워크로드 | KubernetesExecutor | 동적 확장
혼합 워크로드 | CeleryKubernetesExecutor| 하이브리드
"""
리소스 관리와 Pool 활용
Airflow Pool을 사용한 리소스 관리는 성능 최적화의 핵심입니다.
Pool 을 생성하고 테스크에서 사용하는 방법:
# Pool 설정 계산 공식
# 필요 슬롯 = 병렬 태스크 수 × 태스크당 슬롯 × 지연 허용도
# Pool 생성 및 사용
high_memory_pool = Pool(
pool='high_memory_tasks',
slots=4,
description='High memory intensive tasks'
)
@task(pool='high_memory_tasks', pool_slots=2)
def memory_intensive_task():
return process_large_dataset()
이처럼 Pool은 Airflow에서 리소스 제한과 동시 실행 제어를 위한 핵심 메커니즘임.
데이터베이스 연결, API 호출 제한, 메모리 집약적 작업 등을 효과적으로 관리할 수 있습니다.
"""
Pool = 동시에 실행할 수 있는 태스크 수를 제한하는 논리적 그룹
예시:
- database_pool: 5 슬롯 (동시 DB 연결 5개까지)
- api_pool: 10 슬롯 (API 동시 호출 10개까지)
- heavy_compute_pool: 2 슬롯 (메모리 집약 작업 2개까지)
"""
# Pool 작동 방식
"""
1. 태스크가 pool을 지정하면 실행 전 슬롯 확인
2. 사용 가능한 슬롯이 있으면 할당 후 실행
3. 슬롯이 없으면 대기 (queued 상태)
4. 실행 완료 시 슬롯 반환
"""
Pool vs 다른 리소스 제어 방법:
# 1. Pool (가장 세밀한 제어)
@task(pool='database_pool', pool_slots=2)
def heavy_db_operation():
# 이 태스크는 database_pool에서 2개 슬롯 사용
pass
# 2. parallelism (전체 시스템 제한)
# airflow.cfg
# parallelism = 32 # 전체 동시 실행 태스크
# 3. dag_concurrency (DAG 레벨 제한)
with DAG('my_dag', concurrency=10):
# 이 DAG는 최대 10개 태스크만 동시 실행
# 4. task_concurrency (태스크 레벨 제한)
@task(task_concurrency=3)
def api_call():
# 이 태스크는 최대 3개 인스턴스만 동시 실행
실무 경험 기반 핵심 팁:
즉시 적용 가능한 성능 개선
- 최상위 코드 최적화: DAG 파일 최상단에서 무거운 연산 금지 ApacheApache
- Variable 사용법: Jinja 템플릿 활용, 최상위 Variable.get() 호출 금지
- Executor 업그레이드: Sequential/Local에서 Celery/Kubernetes로 이관
- 데이터베이스 최적화: PGBouncer 연결 풀링 적