카프카 컨슈머 어플리케이션을 부팅할 때 시작하는 Offset 은 다음과 같은 기준으로 적용된다:

  • resetOffset 이 false 일 때:
    • 브로커 토픽 파티션에 저장된 offset 이 있다면 그걸 이용한다. 그러나 파티션에 저장된 이 오프셋이 만료된 경우라면 auto.offset.reset 값에 따라서 시작하는 offset 이 결정된다. 크게 두 가지 값인 earliestlastest 값이 있음.
    • 파티션에 저장된 오프셋이 만료 되는 시간은 offsets.retention.minutes 값에 따라서 결정된다. 기본값은 7일임. 즉 7일이 지나면 사라진다. 
  • resetOffset 이 true 일 때:
    • startOffset 값에 따라서 결정된다. 크게 earliestlastest 값이 있는데 각각에 맞는 유용한 시나리오가 있음.
    • startOffset=earliest 인 경우:
      • Compaction 이 적용된 토픽에서 첫 값부터 읽어와서 상태를 만들고 싶은 경우.
    • startOffset=lastest 인 경우:
      • 컨슈머가 부팅된 시점부터 이벤트를 읽어오고 싶은 경우.
  • Spring Cloud Stream 에서 설정을 적용할 땐 다음 레퍼런스를 참고해서 적용하면 된다.

 

초기 시작하는 Offset 을 좀 더 세밀하게 제어하고 싶은 경우에는 KafkaBindingRebalanceListener 를 이용해야한다:

  • onPartitionsAssigned 에서 초기 시작하는 커밋을 가져와서 세팅하면 된다.
  • 이걸 사용할 경우에는 resetOffsets 를 true 로 설정하면 안됨.
public interface KafkaBindingRebalanceListener {

    /**
     * Invoked by the container before any pending offsets are committed.
     * @param bindingName the name of the binding.
     * @param consumer the consumer.
     * @param partitions the partitions.
     */
    default void onPartitionsRevokedBeforeCommit(String bindingName, Consumer<?, ?> consumer,
            Collection<TopicPartition> partitions) {

    }

    /**
     * Invoked by the container after any pending offsets are committed.
     * @param bindingName the name of the binding.
     * @param consumer the consumer.
     * @param partitions the partitions.
     */
    default void onPartitionsRevokedAfterCommit(String bindingName, Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {

    }

    /**
     * Invoked when partitions are initially assigned or after a rebalance.
     * Applications might only want to perform seek operations on an initial assignment.
     * @param bindingName the name of the binding.
     * @param consumer the consumer.
     * @param partitions the partitions.
     * @param initial true if this is the initial assignment.
     */
    default void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer, Collection<TopicPartition> partitions,
            boolean initial) {

    }

}

 

 

https://docs.spring.io/spring-cloud-stream/reference/kafka/kafka-binder/reset-offsets.html

'Spring > Spring Cloud Stream' 카테고리의 다른 글

Binder Abstraction  (0) 2024.01.16
Error Handling  (0) 2024.01.12
Producing and Consuming Messages: Spring Cloud Stream  (0) 2024.01.12

+ Recent posts