이 글은 [네이버 D2 - Kafka Producer Client] 를 보고 작성하였습니다.
(+ 추가 이 글은 Kafka Client 0.10.2.1 버전 기준입니다.)

1. KafkaProducer Client 의 기본 구성 요소

카프카 Producer 는 크게 3가지 부분으로 나눠진다:

    1. KafkaProducer:
    • 사용자가 Kafka Producer 를 직접 사용하는 부분이다. send() 메소드를 통해서 메시지를 브로커에게 보낼 수 있다.
    1. RecordAccumulator:
    • Kafka Producer 가 send 를 하더라도 바로 브로커에게 전송되는 것이 아니라 RecordAccumulator 에 메시지가 축적된다. 메시지를 건건히 보내는게 아니라 Batch 식으로 보내기 위해서 존재한다.
    • 그리고 메시지는 기본적으로 비동기적으로 보내진다.
    1. Sender:
    • 실제로 브로커에 메시지를 보내는 부분으로 Sender Thread 라는 별도의 스레드를 통해서 보내진다.
    • Sender Thread 는 RecordAccumulator 에 축적된 메시지를 가지고와서 브로커에게 전송한다. 그리고 브로커에게 응답을 받으면 별도의 콜백 함수가 있을 경우에 콜백을 실행한다.
    • 실제 사용자에게는 Future 객체를 반환한다.

 

2. KafkaProducer send() 동작 파악하기

Kafka Producer 에서 send() 를 호출하면 다음 과정들이 일어난다:

  • 데이터는 네트워크에 보내기 위해 Serialization 과정이 일어난다.
  • 어떤 파티션에게 보내야하는지 결정하기 위해서 Partitioning 과정이 일어난다.
  • 네트워크를 통해 데이터를 보낼 때 더 효율적으로 보내기 위해 Compression 과정이 일어난다.
  • 최종적으로 RecordAccumulator 에 메시지가 축적되고 여기서 데이터를 개별적으로 보내지 않고 효율적으로 보내기 위해 배치식으로 모아서 전송한다.

2.1 Serialization

사용자가 전달하는 Record (= 브로커에게 보내는 메시지) 는 사용자가 지정한 직렬확 객체에 의해 Byte Array 로 변환된다.

 

Serializer는 key.serializer, value.serializer 설정값으로 지정하거나, KafkaProducer 생성 시 지정할 수 있다.

 Properties props = new Properties();
 props.put("bootstrap.servers", "localhost:9092");
 props.put("acks", "all");
 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

 Producer<String, String> producer = new KafkaProducer<>(props);
 for (int i = 0; i < 100; i++)
     producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));

 producer.close();

 

아니면 다음과 같이 send 를 할 때 지정할 수도 있다.

 Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());

 

2.2 Partitioning

Kafka의 Topic은 여러 개의 Partition으로 나뉘어 있는데, 사용자의 Record는 지정된 Partitioner에 의해서 어떤 파티션으로 보내질지 정해진다.

 

Partitioner는 기본적으로 Record를 받아서 Partition Number를 반환하는 역할을 한다.

 

partitioner.class를 설정하여 Partitioner를 지정할 수 있으며, Partitioner를 지정하지 않으면org.apache.kafka.clients.producer.internals.DefaultPartitioner 가 사용된다.

 

Record 생성 시 Partition 지정이 가능하기 때문에, Partition이 지정되어 있는 경우에는 Partitioner를 사용하지 않고 지정된 Partition이 사용된다. Record에 지정된 Partition이 없는 경우 DefaultPartitioner는 다음과 같이 동작한다.

  • Key 값이 있는 경우 Key 값의 Hash 값을 이용해서 Partition을 할당한다.
  • Key 값이 없는 경우 Round-Robin 방식으로 Partition이 할당된다.

 

2.3 Compression

사용자가 전송하려는 Record는 압축을 함으로써 네트워크 전송 비용도 줄일 수 있고 저장 비용도 줄일 수 있다.

 

Record는 RecordAccumulator에 저장될 때 바로 압축되어 저장된다.

 

compression.type을 설정하여 압축 시 사용할 코덱을 지정할 수 있다. 다음과 같은 코덱를 사용할 수 있으며 지정하지 않는 경우 기본값은 none이다.

  • gzip
  • snappy
  • lz4

 

3. RecordAccumulator append()

사용자가 전송하려는 Record는 전송 전에 먼저 RecordAccumulator에 저장된다.

 

RecordAccumulator는 batches라는 Map을 가지고 있는데, 이 Map의 Key는 TopicPartition이고, Value는 Deque<RecordBatch>이다. 즉 TopicPartition 별로 메시지를 보낼 배치를 만든다.

 

RecordAccumulator에 저장하기 전에 Record의 사이즈를 검사한다. 사이즈가 max.request.size 또는 buffer.memory 값보다 크다면 RecordTooLargeException 이 발생하게 된다. 크기가 문제 없으면, RecordAccumulator의 append()를 이용해서 저장한다.

 

저장될 떈 Record 가 현재 RecordBatch 에 같이 보낼 수 있는지 검사한다. Record 가 사이즈가 작아서 현재 배치에 포함시켜 보낼 수 있다면 이를 추가하고, 포함시킬 수 없다면 새로운 RecordBatch 를 생성하고 여기에 Record 를 담는다.

 

이런 작동은 마지막 RecordBatch 가 Record 를 담을 수 있는지 적합한지 검사를 하고, 이 순서대로 메시지를 보내야하기 때문에 Queue 보다는 Deque 자료구조를 사용한다.

 

새로운 RecordBatch 를 만들 때는 Byte Buffer 을 할당받기 위해서 Buffer Pool 을 이용한다. 이 사이즈는 buffer.memory 에 의해서 결정된다. 

 

만약 버퍼 풀에 Byte Buffer 가 충분하지 않다면 RecordBatch 의 생성은 블로킹 된다. 기다려야하는 시간이 max.block.ms 보다 길어진다면 TimeoutException 이 발생한다.

 

RecordBatch 의 사이즈는 batch.size 설정 값과 압축된 Record 의 사이즈 중 큰 값으로 결정된다. 일반적으로는 Record 가 batch.size 보다 작으므로 하나의 RecordBatch 에 여러 개의 Record 가 담기지만 Record 가 batch.size 보다 크면 하나의 RecordBatch 에 하나의 Record 만 저장된다.

 

4. Sender Thread

Sender Thread는 RecordAccumulator 에 저장된 Record를 꺼내서 Broker 로 전송하고 Broker 의 응답을 처리한다.

 

RecordBatch 메시지는 브로커 별로 분류해서 보내야한다. 즉 브로커마다 여러개의 토픽 파티션들을 관리할텐데 한번에 Sender 에서 브로커로 전송할 때 여러 토픽 파티션들에 걸친 RecordBatch 메시지들 List 로 모아서 전송을 하는 것이다. 그리고 브로커에 한번에 보내는 사이즈는 max.request.size 만큼 모운다.

 

Sender 에서 브로커들로의 전송은 여러개의 스레드를 쓰는게 아니라 하나의 Sender Thread 에서 Selector 를 이용해서 여러 소켓을 관리하고 데이터를 보낸다. 

브로커로 보낼 메시지들을 정리했다면 이것들을 max.in.flight.requests.per.connection 값에 따라서 한번에 요청을 보낸다.

 

참고로, 요청이 실패할 경우 retries 설정값이 1 이상인 경우 재시도하기 때문에 max.in.flight.requests.per.connection 값이 1보다 크면 순서가 바뀔 수 있다.

 

그러나 이 경우에는 enable.idempotence 값을 true 로 주면 max.in.flight.requests.per.connection 값을 5로 설정해도 (= 기본값) 브로커 단에서 메시지의 순서를 보장해준다.

 

References:

+ Recent posts