메시지를 정확히 한 번 처리하는 것이 큰 의미가 없다고 생각할 수 있다.

 

메시지 전달의 신뢰성을 'at-least once (최소 한 번)' 으로 해놓고 중복된 메시지는 고유한 식별자로 분리해서 처리하면 되니까. 

 

그러나 스트림 어플리케이션이 복잡해지면서 '정확히 한 번: exactly once' 처리하는 것은 분명 도움이 된다.

 

예시로 들면 평균을 계산해야하는 스트림 어플리케이션을 생각해보자.

 

메시지를 중복 처리하지 않도록 하기 위해서 고유한 식별자를 메시지에 보내도, 이것을 저장하고 기억해놔야 중복 처리를 막을텐데 이런 것들은 상당히 귀찮을 수 있다. 

 

그러므로 카프카의  '정확히 한 번 처리' 에 대해 알아두는 것은 도움이 될 것이다. 

 

카프카에서 '정확히 한 번 처리' 는 두 가지 기능의 조합으로 이뤄진다. 하나씩 살펴보자.

  • 멱등성 프로듀서 (idempotent producer)
  • 트랜잭션

 

1. 멱등성 프로듀서

멱등성 프로듀서는 프로듀서 내부의 재시도 매커니즘으로 인해 메시지가 중복 발행 되는 것을 막아준다.

 

메시지 중복 발행이 발생하는 예시 케이스는 다음과 같다:

    1. 파티션 리더가 메시지를 수신하고 이를 In-Sync 레플리카에 복제한다.
    1. 프로듀서에게 성공 응답을 보내기 전에 파티션 리더를 가진 브로커는 크래시가 난다.
    1. 새로운 브로커가 파티션 리더가 되며, 프로듀서의 재전송 매커니즘으로 인해 메시지를 수신한다. 이로 인해 중복이 발생한다.

 

카프카에서 멱등성 프로듀서를 쓰면 이런 재시도를 해도 프로듀서에 의해 보낸 메시지가 추적되므로 중복이 발생되지 않는다.

 

멱등성 프로듀서의 작동 원리는 다음과 같다:

  • 먼저 멱등성 프로듀서는 일반 프로듀서와 달리 메시지에 프로듀서 ID 와 메시지의 순서 번호인 Sequence Number 를 포함시킨다.
    • 프로듀서를 시작하는 과정에서 프로듀서 ID 와 Sequence Number 를 받아온다.
    • 프로듀서는 브로커에 전달하는 요청 단위인 RecordBatch 에 프로듀서 ID 와 SequenceNumber 를 부여해서 전송한다. 
  • 브로커는 각 토픽 파티션마다 프로듀서가 보낸 메시지들을 추적한다. 추적하는 개수는 max.in.flights.requests.per.connection 값 만큼 설정된다. 기본값은 5이다. 즉 5개의 메시지는 중복으로 보내도 추적해서 중복을 제거할 수 있다는 뜻이다.
  • 메시지는 Sequence Number 순으로 와야할텐데 만약 이 순서에 맞지 않게 큰 메시지가 온다면 브로커는 out of order sequence number 에러를 반환한다.

 

중복 메시지가 필터링 되는 지표를 보고 싶다면 프로듀서 측에서는 record-error-rate 지표값을 보면 되고, 브로커에서는 RequestMetric 에서 ErrorPerSpec 지표를 보면 된다.

 

프로듀서가 실패하고 재시작되면 중복이 발생할까?

  • 프로듀서 ID 는 프로듀서가 시작할 때 브로커에게 할당 받는다. 이때 고정적인 값을 할당 받는게 아니라 매번 새로운 값을 할당 받으므로 메시지 중복이 생길 수 있는 시나리오는 존재한다:
      1. 프로듀서가 메시지를 보내고 브로커에 성공 응답을 받기 전에 죽음. 그래서 클라이언트에게 결과를 전달해주지 못한다.
      1. 클라이언트는 응답이 오지 않아 새로운 프로듀서에게 전송하고 프로듀서는 다시 메시지를 보낸다. 그리고 브로커에 성공 응답을 받는다. 이 경우에서 브로커는 중복 제거를 하지 못한다. 왜냐하면 프로듀서 ID 가 달라서.
    • 즉 멱등성 프로듀서는 하나의 세션 범위에서만 정확히 한 번 전달을 보장한다.
  • 그러나 이런 문제는 프로듀서에서 transactional.id 를 사용하는 트랜잭션 프로듀서를 쓰게 되는 경우에는 문제가 없긴하다. 이 경우에는 브로커가 내가 설정한 transactional.id 와 매핑된 고정된 프로듀서 ID 를 할당해주기 때문이다. 

 

브로커가 장애가 나면 중복이 발생할까?

  • 리더 파티션을 가지는 브로커가 죽더라도 In-Sync 레플리카를 가진 브로커는 최신 상태를 보존하고, 메시지 추적도 동일한 상태이므로 걱정할 필요 없다.

 

멱등적 프로듀서의 한계점:

  • 멱등적 프로듀서를 쓸 때 중요한 건 실패했다고 임의로 producer.send() 를 여러번 호출해서 재시도하면 안된다는 것이다. 실패해도 괜찮다. 내부적인 매커니즘으로 인해 재시도를 하는데 이를 믿으면 된다. 오히려 producer.send() 를 여러번 호출하면 이게 의도적인건지, 재시도 목적인건지 카프카 브로커는 알지 못하므로 증복이 발생할 수 있다.

멱등적 프로듀서의 사용법:

  • enable.idempotence=true 를 설정하면 된다.

 

2. 트랜잭션

카프카에서 트랜잭션 처리 정의와 범위:

  • 카프카에서의 트랜잭션은 Atomic Multipartition Write (= 여러 파티션 동시 쓰기) 를 통해 정확히 한 번 쓰는 것을 보장해준다. 즉 커밋을 하는 것과 여러 다른 토픽에 대해 쓰는 것에 대해 원자성을 보장한다. 
    • 커밋을 하는 것도 _consumer_offset 토픽에 쓰는 것이다.
  • 이 말은 트랜잭션 범위는 카프카 토픽 내에서만 이뤄진다는 뜻이다. 외부 데이터베이스나 시스템과 상호작용해서 트랜잭션 범위를 보장하는 것이 아니다.

 

카프카에서 트랜잭션 처리에 대한 오해:

  • 카프카 트랜잭션은 카프카 토픽 내에서만 트랜잭션 처리를 보장해주므로 만약 컨슈머에서 메시지를 읽고나서 이에 대한 어플리케이션 상태를 변경시키는 Side Effect 가 있다면 이런 상태는 정확히 한 번 처리되지 않을 것이다.
  • 카프카 트랜잭션은 카프카 스트림즈와 같은 모든 데이터를 카프카 토픽에서 보관하고 처리하는 어플리케이션에서 특히 유용할 것이다.

 

카프카 트랜잭션이 처리하는 문제들:

  • 어플리케이션이 크래시가 나서 커밋 작업과 토픽 메시지를 발행하는 작업을 원자적으로 완료하지 못하는 경우:
    • 오프셋을 커밋하는 것과 다음 토픽에 메시지를 발행하는 작업 중 하나만 작업이 완료된 상태에서 어플리케이션이 죽는다면 이후 다시 재개할 때 메시지를 중복하거나 유실될 수 있다. 카프카 트랜잭션은 이런 처리를 원자적으로 처리하기 떄문에 정확히 한 번 처리할 수 있다.
  • 죽은 줄 알았던 컨슈머 어플리케이션이 살아있는 경우에 중복 처리가 생길 수 있다:
    • 컨슈머는 주기적으로 메시지를 가지고 가지 않거나 하트비트를 브로커에 보내지 않으면 파티션 리밸런싱이 발생한다. 리밸런싱 이후 새로운 컨슈머가 메시지를 처리하고 커밋하고 다음 토픽에 메시지를 발행해야하는데, 기존 컨슈머가 생존해있어서 이 작업을 같이 진행한다면 중복 메시지 처리가 발생할 수 있다. 이 경우에 카프카 트랜잭션은 Epoch 넘버를 이용해서 기존 어플리케이션의 요청은 무시하도록 해서 이런 좀비 문제를 해결한다.

 

카프카 트랜잭션 프로듀서를 매번 생성할 때 발생하는 문제:

  • 카프카 트랜잭션 프로듀서가 보낸 메시지는 트랜잭션 프로듀서가 종료되고 나서도 transaction.id.expiration.ms 시간동안 메모리에 유지된다. 그러므로 프로듀서를 재사용하지 않고 매번 생성해서 메시지를 보내게 되면 메모리 누수와 같은 문제가 발생할 수 있다. 그러므로 가능한 프로듀서를 재사용하는 것을 권장하고, Serverless 로 사용해서 프로듀서를 재사용할 수 없다면 transaction.id.expiration.ms 이 시간을 가능한 짧게 잡아두는 것이 좋다.

 

카프카에서 Hanging transaction 이 발생하는 문제:

  • 극히 드문 케이스이긴 하지만 카프카의 트랜잭션이 멈춰서 LSO (the last stable offset) 즉 마지막 트랜잭션 커밋 메시지 오프셋이 증가하지 않는 문제가 발생할 수 있다. 이렇게 멈춘 트랜잭션을 Hanging transaction 이라고 한다.
  • 이런 문제가 발생하면 컨슈머도 장기간 대기하게 되고 결국 log.retention.ms 시간이 지나서 파티션 메시지가 모두 삭제될 것이다. 이 문제를 해결하려면 토픽 동적 설정 중 retention.ms 를 짧게 줘서 해당 메시지들을 빨리 비우도록 하는 것이 방법이다. 카프카 3.4 버전부터는 이 문제를 해결하기 위한 개선 방안이 나오고 있으니 시간이 지나면 해결될 수 있다.
  • 이런 문제가 발생했다는 지표는 토픽 파티션 별로 나오는 지표 중에 LastStableOffsetLag 지표를 확인해야한다. 이 지표는 LSO 와 최신 토픽 파티션 오프셋의 격차를 보여준다. 이 지표가 계속해서 증가한다면 Hanging Transaction 이 생겼다는 의미다.

 

트랜잭션으로 인한 성능 문제가 발생한다:

  • 트랜잭션 커밋 요청은 동기식으로 진행되므로 성능적인 오버헤드가 발생한다. 이 과정에서 트랜잭션 코디네이터는 여러 토픽 파티션에 커밋 마커를 남길 것이고, 트랜잭션 로그 상태를 변경하는 작업을 수행할 것이다. 
  • 컨슈머 입장에서도 커밋된 메시지들만 필터링을 해야하니까 오버헤드가 생긴다.

 

2.1 카프카 트랜잭션의 작동 원리:

  • 트랜잭션은 브로커 내부에 있는 Transaction Coordinator 에 의해서 상태가 관리된다.
    • 트랜잭션 상태는 총 3가지가 있다. Ongoing -> Prepare commit -> Completed 이렇게 있으며 순서대로 전환된다.
    • 이 상태는 trasnaction log 라는 토픽에서 관리가 된다. 이 토픽을 관리할 수 있는 것은 Transaction Coordinator 뿐이며, 이것도 다른 토픽 파티션과 마찬가지로 복제가 된다.
    • 프로듀서와 상호 작용하는 Transaction Log 의 리더 파티션은 transaction.id 값을 해시 함수를 적용해서 나온 값으로 정해진다. 
  • 구체적으로 동작하는 과정은 크게 A -> B -> C -> D 순으로 이뤄진다:
    • A: the producer and transaction coordinator interaction
      • 프로듀서는 트랜잭션 기능을 사용하기 위해서 initTransactions API 를 호출해서 프로듀서는 자신의 transactional.id 를 트랜잭션을 Coordinator 에 등록시킨다. 이런 과정에서 프로듀서는 트랜잭션 프로듀서로 업그레이드 시키게 된다.
      • 그리고 이 시점에서 이전에 완료하지 못했던 트랜잭션 처리 작업이 있다면 마무리 시킨다.
      • 트랜잭션 프로듀서로 업그레이드 시킬 때 고정적인 producer.id 를 받게 되고 그와 매핑된 Epoch Number 를 증가시킨다. producer.id 를 통해서 트랜잭션을 시작하는 프로듀서를 유일하게 구별시켜주고, Epoch Number 를 통해서 좀비 트랜잭션 프로듀서를 방지하게 해준다. Epoch Number 가 낮은 프로듀서의 메시지는 요청을 받지 않는 식으로.
    • B: the coordinator and transaction log interaction
      • 프로듀서는 트랜잭션을 시작할 때 이를 Coordinator 에게 알려준다. Transaction Coordinator 는 transaction log 의 트랜잭션 상태를 ongoing 으로 변경시킨다.
      • 트랜잭션은 최대 transaction.timeout.ms 이 시간동안 진행중인 상태가 될 수 있다. 기본값은 15분임.
      • 트랜잭션 코디네이터만이 유일하게 transaction log 를 변경시킬 수 있고, 이 또한 여러 파티션으로 안전하게 복제되니까 신뢰성있게 처리할 수 있다.
    • C: the producer writing data to target topic-partitions
      • 이 과정은 일반 프로듀서가 토픽 파티션에 데이터를 보내는 것과 거의 동일하다. 다만 추가되는 점이 있는데 프로듀서가 좀비 프로듀서가 아닌지 검사하는 과정이 생긴다.
    • D: the coordinator to topic-partition interaction
      • 프로듀서가 모든 데이터를 쓰고 나서 commitTransaction 를 호출하면 Transaction Coordinator 는 Transaction Log 에 상태를 prepare commit 으로 변경시킨다. 일단 이 상태가 되면 트랜잭션은 완료될 수 있다.
      • Transaction Coordinator 는 그러고 나서 컨슈머가 트랜잭션 완료된 데이터를 읽어갈 수 있도록 토픽 파티션에 commit marker 를 남긴다.
      • 컨슈머는 isolation.level 을 read_committed 으로 설정하게 되면 커밋된 데이터나 트랜잭션에 참여하지 않는 데이터만 읽어갈 수 있도록 만들 수 있다.
      • Commit Marker 를 모두 남기게 되면 Transaciton log 에 롼료 상태인 Complted 를 기록하고 마무리한다.
      • 즉 모든 메시지를 파티션에 기록했다는 단계와, 모든 메시지에 커밋 마커를 남겼다는 단계 이렇게 두 개의 단게가 있는 것이다.

 

2.2 트랜잭션 사용법:

Kafka Github 의 예제 코드를 보면 어떻게 트랜잭션을 사용하는 지 볼 수 있다:

코드의 주요 조각을 보면서 어떻게 사용하는지 보자.

 

먼저 프로듀서 설정 부분이다.

public KafkaProducer<Integer, String> createKafkaProducer() {
    Properties props = new Properties();
    // bootstrap server config is required for producer to connect to brokers
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    // client id is not required, but it's good to track the source of requests beyond just ip/port
    // by allowing a logical application name to be included in server-side request logging
    props.put(ProducerConfig.CLIENT_ID_CONFIG, "client-" + UUID.randomUUID());
    // key and value are just byte arrays, so we need to set appropriate serializers
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    if (transactionTimeoutMs > 0) {
        // max time before the transaction coordinator proactively aborts the ongoing transaction
        props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, transactionTimeoutMs);
    }
    if (transactionalId != null) {
        // the transactional id must be static and unique
        // it is used to identify the same producer instance across process restarts
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
    }
    // enable duplicates protection at the partition level
    props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotency);
    return new KafkaProducer<>(props);
}
  • props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId); 를 통해서 transaction.id 를 등록해야 트랜잭션 프로듀서를 사용할 수 있다.
  • props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, transactionTimeoutMs); 가능한 transactionTimeoutMs 를 짧게 잡아줘서 컨슈머가 지연되지 않도록 하는 것을 추천한다. 예제에서는 10초를 씀.

 

다음으로는 컨슈머 설정 부분을 보자.

public KafkaConsumer<Integer, String> createKafkaConsumer() {
    Properties props = new Properties();
    // bootstrap server config is required for consumer to connect to brokers
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    // client id is not required, but it's good to track the source of requests beyond just ip/port
    // by allowing a logical application name to be included in server-side request logging
    props.put(ConsumerConfig.CLIENT_ID_CONFIG, "client-" + UUID.randomUUID());
    // consumer group id is required when we use subscribe(topics) for group management
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    // sets static membership to improve availability (e.g. rolling restart)
    instanceId.ifPresent(id -> props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, id));
    // disables auto commit when EOS is enabled, because offsets are committed with the transaction
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, readCommitted ? "false" : "true");
    // key and value are just byte arrays, so we need to set appropriate deserializers
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    if (readCommitted) {
        // skips ongoing and aborted transactions
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
    }
    // sets the reset offset policy in case of invalid or no offset
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    return new KafkaConsumer<>(props);
}
  • props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, readCommitted ? "false" : "true"); 트랜잭션 프로듀서를 통해서 커밋을 할 거니 자동 커밋은 끈다. 그리고 트랜잭션 커밋된 데이터만 읽도록 설정한다.

 

이제 트랜잭션을 사용하는 코드 부분을 보자: 

public void run() {
    int processedRecords = 0;
    long remainingRecords = Long.MAX_VALUE;
    // it is recommended to have a relatively short txn timeout in order to clear pending offsets faster
    int transactionTimeoutMs = 10_000;
    // consumer must be in read_committed mode, which means it won't be able to read uncommitted data
    boolean readCommitted = true;
    try (KafkaProducer<Integer, String> producer = new Producer("processor-producer", bootstrapServers, outputTopic, true, transactionalId, true, -1, transactionTimeoutMs, null).createKafkaProducer();

    KafkaConsumer<Integer, String> consumer = new Consumer("processor-consumer", bootstrapServers, inputTopic, "processor-group", Optional.of(groupInstanceId), readCommitted, -1, null).createKafkaConsumer()) {

        // called first and once to fence zombies and abort any pending transaction
        producer.initTransactions();

        consumer.subscribe(singleton(inputTopic), this);

        Utils.printOut("Processing new records");
        while (!closed && remainingRecords > 0) {
            try {
                ConsumerRecords<Integer, String> records = consumer.poll(ofMillis(200));
                if (!records.isEmpty()) {
                    // begin a new transaction session
                    producer.beginTransaction();

                    for (ConsumerRecord<Integer, String> record : records) {
                        // process the record and send downstream
                        ProducerRecord<Integer, String> newRecord = new ProducerRecord<>(outputTopic, record.key(), record.value() + "-ok");
                            producer.send(newRecord);
                        }

                        // checkpoint the progress by sending offsets to group coordinator broker
                        // note that this API is only available for broker >= 2.5
                        producer.sendOffsetsToTransaction(getOffsetsToCommit(consumer), consumer.groupMetadata());

                        // commit the transaction including offsets
                        producer.commitTransaction();
                        processedRecords += records.count();
                    }
            } catch (AuthorizationException | UnsupportedVersionException | ProducerFencedException
                         | FencedInstanceIdException | OutOfOrderSequenceException | SerializationException e) {
                // we can't recover from these exceptions
                Utils.printErr(e.getMessage());
                shutdown();
            } catch (OffsetOutOfRangeException | NoOffsetForPartitionException e) {
                // invalid or no offset found without auto.reset.policy
                Utils.printOut("Invalid or no offset found, using latest");
                consumer.seekToEnd(emptyList());
                consumer.commitSync();
            } catch (KafkaException e) {
                // abort the transaction and try to continue
                Utils.printOut("Aborting transaction: %s", e);
                producer.abortTransaction();
            }
            remainingRecords = getRemainingRecords(consumer);
            if (remainingRecords != Long.MAX_VALUE) {
                Utils.printOut("Remaining records: %d", remainingRecords);
            }
        }
    } catch (Throwable e) {
        Utils.printOut("Unhandled exception");
        e.printStackTrace();
    }
    Utils.printOut("Processed %d records", processedRecords);
    shutdown();
}
  • producer.initTransactions(); 를 통해서 트랜잭션 프로듀서로 업그레이드 시킨다.
  • consumer.subscribe(singleton(inputTopic), this); 를 통해서 데이터를 수신할 토픽 파티션을 정한다.
  • producer.beginTransaction(); 를 통해서 트랜잭션 처리를 시작한다.
  • producer.send(newRecord); 를 통해서 다음 스트림 처리를 위해 토픽 파티션에 메시지를 전송한다.
  • producer.sendOffsetsToTransaction(getOffsetsToCommit(consumer), consumer.groupMetadata()); 를 통해서 오프셋을 커밋한다.
  • producer.commitTransaction(); 를 통해서 트랜잭션을 커밋한다.
  • ProducerFencedException FencedInstanceIdException 이 발생한다면 좀비 트랜잭션 프로듀서가 생긴 것 때문이다.
  • 만약 처리하다가 예외가 발생한다면 producer.abortTransaction(); 를 통해서 트랜잭션 처리를 롤백시키는게 중요하다.

 

References:

 

Transactions in Apache Kafka | Confluent

Learn the main concepts needed to use the transaction API in Apache Kafka effectively.

www.confluent.io

'Apache Kafka' 카테고리의 다른 글

Kafka Network Client Internals  (0) 2023.12.31
Apache Kafka Internal Mechanism  (0) 2023.12.30
Kafka Consumer 주요 설정  (0) 2023.12.29
Kafka 에서 신뢰성을 주는 방법  (0) 2023.12.28
Kafka Consumer 에서 커밋을 하는 방법  (0) 2023.12.27

+ Recent posts