카프카 컨슈머 어플리케이션을 부팅할 때 시작하는 Offset 은 다음과 같은 기준으로 적용된다:
resetOffset
이 false 일 때:- 브로커 토픽 파티션에 저장된 offset 이 있다면 그걸 이용한다. 그러나 파티션에 저장된 이 오프셋이 만료된 경우라면
auto.offset.reset
값에 따라서 시작하는 offset 이 결정된다. 크게 두 가지 값인earliest
와lastest
값이 있음. - 파티션에 저장된 오프셋이 만료 되는 시간은
offsets.retention.minutes
값에 따라서 결정된다. 기본값은 7일임. 즉 7일이 지나면 사라진다.
- 브로커 토픽 파티션에 저장된 offset 이 있다면 그걸 이용한다. 그러나 파티션에 저장된 이 오프셋이 만료된 경우라면
resetOffset
이 true 일 때:startOffset
값에 따라서 결정된다. 크게earliest
와lastest
값이 있는데 각각에 맞는 유용한 시나리오가 있음.startOffset=earliest
인 경우:- Compaction 이 적용된 토픽에서 첫 값부터 읽어와서 상태를 만들고 싶은 경우.
startOffset=lastest
인 경우:- 컨슈머가 부팅된 시점부터 이벤트를 읽어오고 싶은 경우.
- Spring Cloud Stream 에서 설정을 적용할 땐 다음 레퍼런스를 참고해서 적용하면 된다.
초기 시작하는 Offset 을 좀 더 세밀하게 제어하고 싶은 경우에는 KafkaBindingRebalanceListener
를 이용해야한다:
onPartitionsAssigned
에서 초기 시작하는 커밋을 가져와서 세팅하면 된다.- 이걸 사용할 경우에는
resetOffsets
를 true 로 설정하면 안됨.
public interface KafkaBindingRebalanceListener {
/**
* Invoked by the container before any pending offsets are committed.
* @param bindingName the name of the binding.
* @param consumer the consumer.
* @param partitions the partitions.
*/
default void onPartitionsRevokedBeforeCommit(String bindingName, Consumer<?, ?> consumer,
Collection<TopicPartition> partitions) {
}
/**
* Invoked by the container after any pending offsets are committed.
* @param bindingName the name of the binding.
* @param consumer the consumer.
* @param partitions the partitions.
*/
default void onPartitionsRevokedAfterCommit(String bindingName, Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
}
/**
* Invoked when partitions are initially assigned or after a rebalance.
* Applications might only want to perform seek operations on an initial assignment.
* @param bindingName the name of the binding.
* @param consumer the consumer.
* @param partitions the partitions.
* @param initial true if this is the initial assignment.
*/
default void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer, Collection<TopicPartition> partitions,
boolean initial) {
}
}
https://docs.spring.io/spring-cloud-stream/reference/kafka/kafka-binder/reset-offsets.html
'Spring > Spring Cloud Stream' 카테고리의 다른 글
Binder Abstraction (0) | 2024.01.16 |
---|---|
Error Handling (0) | 2024.01.12 |
Producing and Consuming Messages: Spring Cloud Stream (0) | 2024.01.12 |