백엔드 서비스로 너무 많은 비동기 요청이 실행되서 Rate Limit 에 걸리거나 Request Timeout 이 나오는 경우 해결 방법은?
- (1) Python.asyncio 를 사용한다면 Semaphore 를 이용하는 것:
- 많은 수의 작업들에서 고정된 일정한 수만큼 꺼내서 실행하며, 최대 동시 실행 수를 제어하기 때문.
- 내부 작동 방식:
- 100개의 테스크에서 Semaphore 카운터를 10이라고 하자.
- 나머지 90개의 테스크는 대기 상태가 되며 깨어날 때까지 리소스를 차지하지 않음.
- Semaphore 를 얻어서 실행 중인 테스크가 이를 반납하면 하나 꺠워서 새 테스크를 실행하는 방식임.
- (2) 일정한 수 만크 Batch 로 꺼내서 사용하는 방식 또는 Queue 에 적재해서 꺼내는 방식도 있을 수 있을거임.
- 솔루션을 비교하는 기준은 여러가지가 있을거임:
- 메모리 사용량
배치 처리 >> Queue >> Semaphore
- 아무래도 Semaphore 는 모든 테스크 객체(Future) 를 한번에 생성해두니까.
- 컨택스트 스위칭
배치 처리 > Semaphore > Queue
- Semaphore 는 작업 당 대기 할 때/스케줄링 될 때 2번씩
- 배치 처리는 작업당 1회 + 배치 간격마다 강제로(예: 100개의 작업을 10개씩 배치로 하면 10번)
- Queue 는 폴링하는 방식이므로 이보다 많을 것.
- 동기화 오버헤드
- Semaphore > 배치 > Queue
- Semaphore 는 각 작업당 독립적으로 동기화.
- 배치 처리는 배치 단위로 동기화, 배치 완료까지 강제 대기.
- Queue 방식이 아무래도 동기화 비용이 들어감
- CPU 오버헤드
- Semaphore 는 동시 실행 수에 비례 할 거임. 오버헤드는 카운터 acquire/release 마다로. CPU 를 안정적으로 쓸 것.
- 배치 처리는 배치 시작할 때 급증하고, 조금 기다렸다가(배치 당 작업이 끝나고 다음 배치를 기다릴 때) 다시 급증하는 톱니파 형태.
- Queue 는 워커 수에 비례하고 지속적인 폴링으로 높을 것.
- Semaphore 이 사용량 자체는 많을 수 있으니 안정적인 형태일 것이므로 Semaphore 우세.
- 응답성
- Sempahore > 다른 방식들
- 메모리 사용량
Ariflow 에서 데이터를 처리해 나갈 때 dag_run_id 도 같이 저장하는 게 일반적인가? 필요한 정보들인가?
dag_run_id 를 저장할 때 발생하는 문제점:
- 결합도 증가: 데이터가 Airflow 시스템에 강하게 의존됨
- 이식성 문제: 다른 스케줄링 시스템으로 변경 시 문제 발생
dag_run_id 를 가지고 있으면 장점:
- 처리에서 멱등성 보장.
- 실패 지점에서의 재시도
- 진행 상황 추적 가능
Airflow 에서 airflow.cfg 파일의 역할은?
간략하게 본 것으로는 logging 을 남기거나, 원격 자격 증명 시스템을 등록하거나, 볼 메트릭을 지정하는게 가능한 듯
Airflow 에서 DB 데이터를 가지고 ETL 처리해나갈 때, 동시에 실행하는 테스크 또는 DAG 가 있다면 Overwrite 되는 문제가 발생할 거 같은데 이건 어떻게 해결함?
- 스케줄링 레벨에서 겹치지 않게 돌리는 방법이 있음. max_active_runs=1 로 주면 이전 단계가 끝나기 전에 다음 DAG Run 을 돌리지는 않음. 이것보다 엄격하게 하고 싶다면 DB 단에서 Advisory Lock 을 적용하는 방법도 있음. 이건 순수 어플리케이션에서 잠그는 락으로 누군가 잠그고 있다면 다른 DAG 에선느 실행이 안될거임. 테이블 락이나 레코드 락이 아님.
- 데이터 모델링 계층에서는 Idempotent 키를 지정해서 여러번 넣어도 한 레코드만 들어가도록하고, updated_at > last_watermark 즉 증분 전략으로 한번만 실행되도록 함.
내가 생각하는 언어 모델 별 사용시점
- 코드 작성 및 설명:
- Claude 4 Sonnet
- 핵심 요구사항 정리 및 건설적인 아이디어 개선
- OpenAI o3, Gemini 2.5 pro
MongoDB 를 사용할 때 멱등성을 보장하려면:
패턴 | 아이디어 | 장점 | 주의 |
---|---|---|---|
① “단일 Key Upsert” | 이미 유니크한 키가 있으면 update, 없으면 insert ‒ update_one(filter, update, upsert=True) ‒ filter 에 유일성이 담긴 필드(예: original_story_id)만 사용 | • 필드 존재 여부를 직접 확인할 필요 없음 → DB가 판정 • 다중 DAG 실행·재시도에서도 중복 문서 0 | • filter 조건이 너무 넓으면 원치 않는 문서가 갱신될 수 있음 → 항상 Unique Index와 함께 사용 |
② “Stage-Field 멱등 설계 | 파이프라인의 각 작업(stage) 가 자기가 채워야 할 필드가 존재있는지(또는 비어있는지) 보고 할 일이 남았는지 파악하는 것. 예: 각 태스크가 자신이 끝내야 할 필드가 비어 있는지 체크 ‒ 예: {"status": "scripted", "scenes": {"$exists": false}} | • 파이프라인 단계 분기·병렬 처리 간단 • 클러스터 Down 뒤 복구 시 “어디까지 했나” 확실 | • 반드시 해당 필드를 표준 스키마에 예약해 두어야 함 (타입·의미 고정) |
- Stage-Field 멱등 단계는 보통 데이터 처리 흐름이 많은 경우 빠르게, 실행되도록 또는 재처리 비용이 큰 경우 적용할 수 있는 방법임.
- MongoDB 에서는 멱등성 처리를 위해서 "$setOnInsert" 와 $set 을 구분해서 넣기도 해야함:
- $setOnInsert: Insert 한 번 발생할 때 적용 (최초 생성 시 고정되는 값)
- $set: insert or update 모두 매번 적용 (실행할 떄마다 덮어씀)
Airflow + MongoDB 병렬 파이프라인에서 멱등성, 중복 Zero, 누락 Zero 처리 방법
- 병렬 처리할 때 Unique Key + findOneAndUpdate 로 DAG 마다 처리해야할 문서를 dag_run_id 로 격리하는거. 이후 파이프라인 스텝에서 조회가 필요하다면 dag_run_id 를 이용한 쿼리로, 다른 DAG 에서 못가져가게 하는 것.
- 여러 DAG 에서 findOneAndUpdate 가 동시적으로 일어나면 중복으로 문서들을 가져갈 확률은 없나? 원자적 연산이므로 문제없음.
'업무일지' 카테고리의 다른 글
2025.07.15 학습 로그 (0) | 2025.07.16 |
---|---|
250608 학습 로그 (1) | 2025.06.08 |
250520 학습 로그 (0) | 2025.05.20 |