Develop

Kafka Producer 내부 동작 — partition 줄세우기와 broker 단위 flush

spring-kafka 3.3.7 / kafka-clients 3.7.1 기준 내부 동작 정리.전체 구조 — 2-stage systemsend() 한 줄 호출 = wire send 아님. 호출 스레드는 buffer에 append만 하고 반환, 별도 단일 Sender 스레드가 broker별로 묶어서 wire send.wire send: 실제 socket에 byte를 write해서 broker로 TCP 패킷을 전송하는 동작. JVM 메모리 buffer/queue에 적재하는 것과 구분. Stage 1Stage 2Stage 3 (ACK)책임partition별 줄세우기broker별 묶어서 wire sendcallback fanout + buffer 반납스레드호출 스레드 N개 (thread-safe)Sender 단..

Kafka Producer 내부 동작 — partition 줄세우기와 broker 단위 flush

728x90

spring-kafka 3.3.7 / kafka-clients 3.7.1 기준 내부 동작 정리.


전체 구조 — 2-stage system

send() 한 줄 호출 = wire send 아님. 호출 스레드는 buffer에 append만 하고 반환, 별도 단일 Sender 스레드가 broker별로 묶어서 wire send.

wire send: 실제 socket에 byte를 write해서 broker로 TCP 패킷을 전송하는 동작. JVM 메모리 buffer/queue에 적재하는 것과 구분.

  Stage 1 Stage 2 Stage 3 (ACK)
책임 partition별 줄세우기 broker별 묶어서 wire send callback fanout + buffer 반납
스레드 호출 스레드 N개 (thread-safe) Sender 단일 스레드 Sender 단일 스레드
응답 시간 µs 단위 (즉시 return) linger.ms / batch.size 트리거 ACK 도착 시

thread 분포

[Application]
  thread-1 ─┐
  thread-2 ─┤ ─→ send() ─→ Accumulator append (multi-thread safe)
  thread-3 ─┤
  thread-4 ─┘

[Producer 내부]
  Sender thread (= NIO thread) × 1
       │
       └─ NIO Selector ─→ broker-1 socket
                       ─→ broker-2 socket
                       ─→ broker-3 socket
                       ─→ ... (broker N대)
  • Application thread 수 = 애플리케이션이 정함 (N개 동시 send 가능)
  • Sender thread 수 = producer 인스턴스당 1개 고정
  • broker 수 = 무관 — multiplex라 thread 추가 없음

Stage 1 — partition별 줄세우기

자료구조

RecordAccumulator
└─ Map<topic, TopicInfo>
   └─ Map<partition, Deque<ProducerBatch>>
      └─ ProducerBatch (ByteBuffer borrowed from BufferPool)
         └─ records (압축된 byte stream)

BufferPool
├─ free: Deque<ByteBuffer 64KB>   ← 재사용 풀
├─ nonPooledAvailableMemory       ← 신규 할당용 잔량
└─ waiters: Deque<Condition>      ← 메모리 부족 시 대기

record 진입 흐름

batch가 send 대상이 되는 6조건

조건 트리거 의미
full batch.size(64KB) 임박 / deque에 batch ≥ 2개 가득 참 — 즉시 보낼 준비
expired waited ≥ linger.ms 시간 초과 — fallback trigger
exhausted BufferPool 대기자 있음 메모리 압박 — 빨리 비워야 함
closed producer 종료 절차 잔여 record drain
flushInProgress flush() 호출됨 강제 drain
transactionCompleting tx commit/abort tx 종료 전 flush

→ 평상시 트리거는 batch.size 또는 linger.ms 둘 중 먼저 만족된 쪽.

deque에 batch가 2개 이상 쌓이는 이유

시나리오 결과
현재 batch full → 새 batch 시작 이전 batch는 deque tail에서 대기 → full=true
linger.ms 도달했으나 max.in.flight 가득 drain 못 함 → 누적
Sender 처리 지연 (broker 느림) retry 또는 backlog 누적

Deque인 이유 (Queue 아닌)

동작 메서드 트리거
정상 append (호출 스레드) addLast 새 batch 시작
정상 drain (Sender 스레드) pollFirst 보낼 때
retry 시 복귀 addFirst 실패 batch가 head로 → 순서 유지

→ retry 순서 보존을 위해 양쪽 끝 다 필요 → Deque.

BufferPool 재사용 패턴

Allocate (메모리 빌리기) — 새 ProducerBatch 만들 때

Deallocate (메모리 반납) — batch ACK 후 또는 종료 시

  • batch.size(64KB)와 동일한 ByteBuffer는 clear()만 하고 free 풀로 복귀 → GC 부담 없음
  • 메모리 부족으로 allocate가 await 중이라면 deallocate 끝에 Condition.signal() 호출 → 자던 thread가 깨어나 다시 시도

Stage 2 — broker별 묶어서 wire send

partition → broker 매핑

partition 0  ─┐
partition 7  ─┼─→ broker-1 (leader)
partition 14 ─┘
partition 5  ─┐
partition 12 ─┼─→ broker-2 (leader)
partition 28 ─┘
...

각 partition의 leader broker는 cluster metadata가 결정. partition은 여러 broker에 분산, 한 broker는 여러 partition leader.

drain — broker별 collation

drain: per-partition deque에서 ready 상태인 ProducerBatch들을 꺼내 broker별로 묶는 작업.

  • batch(p=N) = partition N의 deque에 ready 상태로 있는 ProducerBatch 1개 (식별자에 partition 번호만 부여)
  • 예: batch(p=0)batch(p=9)가 모두 broker-1로 묶이는 이유 = broker-1이 partition 0과 partition 9 둘 다의 leader이기 때문
  • partition별로 시점상 정확히 1개 batch만 ready인 단순 사례 (실제로는 deque에 여러 batch 누적 가능)

ProduceRequest 구조

drain 결과(Map<broker, List<batch>>)에서 broker별 List를 ProduceRequest로 빌드. 한 ProduceRequest 안에는 같은 broker가 leader인 모든 partition batch가 들어가고, topic이 달라도 같은 broker로 가는 거면 같이 묶임.

ProduceRequest → broker-X
├─ acks
├─ timeout
└─ TopicProduceDataCollection           ← topic 별 N개 entry 가능
   ├─ topic A
   │  ├─ partition 0  → batch(p=0)
   │  └─ partition 7  → batch(p=7)
   ├─ topic B                            ← 같은 broker가 leader인 다른 topic도 함께
   │  ├─ partition 3  → batch(p=3)
   │  └─ partition 11 → batch(p=11)
   └─ ...

KafkaTemplate 여러 개 쓸 때의 collation

Collation이 일어나는 단위 = KafkaProducer 인스턴스. KafkaTemplate 여러 개 만들어 topic별로 분리해도, 그들이 같은 ProducerFactory를 공유하면 underlying producer가 같아서 collation이 그대로 작동함.

  같은 factory 공유 (기본) 다른 factory 분리
KafkaProducer 인스턴스 1 N
Accumulator 1 (topic A+B partition 공존) N (각자 격리)
Sender thread 1 N
broker당 socket 1 N
broker당 ProduceRequest 1 (multi-topic 묶임) N (각자 따로)
batching 효율 ↑ (cross-topic)
운영 격리
ack / compression 등 config 동일 토픽별 다르게 가능

분리 필요: 토픽별 다른 cluster / 다른 ack / 다른 transactional 정책.
공유 OK: 같은 cluster + 같은 ack/compression (대부분의 케이스).

Sender 스레드의 한 사이클

NIO Selector — 1 thread × N broker

Sender thread는 단일이지만 NIO Selector로 broker N대의 socket을 동시 다룬다.

multiplex 특성

특성 의미
non-blocking I/O socket read/write가 즉시 return (block 안 함)
multiplexing 1 thread가 N socket 동시 처리 — broker 늘어도 thread 추가 없음
partial I/O 추적 한 번에 다 못 보내면 다음 poll에서 이어서
wakeup self-pipe로 OS-level block 강제 해제

wakeup 메커니즘 — Sender thread는 보통 selector.poll(linger.ms)에서 자고있음. 깨우는 트리거:

트리거 위치 결과
batch full 호출 스레드 append() Selector self-pipe 1 byte write
linger.ms 만료 Selector timeout poll 자연 종료
flush() 호출 스레드 wakeup
close() 종료 절차 wakeup

Stage 3 — ACK 처리

callback chain

1 wire ACK = N record callback 일제 트리거 (같은 batch).

Future가 latch를 공유하는 구조

ProducerBatch
├─ ProduceRequestResult (CountDownLatch(1))
└─ records[0..N]
   ├─ FutureRecordMetadata { relativeOffset=0, result_ref → ProduceRequestResult }
   ├─ FutureRecordMetadata { relativeOffset=1, result_ref → 동일 }
   ├─ ...
   └─ FutureRecordMetadata { relativeOffset=N, result_ref → 동일 }

→ batch ACK 시 latch.countDown() 1번 → N개 future 일제 해제.

BufferPool 반납 트리거

케이스 호출 시점 반납 방식
성공 ACK completeBatch clear + free 풀로
종결 실패 (retry 한도 초과) failBatch 동일
Batch split (MESSAGE_TOO_LARGE) 분할 후 원본 deallocate 동일
Producer 종료 abortIncompleteBatches 동일
retry 시 (해당 없음) 반납 안 함 — 같은 ByteBuffer 들고 재시도

Exception 분류

분류 예시 처리
ApiException SerializationException, RecordTooLargeException, AuthorizationException callback 동기 호출 + FutureFailure (throw 안 함)
InterruptedException 스레드 인터럽트 InterruptException throw
KafkaException / 기타 시스템 에러 throw

→ ApiException은 whenComplete로 잡힘. 그 외는 try/catch 필요.


운영 설정 reference

producerPerThread 비교

  shared (false, 기본) per-thread (true)
Producer 인스턴스 수 1 thread 수
Sender thread 수 1 thread 수
BufferPool 64MB × 1 64MB × N
broker 연결 수 broker 수 thread × broker
append lock per-partition deque (낮음) 없음
batch 크기 큼 (압축률↑) 작음 (압축률↓)
ProduceRequest 수 적음 N배
권장 거의 모든 케이스 단일 Sender saturate 극단 케이스만

timeout 4종 관계

Config 기본값 적용 단위 초과 시
max.block.ms 60s send() 1회 (buffer/metadata 대기) send() 자체에서 throw
request.timeout.ms 30s ProduceRequest 1회 round-trip 해당 request fail → retry
retry.backoff.ms 100ms (max 1s) retry 사이 대기 (exp)
delivery.timeout.ms 120s send 시점부터 최종 결과까지 callback에 TimeoutException

필수 조건: delivery.timeout.ms ≥ linger.ms + request.timeout.ms

권장 config

kafka:
  producer:
    retries: 2147483647            # default
    retry-backoff-ms: 100          # default (exp backoff)
    delivery-timeout-ms: 120000    # default 2분
    request-timeout-ms: 30000      # default 30s
    max-block-ms: 60000            # default 60s
    enable-idempotence: true       # ★ retry 시 ordering + dedup
    acks: all                      # ★ ISR commit까지 확인
    compression-type: zstd
    linger-ms: 50
    batch-size: 65536              # 64KB
    buffer-memory: 67108864        # 64MB
    max-in-flight: 5

idempotence가 보호하는 범위

  보호됨 보호 안 됨
같은 producer 인스턴스 + 같은 partition + 네트워크 retry ✅ broker가 PID/epoch/sequence로 dedup
Producer 재시작 후 같은 비즈니스 이벤트 재발행 ❌ 새 PID라 broker가 다른 record로 봄
Consumer 재처리 (rebalance, crash) ❌ broker는 전달했음
다른 producer의 같은 logId 발행

비즈니스 idempotency (logId 기준 dedup)는 컨슈머 측 책임.

max.in.flight + idempotence 조합

max.in.flight idempotence guaranteeMessageOrder Ordering 보장 Throughput
1 false true (내부 derived) ✅ (mute 메커니즘) ⬇⬇
1 true true ⬇⬇
5+ (기본) false false ❌ retry 시 깨질 위험 ⬆⬆
5+ (Kafka 3.0+ default) true false ✅ (broker sequence 검증) ⬆⬆

실패 처리 패턴 비교

패턴 매커니즘 손실 보장 복잡도
Fire-and-forget callback에 로그 + metric 낮음
In-memory retry queue bounded queue + 백그라운드 retry ❌ (queue 만석 시 손실) 중간
Outbox DB tx에 event row + 별도 publisher 높음
DLQ topic 실패 이벤트를 별도 topic으로 △ (Kafka 죽으면 무력) 중간

broker 영구 장애 대비 진짜 보장은 outbox. in-memory queue는 transient 회복용 안전망.


빠른 참조 — 한 record의 여정


핵심 takeaway

직관 실제
send 1회 = wire send 1회 append 1회 + 별도 thread가 모아서 wire send
partition 32개 = wire request 32개 broker 3대면 wire request 3개 (collation)
broker 늘리면 thread 늘어야 NIO multiplex, 1 thread가 N broker 처리
send 콜백이 producer thread에서 실행 Sender thread에서 실행 (콜백 무거우면 전체 지연)
close() 호출하면 producer 닫힘 (spring-kafka 비트랜잭션) close()는 no-op, 싱글톤 유지
max.in.flight = 전체 in-flight 한도 broker 1대당 — broker N × max.in.flight 동시 가능
TimeoutException = 응답 없음 (callback에 도착하면) 이미 delivery.timeout 만큼 retry 진행됨

댓글

Comments