Kafka Consumer 에서 커밋은 크게 자동 커밋을 하느냐 수동 커밋을 하느냐로 나눠진다.

 

수동 커밋도 동기로 하느냐, 비동기로 하느냐, 아니면 이를 섞어서 하는 등의 여러가지 방법이 있다.

 

이것들에 대해서 알아보자.

1. 자동 커밋

enable.auto.commit=true 설정을 하면 자동 커밋을 하도록 설정할 수 있다.

 

이 경우에는 auto.commit.interval.ms 설정을 주기로 자동으로 커밋한다.

 

자동 커밋의 내부 동작 원리는 다음과 같다:

  • 커밋을 해야하는 주기가 돌아올 경우, 브로커로부터 메시지를 가져오는 poll() 메소드를 호출할 때 자동으로 커밋을 수행한다. 이때 이전에 poll() 로 읽었던 마지막 오프셋이 커밋된다.

이런 구조로 이뤄지기 때문에 만약 처리한 메시지가 예외가 났다면 다음 번 poll() 의 자동 커밋으로 인해 이 메시지는 처리 되었다고 판단될 가능성이 있다.

  • 이 뿐 아니라 컨슈머에서 메시지를 카프카 쪽에서 가지고 오는 스레드와 이 메시지를 처리하는 스레드가 별개인 경우에는 메시지가 처리되지 않았는데 커밋하는 경우도 발생할 수 있다. 

그리고 자동 커밋의 주기가 돌아오기 전에 컨슈머 어플리케이션이 죽는다면 메시지 중복 처리가 발생할 수 있다.

 

그러나 중복 처리는 별로 걱정하지 않아도 된다. 해결 방법이 많아서: 

  • 멱등성 있게 처리하거나
  • 멱등성 연산으로 만들거나
  • WAL (Write Ahead Log) 에 기록을 하면 되니까

 

2. 수동 커밋

2.1 동기식으로 현재 오프셋 커밋하기

수동 커밋에서 동기식으로 현재 오프셋을 커밋하는 방법은 commitSync() 를 이용하는 방법이다.

 

commitSync() 는 커밋이 완료될 때까지 블로킹 되며, 커밋 연산 자체가 실패할 경우 알아서 재시도를 한다.

 

다음과 같은 식으로 사용할 수 있다.

Duration timeout = Duration.ofMillis(100); 
while(true) {
    ConsumerRecords<String, String> records = consumer.poll(timeout); 
    ... // records 처리 

    try {
        consumer.commitSync(); 
    } catch (CommitFailedException e) {
        log.error("commit failed", e); 
    }
}
  • 예제에서 중요한 건 메시지 처리가 끝난 후 커밋을 했다는 것이다. 이렇게 해야지 유실없이 처리할 수 있다.
  • 그리고 이 경우엔 커밋을 하기전에 어플리케이션이 죽는다면 중복 처리가 발생할 수 있다.

 

2.2 비동기식으로 커밋하기

수동 커밋은 커밋이 끝날 때까지 기다려야하므로 처리량이 잘 나오지 않는다.

 

이런 경우에는 commitAsync() 를 통해 비동기 커밋을 할 수 있다.

 

비동기 커밋은 동기식 커밋과는 다르게 재시도를 하지 않는다. 어찌보면 당연한데 다음번 poll 의 비동기 커밋에서 성공할 수 있으니까 재시도를 하지 않는 것임.

 

비동기 커밋도 콜백 함수를 넣을 수 있는데 여기에서 필요하다면 재시도를 하게 만들 수 있다. 가령 현재 메시지 처리가 마지막 메시지라면 재시도를 하는게 유의미할 수 있으니까 이 경우에 하는거지.

 

2.3 동기식과 비동기식 커밋 함께 사용하기

어플리케이션이 죽을 때나 리밸런싱이 발생할 때는 명시적으로 commitSync() 로 동기식으로 커밋하고, 그 외 일반적인 메시지 처리 커밋은 비동기식으로 커밋을 하는 방식이다.

 

2.3 특정 오프셋 커밋하기

메시지 처리 비용이 높다면 중복 처리를 피하고 싶을 것이다. 이 경우에 메시지 처리 중간마다 커밋을 하도록 하는 방법이 있다.

뭐 예시로 들면 1000개 레코드 처리마다 커밋을 한다던지. 이 경우에도 commitAsync() 로 비동기 커밋이 어울린다.

+ Recent posts