백엔드 서비스로 너무 많은 비동기 요청이 실행되서 Rate Limit 에 걸리거나 Request Timeout 이 나오는 경우 해결 방법은?

  • (1) Python.asyncio 를 사용한다면 Semaphore 를 이용하는 것:
    • 많은 수의 작업들에서 고정된 일정한 수만큼 꺼내서 실행하며, 최대 동시 실행 수를 제어하기 때문.
    • 내부 작동 방식:
      • 100개의 테스크에서 Semaphore 카운터를 10이라고 하자.
      • 나머지 90개의 테스크는 대기 상태가 되며 깨어날 때까지 리소스를 차지하지 않음.
      • Semaphore 를 얻어서 실행 중인 테스크가 이를 반납하면 하나 꺠워서 새 테스크를 실행하는 방식임.
  • (2) 일정한 수 만크 Batch 로 꺼내서 사용하는 방식 또는 Queue 에 적재해서 꺼내는 방식도 있을 수 있을거임.
  • 솔루션을 비교하는 기준은 여러가지가 있을거임:
    • 메모리 사용량
      • 배치 처리 >> Queue >> Semaphore
      • 아무래도 Semaphore 는 모든 테스크 객체(Future) 를 한번에 생성해두니까.
    • 컨택스트 스위칭
      • 배치 처리 > Semaphore > Queue
      • Semaphore 는 작업 당 대기 할 때/스케줄링 될 때 2번씩
      • 배치 처리는 작업당 1회 + 배치 간격마다 강제로(예: 100개의 작업을 10개씩 배치로 하면 10번)
      • Queue 는 폴링하는 방식이므로 이보다 많을 것.
    • 동기화 오버헤드
      • Semaphore > 배치 > Queue
      • Semaphore 는 각 작업당 독립적으로 동기화.
      • 배치 처리는 배치 단위로 동기화, 배치 완료까지 강제 대기.
      • Queue 방식이 아무래도 동기화 비용이 들어감
    • CPU 오버헤드
      • Semaphore 는 동시 실행 수에 비례 할 거임. 오버헤드는 카운터 acquire/release 마다로. CPU 를 안정적으로 쓸 것.
      • 배치 처리는 배치 시작할 때 급증하고, 조금 기다렸다가(배치 당 작업이 끝나고 다음 배치를 기다릴 때) 다시 급증하는 톱니파 형태.
      • Queue 는 워커 수에 비례하고 지속적인 폴링으로 높을 것.
      • Semaphore 이 사용량 자체는 많을 수 있으니 안정적인 형태일 것이므로 Semaphore 우세.
    • 응답성
      • Sempahore > 다른 방식들

 

Ariflow 에서 데이터를 처리해 나갈 때 dag_run_id 도 같이 저장하는 게 일반적인가? 필요한 정보들인가?

dag_run_id 를 저장할 때 발생하는 문제점:

  • 결합도 증가: 데이터가 Airflow 시스템에 강하게 의존됨
  • 이식성 문제: 다른 스케줄링 시스템으로 변경 시 문제 발생

dag_run_id 를 가지고 있으면 장점:

  • 처리에서 멱등성 보장.
  • 실패 지점에서의 재시도
  • 진행 상황 추적 가능

 

Airflow 에서 airflow.cfg 파일의 역할은?

간략하게 본 것으로는 logging 을 남기거나, 원격 자격 증명 시스템을 등록하거나, 볼 메트릭을 지정하는게 가능한 듯

 

Airflow 에서 DB 데이터를 가지고 ETL 처리해나갈 때, 동시에 실행하는 테스크 또는 DAG 가 있다면 Overwrite 되는 문제가 발생할 거 같은데 이건 어떻게 해결함?

  • 스케줄링 레벨에서 겹치지 않게 돌리는 방법이 있음. max_active_runs=1 로 주면 이전 단계가 끝나기 전에 다음 DAG Run 을 돌리지는 않음. 이것보다 엄격하게 하고 싶다면 DB 단에서 Advisory Lock 을 적용하는 방법도 있음. 이건 순수 어플리케이션에서 잠그는 락으로 누군가 잠그고 있다면 다른 DAG 에선느 실행이 안될거임. 테이블 락이나 레코드 락이 아님.
  • 데이터 모델링 계층에서는 Idempotent 키를 지정해서 여러번 넣어도 한 레코드만 들어가도록하고, updated_at > last_watermark 즉 증분 전략으로 한번만 실행되도록 함.

 

내가 생각하는 언어 모델 별 사용시점 

  • 코드 작성 및 설명:
    • Claude 4 Sonnet
  • 핵심 요구사항 정리 및 건설적인 아이디어 개선 
    • OpenAI o3, Gemini 2.5 pro

 

MongoDB 를 사용할 때 멱등성을 보장하려면:

패턴 아이디어 장점 주의
① “단일 Key Upsert” 이미 유니크한 키가 있으면 update, 없으면 insert ‒ update_one(filter, update, upsert=True) ‒ filter유일성이 담긴 필드(예: original_story_id)만 사용 • 필드 존재 여부를 직접 확인할 필요 없음 → DB가 판정 • 다중 DAG 실행·재시도에서도 중복 문서 0 • filter 조건이 너무 넓으면 원치 않는 문서가 갱신될 수 있음 → 항상 Unique Index와 함께 사용
② “Stage-Field 멱등 설계 파이프라인의 각 작업(stage) 가 자기가 채워야 할 필드가 존재있는지(또는 비어있는지) 보고 할 일이 남았는지 파악하는 것. 예: 각 태스크가 자신이 끝내야 할 필드가 비어 있는지 체크 ‒ 예: {"status": "scripted", "scenes": {"$exists": false}} • 파이프라인 단계 분기·병렬 처리 간단 • 클러스터 Down 뒤 복구 시 “어디까지 했나” 확실 • 반드시 해당 필드를 표준 스키마에 예약해 두어야 함 (타입·의미 고정)
  • Stage-Field 멱등 단계는 보통 데이터 처리 흐름이 많은 경우 빠르게, 실행되도록 또는 재처리 비용이 큰 경우 적용할 수 있는 방법임. 
  • MongoDB 에서는 멱등성 처리를 위해서 "$setOnInsert" 와 $set 을 구분해서 넣기도 해야함:
    • $setOnInsert: Insert 한 번 발생할 때 적용 (최초 생성 시 고정되는 값)
    • $set: insert or update 모두 매번 적용 (실행할 떄마다 덮어씀)

 

Airflow + MongoDB 병렬 파이프라인에서 멱등성, 중복 Zero, 누락 Zero 처리 방법

  • 병렬 처리할 때 Unique Key + findOneAndUpdate 로 DAG 마다 처리해야할 문서를 dag_run_id 로 격리하는거. 이후 파이프라인 스텝에서 조회가 필요하다면 dag_run_id 를 이용한 쿼리로, 다른 DAG 에서 못가져가게 하는 것.
  • 여러 DAG 에서 findOneAndUpdate 가 동시적으로 일어나면 중복으로 문서들을 가져갈 확률은 없나? 원자적 연산이므로 문제없음.

'업무일지' 카테고리의 다른 글

2025.07.15 학습 로그  (0) 2025.07.16
250608 학습 로그  (1) 2025.06.08
250520 학습 로그  (0) 2025.05.20

+ Recent posts