DAG 설계 패턴의 핵심 원칙

아키텍처 설계의 황금 규칙

멱등성(Idempotency)과 원자성(Atomicity) 이 모든 DAG 설계의 기초입니다. 동일한 입력에 대해 몇 번을 실행하더라도 동일한 결과를 생성해야 하며, 각 태스크는 완전히 성공하거나 완전히 실패해야 합니다.

# 좋은 예: 멱등성을 보장하는 UPSERT 패턴
@task
def process_data(**context):
    data_date = context['data_interval_start']

    # 특정 파티션에서 읽기 (datetime.now() 사용 금지)
    df = read_partition(data_date)

    # UPSERT로 멱등성 보장
    df.to_sql('target_table', if_exists='replace', 
              method='upsert', index_col='date')
# 나쁜 예: 비멱등성 코드
expensive_config = fetch_from_database()  # 최상위 레벨에서 실행됨

@task
def bad_task():
    now = datetime.now()  # 실행 시점마다 다른 값
    insert_data(now)  # 중복 데이터 생성 위험

 

동적 DAG 생성과 패턴

팩토리 패턴을 사용한 동적 DAG 생성은 2024년 가장 널리 채택된 패턴 중 하나입니다.

  • 동적 DAG 생성은 Airflow에서 반복되는 워크플로우를 프로그래밍 방식으로 생성하는 패텅임.
  • 이를 통해 유사한 DAG 를 수동으로 작성하는 대신 코드로 자동으로 생성하는게 간으하다.
# 나쁜 예: 각 테이블마다 별도 DAG 파일 작성
# dag_table_users.py
@dag(dag_id='etl_users', schedule='@daily')
def etl_users():
    extract = ExtractOperator(table='users')
    transform = TransformOperator(table='users')
    load = LoadOperator(table='users')
    extract >> transform >> load

# dag_table_orders.py
@dag(dag_id='etl_orders', schedule='@daily')
def etl_orders():
    extract = ExtractOperator(table='orders')
    transform = TransformOperator(table='orders')
    load = LoadOperator(table='orders')
    extract >> transform >> load

# dag_table_products.py ... (반복)
# 좋은 예: 하나의 파일로 모든 테이블 DAG 생성
tables = ['users', 'orders', 'products', 'inventory', 'shipping']

for table in tables:
    dag_id = f'etl_{table}'

    @dag(dag_id=dag_id, schedule='@daily', tags=['etl', table])
    def create_etl_dag():
        extract = ExtractOperator(table=table)
        transform = TransformOperator(table=table)
        load = LoadOperator(table=table)
        extract >> transform >> load

    # 전역 네임스페이스에 DAG 등록
    globals()[dag_id] = create_etl_dag()
class DAGFactory:
    """더 복잡한 로직을 위한 팩토리 클래스"""

    def __init__(self, default_args):
        self.default_args = default_args
        self.created_dags = []

    def create_dag(self, config):
        """설정 딕셔너리로부터 DAG 생성"""
        dag = DAG(
            dag_id=config['dag_id'],
            schedule_interval=config.get('schedule', '@daily'),
            default_args=self.default_args,
            tags=config.get('tags', []),
            description=config.get('description', '')
        )

        with dag:
            # 태스크 체인 동적 생성
            previous_task = None

            for task_config in config['tasks']:
                task = self._create_task(task_config)

                if previous_task:
                    previous_task >> task

                previous_task = task

        self.created_dags.append(dag)
        return dag

    def _create_task(self, task_config):
        """태스크 타입별 생성 로직"""
        task_type = task_config['type']

        if task_type == 'bash':
            return BashOperator(
                task_id=task_config['id'],
                bash_command=task_config['command']
            )
        elif task_type == 'python':
            return PythonOperator(
                task_id=task_config['id'],
                python_callable=task_config['callable']
            )
        elif task_type == 'sql':
            return SqlOperator(
                task_id=task_config['id'],
                sql=task_config['query'],
                conn_id=task_config['connection']
            )

 

설정 파일 기반 DAG 생성:

 dags_config.yaml
dags:
  - dag_id: sales_etl
    schedule: "@daily"
    tags: ["sales", "etl"]
    tasks:
      - id: extract_sales
        type: sql
        query: "SELECT * FROM sales WHERE date = '{{ ds }}'"
        connection: postgres_default
      - id: transform_sales
        type: python
        callable: transform_sales_data
      - id: load_sales
        type: sql
        query: "INSERT INTO sales_fact VALUES ..."
        connection: warehouse

  - dag_id: customer_etl
    schedule: "@hourly"
    tags: ["customer", "etl"]
    tasks:
      - id: extract_customers
        type: api
        endpoint: "/customers/updates"
      - id: validate_customers
        type: python
        callable: validate_customer_data

 

TaskGroup으로 복잡성 관리

TaskGroup은 더 이상 사용되지 않는 SubDAG를 대체 Apache Airflow하는 현대적 접근법입니다.

  • TaskGroup은 Airflow 2.0에서 도입된 태스크들을 논리적으로 그룹화하는 기능입니다. 복잡한 DAG를 더 읽기 쉽고 관리하기 쉽게 만들어주는 핵심 도구입니다.
  • UI에서는 접고 펼 수 있는 그룹으로 표시되며, 코드에서는 재사용 가능한 컴포넌트로 활용됩니다.
# 나쁜 예: SubDAG (더 이상 권장하지 않음)
def create_subdag(parent_dag_id, child_dag_id, default_args):
    subdag = DAG(
        dag_id=f"{parent_dag_id}.{child_dag_id}",
        default_args=default_args,
        schedule_interval="@daily"
    )

    # SubDAG의 문제점:
    # 1. 별도의 DAG 인스턴스 생성 (메모리 오버헤드)
    # 2. 독립적인 스케줄러가 필요 (성능 저하)
    # 3. 디버깅이 어려움
    # 4. 부모 DAG과 다른 실행 날짜를 가질 수 있음

    with subdag:
        task1 = DummyOperator(task_id='task1')
        task2 = DummyOperator(task_id='task2')
        task1 >> task2

    return subdag

# SubDAG 사용 (권장하지 않음)
subdag_task = SubDagOperator(
    task_id='processing_tasks',
    subdag=create_subdag('parent_dag', 'processing_tasks', default_args)
)
# 좋은 예: TaskGroup
from airflow.utils.task_group import TaskGroup

with DAG('example_dag', start_date=datetime(2024, 1, 1)) as dag:

    # TaskGroup 생성
    with TaskGroup(group_id='processing_tasks') as processing_group:
        task1 = DummyOperator(task_id='task1')
        task2 = DummyOperator(task_id='task2')
        task1 >> task2

    # TaskGroup의 장점:
    # 1. 가볍다 (별도 DAG 인스턴스 없음)
    # 2. 부모 DAG와 동일한 실행 컨텍스트
    # 3. UI에서 접고 펼 수 있음
    # 4. 재사용 가능한 컴포넌트화

    start = DummyOperator(task_id='start')
    end = DummyOperator(task_id='end')

    start >> processing_group >> end

 

성능 최적화의 실전 전략

스케줄러 성능 튜닝

DAG 파싱이 가장 큰 성능 병목 Apache입니다. 파싱 시간이 5분을 넘으면 문제가 있다고 봐야 합니다.

스케줄러는 Airflow의 심장이라고 함.

 

DAG 파싱, 태스크 스케줄링, 상태 업데이트 등 모든 핵심 작업을 담당합니다. 스케줄러 튜닝은 전체 Airflow 성능의 80%를 좌우

 

스케줄러 작동 원리:

# 스케줄러의 주요 작업 사이클
"""
1. DAG 파일 스캔 (dag_dir_list_interval)
   ↓
2. 변경된 DAG 파일 파싱 (parsing_processes)
   ↓
3. DAG Run 생성 (max_dagruns_to_create_per_loop)
   ↓
4. 실행 가능한 태스크 식별
   ↓
5. 태스크 인스턴스 큐잉 (parallelism, dag_concurrency)
   ↓
6. Executor로 태스크 전송
   ↓
7. 태스크 상태 업데이트
   ↓
(반복)
"""
# airflow.cfg 최적화 설정
[scheduler]
parsing_processes = 8  # DAT 파싱 프로세스 수: CPU 코어 수 * 2 - 1
dag_dir_list_interval = 600  # 스케줄러가 DAG 폴더를 스캔하는 주기(초): 기본 300에서 600정도로 튜닝. 개발 서버라면 60 정도로 작게 주는게 나을 수 있음. 
min_file_process_interval = 60  # 일한 DAG 파일을 다시 파싱하기 전 최소 대기 시간(초): 기본 30에서 60정도로 증가
max_dagruns_to_create_per_loop = 5  # 스케줄러 루프 한 번에 생성할 수 있는 최대 DAG Run 수: 기본 10에서 5정도로 감소 

[core]
parallelism = 32  # 전체 병렬처리로 인프라 용량에 맞춰 설정하는게 좋음. 동시에 실행할 수 있는 최대 테스크 수를 지정함. 
dag_concurrency = 16  # 단일 DAG 내에서 동시에 실행할 수 있는 최대 태스크 수를 말함. 

 

최대 DAG Run 수를 제어하는 이유:

  • 메모리 부족
  • 데이터베이스 연결 고갈
  • Executor 과부하
  • UI 응답 속도 저하

 

Executor 선택 가이드

 

각 Executor의 특성을 이해하고 워크로드에 맞게 선택해야 합니다. (확장성 측면)

# 워크로드별 Executor 선택 매트릭스
"""
워크로드 타입          | 권장 Executor           | 확장 전략
개발/테스트           | LocalExecutor          | 수직 확장
고처리량 안정 워크로드  | CeleryExecutor         | 수평 확장  
리소스 집약적 워크로드  | KubernetesExecutor     | 동적 확장
혼합 워크로드         | CeleryKubernetesExecutor| 하이브리드
"""

 

리소스 관리와 Pool 활용

Airflow Pool을 사용한 리소스 관리는 성능 최적화의 핵심입니다.

 

Pool 을 생성하고 테스크에서 사용하는 방법:

# Pool 설정 계산 공식
# 필요 슬롯 = 병렬 태스크 수 × 태스크당 슬롯 × 지연 허용도

# Pool 생성 및 사용
high_memory_pool = Pool(
    pool='high_memory_tasks',
    slots=4,
    description='High memory intensive tasks'
)

@task(pool='high_memory_tasks', pool_slots=2)
def memory_intensive_task():
    return process_large_dataset()

 

이처럼 Pool은 Airflow에서 리소스 제한과 동시 실행 제어를 위한 핵심 메커니즘임.

 

데이터베이스 연결, API 호출 제한, 메모리 집약적 작업 등을 효과적으로 관리할 수 있습니다.

"""
Pool = 동시에 실행할 수 있는 태스크 수를 제한하는 논리적 그룹

예시:
- database_pool: 5 슬롯 (동시 DB 연결 5개까지)
- api_pool: 10 슬롯 (API 동시 호출 10개까지)
- heavy_compute_pool: 2 슬롯 (메모리 집약 작업 2개까지)
"""

# Pool 작동 방식
"""
1. 태스크가 pool을 지정하면 실행 전 슬롯 확인
2. 사용 가능한 슬롯이 있으면 할당 후 실행
3. 슬롯이 없으면 대기 (queued 상태)
4. 실행 완료 시 슬롯 반환
"""

 

Pool vs 다른 리소스 제어 방법:

# 1. Pool (가장 세밀한 제어)
@task(pool='database_pool', pool_slots=2)
def heavy_db_operation():
    # 이 태스크는 database_pool에서 2개 슬롯 사용
    pass

# 2. parallelism (전체 시스템 제한)
# airflow.cfg
# parallelism = 32  # 전체 동시 실행 태스크

# 3. dag_concurrency (DAG 레벨 제한)
with DAG('my_dag', concurrency=10):
    # 이 DAG는 최대 10개 태스크만 동시 실행

# 4. task_concurrency (태스크 레벨 제한)
@task(task_concurrency=3)
def api_call():
    # 이 태스크는 최대 3개 인스턴스만 동시 실행

 

실무 경험 기반 핵심 팁:

즉시 적용 가능한 성능 개선

  • 최상위 코드 최적화: DAG 파일 최상단에서 무거운 연산 금지 ApacheApache
  • Variable 사용법: Jinja 템플릿 활용, 최상위 Variable.get() 호출 금지
  • Executor 업그레이드: Sequential/Local에서 Celery/Kubernetes로 이관
  • 데이터베이스 최적화: PGBouncer 연결 풀링 적

'Apache Airflow' 카테고리의 다른 글

Airflow 관리  (0) 2025.06.11
Airflow Best Practices - 효율적인 데이터 처리  (0) 2025.06.11
Apache Airflow Best Practices (2) Credentials  (0) 2025.06.09

+ Recent posts