Kafka Producer 내부 동작 — partition 줄세우기와 broker 단위 flush
spring-kafka 3.3.7 / kafka-clients 3.7.1 기준 내부 동작 정리.
전체 구조 — 2-stage system
send() 한 줄 호출 = wire send 아님. 호출 스레드는 buffer에 append만 하고 반환, 별도 단일 Sender 스레드가 broker별로 묶어서 wire send.

wire send: 실제 socket에 byte를 write해서 broker로 TCP 패킷을 전송하는 동작. JVM 메모리 buffer/queue에 적재하는 것과 구분.
| Stage 1 | Stage 2 | Stage 3 (ACK) | |
|---|---|---|---|
| 책임 | partition별 줄세우기 | broker별 묶어서 wire send | callback fanout + buffer 반납 |
| 스레드 | 호출 스레드 N개 (thread-safe) | Sender 단일 스레드 | Sender 단일 스레드 |
| 응답 시간 | µs 단위 (즉시 return) | linger.ms / batch.size 트리거 | ACK 도착 시 |
thread 분포
[Application]
thread-1 ─┐
thread-2 ─┤ ─→ send() ─→ Accumulator append (multi-thread safe)
thread-3 ─┤
thread-4 ─┘
[Producer 내부]
Sender thread (= NIO thread) × 1
│
└─ NIO Selector ─→ broker-1 socket
─→ broker-2 socket
─→ broker-3 socket
─→ ... (broker N대)
- Application thread 수 = 애플리케이션이 정함 (N개 동시 send 가능)
- Sender thread 수 = producer 인스턴스당 1개 고정
- broker 수 = 무관 — multiplex라 thread 추가 없음
Stage 1 — partition별 줄세우기
자료구조
RecordAccumulator
└─ Map<topic, TopicInfo>
└─ Map<partition, Deque<ProducerBatch>>
└─ ProducerBatch (ByteBuffer borrowed from BufferPool)
└─ records (압축된 byte stream)
BufferPool
├─ free: Deque<ByteBuffer 64KB> ← 재사용 풀
├─ nonPooledAvailableMemory ← 신규 할당용 잔량
└─ waiters: Deque<Condition> ← 메모리 부족 시 대기
record 진입 흐름

batch가 send 대상이 되는 6조건
| 조건 | 트리거 | 의미 |
|---|---|---|
| full | batch.size(64KB) 임박 / deque에 batch ≥ 2개 | 가득 참 — 즉시 보낼 준비 |
| expired | waited ≥ linger.ms | 시간 초과 — fallback trigger |
| exhausted | BufferPool 대기자 있음 | 메모리 압박 — 빨리 비워야 함 |
| closed | producer 종료 절차 | 잔여 record drain |
| flushInProgress | flush() 호출됨 | 강제 drain |
| transactionCompleting | tx commit/abort | tx 종료 전 flush |
→ 평상시 트리거는 batch.size 또는 linger.ms 둘 중 먼저 만족된 쪽.
deque에 batch가 2개 이상 쌓이는 이유
| 시나리오 | 결과 |
|---|---|
| 현재 batch full → 새 batch 시작 | 이전 batch는 deque tail에서 대기 → full=true |
| linger.ms 도달했으나 max.in.flight 가득 | drain 못 함 → 누적 |
| Sender 처리 지연 (broker 느림) | retry 또는 backlog 누적 |
Deque인 이유 (Queue 아닌)
| 동작 | 메서드 | 트리거 |
|---|---|---|
| 정상 append (호출 스레드) | addLast |
새 batch 시작 |
| 정상 drain (Sender 스레드) | pollFirst |
보낼 때 |
| retry 시 복귀 | addFirst |
실패 batch가 head로 → 순서 유지 |
→ retry 순서 보존을 위해 양쪽 끝 다 필요 → Deque.
BufferPool 재사용 패턴
Allocate (메모리 빌리기) — 새 ProducerBatch 만들 때

Deallocate (메모리 반납) — batch ACK 후 또는 종료 시

- batch.size(64KB)와 동일한 ByteBuffer는 clear()만 하고 free 풀로 복귀 → GC 부담 없음
- 메모리 부족으로 allocate가 await 중이라면 deallocate 끝에
Condition.signal()호출 → 자던 thread가 깨어나 다시 시도
Stage 2 — broker별 묶어서 wire send
partition → broker 매핑
partition 0 ─┐
partition 7 ─┼─→ broker-1 (leader)
partition 14 ─┘
partition 5 ─┐
partition 12 ─┼─→ broker-2 (leader)
partition 28 ─┘
...
각 partition의 leader broker는 cluster metadata가 결정. partition은 여러 broker에 분산, 한 broker는 여러 partition leader.
drain — broker별 collation
drain: per-partition deque에서 ready 상태인 ProducerBatch들을 꺼내 broker별로 묶는 작업.

batch(p=N)= partition N의 deque에 ready 상태로 있는 ProducerBatch 1개 (식별자에 partition 번호만 부여)- 예:
batch(p=0)과batch(p=9)가 모두 broker-1로 묶이는 이유 = broker-1이 partition 0과 partition 9 둘 다의 leader이기 때문 - partition별로 시점상 정확히 1개 batch만 ready인 단순 사례 (실제로는 deque에 여러 batch 누적 가능)
ProduceRequest 구조
drain 결과(Map<broker, List<batch>>)에서 broker별 List를 ProduceRequest로 빌드. 한 ProduceRequest 안에는 같은 broker가 leader인 모든 partition batch가 들어가고, topic이 달라도 같은 broker로 가는 거면 같이 묶임.
ProduceRequest → broker-X
├─ acks
├─ timeout
└─ TopicProduceDataCollection ← topic 별 N개 entry 가능
├─ topic A
│ ├─ partition 0 → batch(p=0)
│ └─ partition 7 → batch(p=7)
├─ topic B ← 같은 broker가 leader인 다른 topic도 함께
│ ├─ partition 3 → batch(p=3)
│ └─ partition 11 → batch(p=11)
└─ ...
KafkaTemplate 여러 개 쓸 때의 collation
Collation이 일어나는 단위 = KafkaProducer 인스턴스. KafkaTemplate 여러 개 만들어 topic별로 분리해도, 그들이 같은 ProducerFactory를 공유하면 underlying producer가 같아서 collation이 그대로 작동함.

| 같은 factory 공유 (기본) | 다른 factory 분리 | |
|---|---|---|
| KafkaProducer 인스턴스 | 1 | N |
| Accumulator | 1 (topic A+B partition 공존) | N (각자 격리) |
| Sender thread | 1 | N |
| broker당 socket | 1 | N |
| broker당 ProduceRequest | 1 (multi-topic 묶임) | N (각자 따로) |
| batching 효율 | ↑ (cross-topic) | ↓ |
| 운영 격리 | ✗ | ✓ |
| ack / compression 등 config | 동일 | 토픽별 다르게 가능 |
→ 분리 필요: 토픽별 다른 cluster / 다른 ack / 다른 transactional 정책.
→ 공유 OK: 같은 cluster + 같은 ack/compression (대부분의 케이스).
Sender 스레드의 한 사이클

NIO Selector — 1 thread × N broker
Sender thread는 단일이지만 NIO Selector로 broker N대의 socket을 동시 다룬다.

multiplex 특성
| 특성 | 의미 |
|---|---|
| non-blocking I/O | socket read/write가 즉시 return (block 안 함) |
| multiplexing | 1 thread가 N socket 동시 처리 — broker 늘어도 thread 추가 없음 |
| partial I/O 추적 | 한 번에 다 못 보내면 다음 poll에서 이어서 |
| wakeup | self-pipe로 OS-level block 강제 해제 |
wakeup 메커니즘 — Sender thread는 보통 selector.poll(linger.ms)에서 자고있음. 깨우는 트리거:
| 트리거 | 위치 | 결과 |
|---|---|---|
| batch full | 호출 스레드 append() | Selector self-pipe 1 byte write |
| linger.ms 만료 | Selector timeout | poll 자연 종료 |
| flush() | 호출 스레드 | wakeup |
| close() | 종료 절차 | wakeup |
Stage 3 — ACK 처리
callback chain

→ 1 wire ACK = N record callback 일제 트리거 (같은 batch).
Future가 latch를 공유하는 구조
ProducerBatch
├─ ProduceRequestResult (CountDownLatch(1))
└─ records[0..N]
├─ FutureRecordMetadata { relativeOffset=0, result_ref → ProduceRequestResult }
├─ FutureRecordMetadata { relativeOffset=1, result_ref → 동일 }
├─ ...
└─ FutureRecordMetadata { relativeOffset=N, result_ref → 동일 }
→ batch ACK 시 latch.countDown() 1번 → N개 future 일제 해제.
BufferPool 반납 트리거
| 케이스 | 호출 시점 | 반납 방식 |
|---|---|---|
| 성공 ACK | completeBatch 후 |
clear + free 풀로 |
| 종결 실패 (retry 한도 초과) | failBatch |
동일 |
Batch split (MESSAGE_TOO_LARGE) |
분할 후 원본 deallocate | 동일 |
| Producer 종료 | abortIncompleteBatches | 동일 |
| retry 시 | (해당 없음) | 반납 안 함 — 같은 ByteBuffer 들고 재시도 |
Exception 분류
| 분류 | 예시 | 처리 |
|---|---|---|
| ApiException | SerializationException, RecordTooLargeException, AuthorizationException | callback 동기 호출 + FutureFailure (throw 안 함) |
| InterruptedException | 스레드 인터럽트 | InterruptException throw |
| KafkaException / 기타 | 시스템 에러 | throw |
→ ApiException은 whenComplete로 잡힘. 그 외는 try/catch 필요.
운영 설정 reference
producerPerThread 비교
| shared (false, 기본) | per-thread (true) | |
|---|---|---|
| Producer 인스턴스 수 | 1 | thread 수 |
| Sender thread 수 | 1 | thread 수 |
| BufferPool | 64MB × 1 | 64MB × N |
| broker 연결 수 | broker 수 | thread × broker |
| append lock | per-partition deque (낮음) | 없음 |
| batch 크기 | 큼 (압축률↑) | 작음 (압축률↓) |
| ProduceRequest 수 | 적음 | N배 |
| 권장 | 거의 모든 케이스 | 단일 Sender saturate 극단 케이스만 |
timeout 4종 관계

| Config | 기본값 | 적용 단위 | 초과 시 |
|---|---|---|---|
| max.block.ms | 60s | send() 1회 (buffer/metadata 대기) | send() 자체에서 throw |
| request.timeout.ms | 30s | ProduceRequest 1회 round-trip | 해당 request fail → retry |
| retry.backoff.ms | 100ms (max 1s) | retry 사이 대기 (exp) | — |
| delivery.timeout.ms | 120s | send 시점부터 최종 결과까지 | callback에 TimeoutException |
필수 조건: delivery.timeout.ms ≥ linger.ms + request.timeout.ms
권장 config
kafka:
producer:
retries: 2147483647 # default
retry-backoff-ms: 100 # default (exp backoff)
delivery-timeout-ms: 120000 # default 2분
request-timeout-ms: 30000 # default 30s
max-block-ms: 60000 # default 60s
enable-idempotence: true # ★ retry 시 ordering + dedup
acks: all # ★ ISR commit까지 확인
compression-type: zstd
linger-ms: 50
batch-size: 65536 # 64KB
buffer-memory: 67108864 # 64MB
max-in-flight: 5
idempotence가 보호하는 범위
| 보호됨 | 보호 안 됨 | |
|---|---|---|
| 같은 producer 인스턴스 + 같은 partition + 네트워크 retry | ✅ broker가 PID/epoch/sequence로 dedup | — |
| Producer 재시작 후 같은 비즈니스 이벤트 재발행 | — | ❌ 새 PID라 broker가 다른 record로 봄 |
| Consumer 재처리 (rebalance, crash) | — | ❌ broker는 전달했음 |
| 다른 producer의 같은 logId 발행 | — | ❌ |
→ 비즈니스 idempotency (logId 기준 dedup)는 컨슈머 측 책임.
max.in.flight + idempotence 조합
| max.in.flight | idempotence | guaranteeMessageOrder | Ordering 보장 | Throughput |
|---|---|---|---|---|
| 1 | false | true (내부 derived) | ✅ (mute 메커니즘) | ⬇⬇ |
| 1 | true | true | ✅ | ⬇⬇ |
| 5+ (기본) | false | false | ❌ retry 시 깨질 위험 | ⬆⬆ |
| 5+ (Kafka 3.0+ default) | true | false | ✅ (broker sequence 검증) | ⬆⬆ |
실패 처리 패턴 비교
| 패턴 | 매커니즘 | 손실 보장 | 복잡도 |
|---|---|---|---|
| Fire-and-forget | callback에 로그 + metric | ❌ | 낮음 |
| In-memory retry queue | bounded queue + 백그라운드 retry | ❌ (queue 만석 시 손실) | 중간 |
| Outbox | DB tx에 event row + 별도 publisher | ✅ | 높음 |
| DLQ topic | 실패 이벤트를 별도 topic으로 | △ (Kafka 죽으면 무력) | 중간 |
→ broker 영구 장애 대비 진짜 보장은 outbox. in-memory queue는 transient 회복용 안전망.
빠른 참조 — 한 record의 여정

핵심 takeaway
| 직관 | 실제 |
|---|---|
| send 1회 = wire send 1회 | append 1회 + 별도 thread가 모아서 wire send |
| partition 32개 = wire request 32개 | broker 3대면 wire request 3개 (collation) |
| broker 늘리면 thread 늘어야 | NIO multiplex, 1 thread가 N broker 처리 |
| send 콜백이 producer thread에서 실행 | Sender thread에서 실행 (콜백 무거우면 전체 지연) |
| close() 호출하면 producer 닫힘 | (spring-kafka 비트랜잭션) close()는 no-op, 싱글톤 유지 |
max.in.flight = 전체 in-flight 한도 |
broker 1대당 — broker N × max.in.flight 동시 가능 |
| TimeoutException = 응답 없음 | (callback에 도착하면) 이미 delivery.timeout 만큼 retry 진행됨 |
'Develop' 카테고리의 다른 글
| Solidity 치트시트 (0) | 2026.04.18 |
|---|---|
| Probabilistic data structures : Bloom Filter, Cuckoo Filter, Ribbon Filter | Probabilistic Data Structures: Bloom Filter, Cuckoo Filter, Ribbon Filter (7) | 2025.05.26 |
댓글
Comments