카프카 프로듀서
카프카 프로듀서
3장. 카프카 프로듀서: 카프카에 메시지 쓰기
시작하기
카프카를 큐로 사용하든, 메시지 버스로 사용하든, 데이터 저장 플랫폼으로 사용하든 간에 카프카에 데이터를 쓸 때는 프로듀서, 읽어올 때는 컨슈머를 사용한다.
이 장에서는 이하의 내용을 다룬다.
- 프로듀서의 디자인과 주요 구성 요소의 전체적인 모습을 확인
KafkaProducer
와KafkaRecord
객체 생성 방법- 카프카에 레코드를 어떻게 전송하는지
- 레코드 전송 시에 발생하는 오류를 어떻게 처리하는지
- 카프카 프로듀서의 작동을 제어하기 위한 주요 설정 옵션
- 파티션 할당 방식을 정의하는 파티셔너
- 객체의 직렬화 방식을 정의하는 시리얼라이저
3.1 프로듀서 개요
- 다양한 목적으로 카프카를 사용하게 되는데, 목적이 다양한 만큼 요구사항이 다양
- 메시지 유실을 허용하는가?
- 메시지 중복을 허용하는가?
- 반드시 지켜야할 Latency 와 Throughput 값
- 프로듀서 요소 개괄 (그림 3-1, p 51)
- 프로듀서 API 자체는 매우 단순하지만, 데이터 전송 시에는 생각보다 많은 작업이 이루어짐
ProducerRecord
객체를 생성- 토픽과 밸류는 필수, 키와 파티션 지정은 선택사항
ProducerRecord
를 전송하는 API 호출- 프로듀서가 가장 먼저 하는 일은 키와 값 객체가 네트워크에서 전송될 수 있도록 바이트 배열로 변환
- 파티션을 지정하지 않았다면?
- 파티셔너에게 데이터를 보냄
- 파티셔너는 파티션을 결정하는 역할을 수행
- 파티션 결정의 기준 = 보통
ProducerRecord
객체의 키 값 - 파티셔너가 파티션을 결정하여 전송될 토픽과 파티션이 확정
- 프로듀서는 해당 레코드를 같은 토픽 파티션으로 전송될 레코드를 모은 레코드 배치에 추가
- 별도의 스레드가 레코드 배치를 카프카 브로커에 전송
- 브로커는 메시지를 받고 프로듀서에게 응답을 돌려줌
RecordMetadata
객체를 응답으로 돌려줌- 토픽, 파티션, 해당 파티션 안에서의 오프셋 정보
- 프로듀서가 에러를 수신했을 경우
- 메시지 쓰기를 포기하고 사용자에게 에러를 리턴하기까지 몇 번 더 재전송을 시도할 수 있음
3.2 카프카 프로듀서 생성하기
- 카프카에 메시지를 쓰려면 원하는 속성을 지정해서 프로듀서 객체를 생성
- 지정해야 하는 필수 속성 값
bootstrap.servers
- 카프카 클러스터와 첫 연결을 생성하기 위해 프로듀서가 사용할 브로커의 host:port 목록
- 모든 브로커의 주소를 포함할 필요는 없음
- 프로듀서가 첫 연결을 생성한 뒤 추가 정보를 받아오게 되어 있기 때문
- 브로커가 작동을 정지하는 경우를 대비하여 최소 2개 이상의 브로커 정보를 기입
key.serializer
- 카프카에 쓸 레코드의 키의 값을 직렬화하기 위해 사용하는
serializer
클래스 이름org.apache.kafka.common.serialization.Serializer
의 구현체
- 카프카 브로커는 메시지 키값, 밸류값으로 된 바이트 배열을 받음
- 프로듀서 인터페이스는 제네릭스를 이용하여 키, 밸류의 타입을 사용
- 따라 프로듀서 입장에서는 이 객체를 어떻게 바이트 배열로 바꾸는지 알아야 함
- 카프카 client 패키지에는 자주 사용되는 타입의 시리얼라이저들이 다수 구현되어 있음
- 자주 사용되는 타입을 사용할 경우 구현할 필요 없음
StringSerializer
ByteArraySerializer
IntegerSerializer
등
- 자주 사용되는 타입을 사용할 경우 구현할 필요 없음
- 키 값 없이 밸류 값만 전송할 때에도
key.serializer
설정은 하지만VoidSerializer
를 사용해서 키 타입으로Void
타입을 설정할 수 있다. (???)
- 카프카에 쓸 레코드의 키의 값을 직렬화하기 위해 사용하는
value.serializer
- 카프카에 쓸 레코드의 밸류 값을 직렬화하기 위해 사용하는
Serializer
의 클래스 이름 key.serializer
와 대부분 동일
- 카프카에 쓸 레코드의 밸류 값을 직렬화하기 위해 사용하는
- 프로듀서 메시지 전송 방법
- fire and forget
- 메시지를 서버에 전송하고 성공 혹은 실패 여부에 관심을 두지 않음
- 카카는 전송 실패 시 해당 메시지를 자동으로 재전송 시도하기 때문에 대부분의 메시지는 정상적으로 전송됨
- 다만, 재시도를 할 수 없는 오류가 발생하거나, 타임아웃이 발생한다면 메시지가 유실됨
- 애플리케이션은 해당 내역에 대해서 아무런 정보 혹은 예외를 받을 수 없음
- synchronous send
- 카프카는 기술적으로 항상 비동기로 전송됨
send()
메시지를 호출하면Future
객체를 리턴
- 해당 방법은
Future.get()
메소드를 호출해서 작업이 완료될 때까지 기다렸다가 실제 성공 여부를 확인
- 카프카는 기술적으로 항상 비동기로 전송됨
- asynchrounous send
- 콜백 함수를 인자로
send()
함수를 호출 - 카프카 브로커로부터 응답을 받는 시점에서 자동으로 콜백 함수가 호출 됨
- 콜백 함수를 인자로
- fire and forget
3.3 카프카로 메시지 전달하기
1
2
3
4
5
6
val record = ProducerRecord<String, String>("io.customer.country", "Precision Products", "France")
try {
producer.send(record)
} catch (e: Exception) {
e.printStacktrace()
}
send()
까지의 과정ProducerRecord
- 프로듀서에 전달할 ** 객체를 생성
- 생성자가 여러개 있음
- 위 예제에서는 토픽, 키, 밸류 값을 사용 (모두 문자열 String 타입)
- 키와 밸류 타입은
key.serializer
,value.serializer
와 타입이 동일해야 함 - 프로듀서의
send()
함수 호출 (1번에서 생성한ProducerRecord
객체를 인자로 하여) - 그림 3-1 에서 보았듯이 메시지는 버퍼(??)에 저장되었다가 별도 스레드에 의해 브로커로 전달
RecordMetadata
를 포함한 자바Future
타입의 객체를 응답으로 돌려줌- 위 예제에서는 b 의 응답 객체를 활용하지 않으므로 무시하였음
- 이 경우 메시지 전송의 성공 여부를 알아낼 방법은 없음
- 메시지가 조용히 누락되어도 (유실) 상관없는 경우 사용될 수 있음
- 브로커에 메시지를 전송할 때 발생하는 에러 혹은 브로커 자체에서 발생한 에러를 무시하더라도 프로듀서가 카프카로 메시지를 보내기 전에 에러가 발생할 경우 예외 발생 가능
- 메시지를 직렬화하는데 실패할 경우:
SerializationException
- 버퍼가 가득 찰 경우:
TimeoutException
- 전송 작업을 수행하는 스레드에 인터럽트가 걸리는 경우:
InterruptException
3.3.1 동기적으로 메시지 전송하기
동기적으로 메시지를 전송하는 방법은 매우 단순
- 카프카 브로커가 쓰기 요청에 에러 응답이 발생하거나, 재전송 횟수가 소진되었을 때 발생되는 예외를 받아서 처리할 수 있음
- 주요한 균형점은 성능
- 카프카 클러스터에 작업이 어느정도 몰리느냐에 따라서 브로커는 쓰기 지연 발생 (2ms ~ 몇 초)
- 이 시간 동안 메시지를 전송할 경우 전송을 요청하는 스레드는 이 시간 동안 아무 일도 안하면서 기다림 (다른 메시지를 전송할 수 없음)
- 결과적으로 성능이 크게 낮아지기 때문에 동기적 전송은 실제 서비스(혹은 애플리케이션)에서 사용하지 않는 경우가 많음
1
2
3
4
5
6
val record = ProducerRecord<String, String>("io.customer.country", "Precision Products", "France")
try {
producer.send(record).get() // get() 호출 시 Future 의 결과를 기다림 - 1
} catch (e: Exception) {
e.printStacktrace() // - 2
}
- 카프카로부터 응답이 올 때까지 대기하기 위해
Future.get()
함수를 사용 - 레코드가 카프카로 성공적으로 전송되지 않으면 예외를 발생
- 예외가 발생하지 않으면
RecordMetadata
객체를 리턴 - 카프카에 메시지를 전송하기 전이나 전송하는 도중에 에러가 발생하는 경우 예외가 발생
KafkaProducer
에서 발생하는 에러- 재시도 가능한 에러
- 메시지를 다시 전송함으로써 해결되는 에러를 의미
- 예
- 연결 에러 : 연결이 회복되면 해결
- 메시지를 전송받은 브로커가 해당 파티션의 리더가 아닐 경우 : 해당 파티션에 새 리더가 선출되고 클라이언트 메타데이터가 업데이트 되면 해결
- 위와 같은 에러가 발생했을 때 자동으로 재시도하도록
KafkaProducer
를 설정할 수 있음- 이 경우 재전송 횟수가 소진되고서도 에러가 해결되지 않는 경우에 한해 재시도 가능한 예외 발생
- 재시도 불가능한 에러
- 메시지 크기가 너무 클 경우 등
- 이러한 경우
KafkaProducer
는 재시도를 하지 않고 예외를 발생시킴
- 재시도 가능한 에러
3.3.2 비동기적으로 메시지 전송하기
- 대부분의 카프카 전송의 경우 굳이 응답이 필요 없음
- 응답으로
RecordMetadata
를 돌려주는데 대부분의 애플리케이션은 이 정보가 필요없기 때문
- 응답으로
- 반대로 메시지 전송이 완전히 실패했을 경우에는 해당 내용을 알 필요가 있음
- 메시지를 비동기적으로 전송하고도 에러를 처리하는 경우를 위해
KafkaProducer
는KafkaRecord
를 전송할 때Callback
을 지정할 수 있음
1
2
3
4
5
6
7
8
9
10
class DemoProducerCallback : Callback { // 1
override fun onCompletion(metadata: RecordMetadata, e: Exception?) {
if (e != null) {
e.printStacktrace() // 2
}
}
}
val record = ProducerRecord<String, String>("io.customer.country", "Precision Products", "USA")
producer.send(record, DemoProducerCallback()) // 3
- 콜백을 사용하기 위해
org.apache.kafka.clients.producer.Callback
인터페이스를 구현 - 인터페이스는
onCompletion()
단 하나의 메소드만 가지고 있음 - 카프카가 에러를 응답한다면
onCompletion()
의Exception
인자에는null
이 아닌 객체가 전달됨 - 해당 로직에서 오류처리를 진행하면 됨
KafkaProducer
가 메시지를 전송할 때Callback
을 사용하기 위해Callback
객체를 매개변수로 전달
3.4 프로듀서 설정하기
프로듀서는 3.2 항에서 살펴보았던 필수적인 설정 정보 외에도 굉장히 많은 수의 설정값을 가지고 있다. 대부분 합리적인 기본값을 가지고 있어서 일일히 설정을 할 필요는 없지만, 몇몇 설정값들의 경우 메모리 사용량이나 성능, 신뢰성 등에 상당한 영향을 미친다.
3.4.1. client.id
프로듀서와 해당 프로듀서를 사용하는 애플리케이션을 구분하는 논리적 식별자
- 임의의 문자열 사용 가능
- 브로커는 해당 값을 이하의 내용으로 활용
- 프로듀서가 보내온 메시지를 서로 구분하기 위해 사용
- 로그 메시지를 출력할 때 사용
- 성능 메트릭 값을 집계할 때 사용
- 클라이언트 별로 사용량을 할당할 때 사용
- 이 값을 잘 선택하는 것은 문제가 발생했을 때 트러블 슈팅을 용이하게 함
- IP 123.123.123.123 에서 인증 실패가 자주 발생하고 있네?
- “주문 확인 서비스” 가 인증에 실패하고 있는 듯한데, 로라한테 한 번 봐달라고 해줄래?
3.4.2 acks
프로듀서가 임의의 쓰기 작업이 성공했다고 판별하기 위해 얼마나 많은 파티션 레플리카가 해당 레코드를 받아야하는지 결정하는 설정값
- 기본값 = 리더가 해당 레코드를 받은 뒤 쓰기 작업이 성공했다고 응답하는 것
acks = 0
- 프로듀서는 메시지가 성공적으로 전달되었다라고 간주하고 브로커의 응답을 기다리지 않음
- 브로커가 메시지를 받지 못했을 경우, 프로듀서는 해당 상황에 대해 알 방법이 없고, 메시지는 그대로 유실
- 프로듀서가 서버로부터 응답을 기다리지 않는 만큼 네트워크가 허용하는 한 빠르게 메시지를 보낼 수 있음.
- 높은 처리량이 필요할 때 이 설정값을 사용하면 됨
acks = 1
- 프로듀서는 리더 레플리카가 메시지를 받는 순간 브로커로부터 성공했다는 응답을 받음
- 만약 리더에 메시지를 쓸수 없다면? (리더에 크래시가 났는데 새 리더가 선출되기 전 등)
- 프로듀서는 에러 응답을 받게 되며, 데이터 유실을 피하기 위해 메시지 재전송을 시도하게 됨
- 하지만 리더에 크래시가 난 상태에서 해당 메시지가 복제가 안 된 채로 새 리더가 선출될 경우에는 여전히 메시지가 유실 될 수 있음
acks = all
- 프로듀서는 메시지가 모두 인-싱크 레플리카 에 전달된 뒤에야 브로커로부터 성공했다는 응답을 받음
- 가장 안전한 형태
- 최소 2개 이상의 브로커가 해당 메시지를 가지고 있으며
- 위 내용은 크래시가 났을 때에도 메시지가 유실되지 않기 때문
acks = 1
인 경우 단순히 브로커 하나가 메시지를 받는 것보다 더 기다려야 하기 때문에 지연시간은 더 길어지게 됨
- 참고
- 프로듀서의
acks
값을 내려서 신뢰성을 낮추면 그만큼 레코드를 빠르게 보낼 수 있음 - 즉, 신뢰성과 프로듀서 지연 사이에는 트레이드 오프 관계가 있다는 이야기
- 그러나 세 가지
acks
값 모두 레코드가 생성되어 컨슈머가 읽을 수 있을 때까지의 시간을 의미하는 종단 지연 (end to end latency) 의 경우 세 값이 모두 똑같음- 카프카는 일관성을 유지하기 위해서 모든 인-싱크 레플리카가 복제가 완료된 뒤에야 컨슈머가 레코드를 읽어 갈 수 있게 하기 때문
- 따라서 프로듀서 지연이 아니라 주로 종단 지연이 고려되어야 한다면?
acks
값을 절충해야 할 것은 없음- 가장 신뢰성 있는 설정을 택하더라도 종단 지연은 항상 똑같기 때문
- 프로듀서의
3.4.2 메시지 전달 시간
send()
를 호출했을 때 성공 혹은 실패하기까지 얼마나 시간이 걸리는가?
- 이 시간은 카프카가 성공적으로 응답을 내려보내 줄 때까지 사용자가 기다릴 수 있는 시간
- 요청 실패를 인정하고 포기할 때까지 기다릴 수 있는 시간이기도 함
2.1 버전부터 개발진은 ProducerRecord
를 보낼 때 걸리는 시간을 두 구간으로 나누어 따로 처리할 수 있도록 함
send()
에 대한 비동기 호출이 이뤄진 시간부터 결과를 리턴할 때까지 걸리는 시간- 이 시간동안 send()를 호출한 스레드는 블록됨
send()
에 대한 비동기 호출이 성공적으로 리턴한 시각부터 (성공/실패에 관계없이) 콜백이 호출될 때까지 걸리는 시간- 이 시간은 이하의 시간과 동일
ProducerRecord
가 전송을 위해 배치에 추가된 시점에서부터 카프카가 성공 응답을 보내거나- 재시도 불가능한 실패가 일어나거나
- 아니면 전송을 위해 할당된 시간이 소진될 때까지의 시간
- 이 시간은 이하의 시간과 동일
- 참고
send()
를 동기적으로 호출할 경우, 메시지를 보내는 스레드는 두 구간에 대해 연속적으로 블록되기 때문에 각각의 구간이 어느정도 걸렸는지 알 수 없음
- 카프카 프로듀서 내부에서의 메시지 전달 시간을 작업별로 나눈 개념도 (그림 3-2, p60)
max.block.ms
- 아래의 경우에 대해 얼마나 오랫동안 블록되는지 결정하는 매개변수
send()
를 호출했을 때partitionsFor()
를 호출해서 명시적으로 메타데이터를 요청했을 때send()
함수는 프로듀서의 전송 버퍼가 가득 차거나 메타데이터가 아직 사용 가능하지 않을 때 블록됨- 이 상태에서
max.block.ms
만큼 시간이 흐르면 예외가 발생한다. delivery.timeout.ms
- 이 설정은 레코드 전송 준비가 완료된 시점 (
send()
가 문제없이 리턴되고 레코드가 배치에 저장된 시점) 에서부터 브로커의 응답을 받거나 전송을 포기하게 되는 시점까지의 제한시간을 결정. - (그림 3-2 참고)
linger.ms
보다 커야 함 - 위 제한 조건을 벗어난 설정으로
KafkaProducer
를 생성하면 예외 발생 - 메시지는
delivery.timeout.ms
보다 빨리 전송될 수 있으며 실제로도 그렇다 - 만약 프로듀서가 재시도를 하는 도중에
delivery.timeout.ms
를 넘어간다면? - 마지막으로 재시도 하기 전에 브로커가 리턴한 에러에 해당하는 예외와 함께 콜백이 호출됨
- 레코드 배치가 전송을 기다리는 와중에
delivery.timeout.ms
를 넘어간다면? - 타임아웃 예외와 함께 콜백이 호출됨.
- 참고
- 사용자 입장에서 메시지 전송에 기다릴 수 있는 만큼
delivery.timeout.ms
값을 최대값으로 설정할 수도 있다. delivery.timeout.ms
값을 몇 분 정도로 설정하고retries
의 기본값(사실상 무한으로 두는 것)을 그대로 둔다면 프로듀서는 재시도할 시간이 고갈될 때까지 (혹은 전송이 성공할 떄까지) 계속해서 재전송을 할 것- 위 방식은 재시도 관련 설정을 할 때 훨씬 더 합리적인 방법
- 참고2
- 재시도 관련 설정을 튜닝하는 일반적인 방식?
- 브로커가 크래시났을 때 리더 선출에 대략 30초가 걸리므로 재시도 한도를 안전하게 120초로 유지하자
- 위와 같이 머리속으로 생각한 것을 재시도 횟수와 재시도 사이의 시간 간격으로 옮기려고 하는 대신에 그냥
delivery.timeout.ms
를 120초로 설정하면 되는 것 request.timeout.ms
- 프로듀서가 데이터를 전송할 때 서버로부터 응답을 받기 위해 얼마나 기다릴 것인지 결정
- 각각의 쓰기 요청 후 전송을 포기하기까지의 대기하는 시간
- 이 시간은 재시도 시간, 실제 전송 이전에 소요되는 시간 등을 포함하지 않음
- 응답없이 타임아웃이 발생할 경우 프로듀서는 재전송을 시도하거나 아니면
TimeoutException
과 함께 콜백을 호출 retries, retry.backoff.ms
- 프로듀서가 서버로부터 에러 메시지를 받았을 때 이것이 일시적인 에러 (파티션에 리더가 없는 경우) 일 수도 있음
retries
매개변수는 프로듀서가 메시지 전송을 포기하고 에러를 발생시킬 때까지 메시지를 재전송하는 횟수를 결정한다.- 기본적으로 프로듀서는 각각의 재시도 사이에 100ms 동안 대기
retry.backoff.ms
값을 변경해서 이 간격을 조정 가능- 현재 버전의 카프카에서 이 값들을 조정하는 것을 권장하지 않음
- 크래시가 난 브로커가 정상으로 돌아오기까지 (즉, 모든 파티션에 대해 새리더가 선출되는데 얼마나 시간이 걸리는지) 의 시간을 테스트 한 뒤
delivery.timeout.ms
매개변수를 잡는 것을 권장 - 프로듀서는 모든 에러를 재전송하는 것은 아님
- 어떤 에러는 일시적인 에러가 아니기 떄문에 재시도의 대상이 아님
- 일반적으로 프로듀서가 알아서 재전송을 처리해주기 때문에 애플리케이션 코드에서는 관련 처리를 수행하는 코드가 필요 없음
- 개발자는 재시도 불가능한 에러를 처리하거나 재시도 횟수가 고갈되었을 경우에 대한 처리에만 집중하면 됨
linger.ms
- 현 배치를 전송하기 전까지 대기하는 시간을 결정
KafkaProducer
는 현재 배치가 다 차거나,linger.ms
에 설정된 제한 시간이 되었을 때 메시지 배치를 전송- 프로듀서는 메시지 전송에 사용할 수 있는 스레드가 있을 때 곧바로 전송하도록 되어 있음
linger.ms
> 0 으로 설정할 경우 프로듀서가 브로커에 메시지 배치를 전송하기 전에 메시지를 추가할 수 있도록 몇 ms 가량 더 기다리도록 할 수 있다.- 이는 지연을 조금 증가시키는 대신에 throughput 을 크게 증가시킨다
- 단위 메시지당 추가적으로 드는 시간은 매우 작지만 압축이 되어 있거나 할 경우 훨씬 더 효율적이기 때문
buffer.memory
- 메시지를 전송하기 전에 메시지를 대기시키는 버퍼의 크기 (메모리의 양) 을 결정
- 애플리케이션이 서버에 전달 가능한 속도보다 더 빠르게 메시지를 전송한다면 버퍼 메모리가 가득 찰 수 있음
- 이 경우 추가로 호출되는
send()
는max.block.ms
동안 블록되어 버퍼 메모리에 공간이 생길 때 까지 기다림 - 해당 시간동안 대기를 수행하고도 버퍼 메모리의 공간이 확보되지 않으면 예외를 발생시킴
- 대부분의 프로듀서 예외와는 달리 이 타임아웃은
send()
함수에서 발생하지,send()
함수에서 리턴하는Future
객체에서 발생하지 않음 compression.type
- 메시지를 압축하여 브로커로 전송하고 싶을 때 설정
snappy
,gzip
,lz4
,zstd
중 하나로 설정하여 해당 압축 알고리즘으로 메시지를 압축하여 브로커로 전송- 네트워크 사용량(카프카로 메시지를 전송할 때 병목 구간)과 저장 공간을 절약할 수 있음
batch.size
- 같은 파티션에 다수의 레코드가 전송될 경우 프로듀서는 이것들을 배치 단위로 모아서 한꺼번에 전송
- 각각의 배치에 사용할 메모리 양을 결정 (개수가 아니라 바이트 단위)
- 배치가 가득차면 해당 배치에 들어있는 모든 메시지가 한꺼번에 전송
- 그러나 프로듀서가 각각의 배치가 가득 찰 떄까지 기다린다는 의미는 아님
- 프로듀서는 하나의 메시지만 들어있는 배치도 전송함
- 따라서 이 매개변수를 큰 값으로 유지한다고 해서 메시지 전송에 지연이 발생하지는 않음
- 반대로 지나치게 작게 설정할 경우 프로듀서가 지나치게 자주 메시지를 전송해야하므로 약간의 오버헤드가 발생
max.in.flight.request.per.connection
- 프로듀서가 서버로부터 응답을 받지 못한 상태에서 전송할 수 있는 최대 메시지의 수를 결정
- 이 값을 올리게 되면 메모리 사용량이 증가하지만 처리량도 증가
- 단일 데이터센터에서 이 값이 2일 때 처리량이 최대를 기록
- 기본값이 5를 사용하더라도 비슷한 성능을 보여줌
- 참고 : 순서보장
- 카프카는 파티션 내에서 메시지의 순서를 보존하게 되어 있음
- 특정 순서로 메시지를 보낼경우 브로커가 받아서 파티션에 쓸 때나 컨슈머가 읽어올 떄 해당 순서대로 처리된다는 것
- 특정 상황에서는 순서가 매우 중요한 경우가 있음
retries
> 0 으로 설정한 상태에서max.in.flight.request.per.connection
> 1 로 설정할 경우 메시지의 순서가 뒤짚어질 수 있음- 브로커가 첫번째 배치를 받아서 쓰려다 실패했는데, 두번째 배치를 쓸 때는 성공한 상황 (두 번째 배치가 in-flight 상태) 에서 다시 첫 번째 배치를 재전송되어 성공한 경우가 이 경우
- 성능상의 고려 때문에 in-flight 요청이 최소 2 이상은 되어야 한다는 점 그리고 신뢰성을 보장하기 위해서 재시도 횟수 또한 높아야 한다는 점을 감안하면, 가장 합당한 선택은
enable.idempotence=true
로 설정하는 것 - 이 설정은 최대 5 개의 in-flight 요청을 허용하면서도 순서를 보장하고, 재전송이 발생하더라도 중복이 발생하는 것을 방지해 줌
- 멱등적 프로듀서 (8장에서 설명)
enable.idempotence
- 0.11 버전부터 카프카는 ‘exactly once’ 를 지원하기 시작
- 멱등적 프로듀서는 그중에서도 매우 강력한 부분
- 신뢰성을 최대화하는 방향으로 프로듀서를 설정했다고 가정
acks=all
delivery.timeout.ms
를 큰 값으로 설정- 이 경우 프로듀서는 at least once (최소한 한 번) 카프카에 메시지를 쓰게 됨 (중복 발생 가능)
- 브로커가 프로듀서로부터 레코드를 받아서 로컬 디스크에 쓰고, 다른 브로커에도 복제가 되었다고 가정
- 여기서 첫 번째 브로커가 프로듀서로 응답을 보내기 전에 크래시가 났음
- 프로듀서는
request.timeout.ms
만큼 기다렸다가 재전송 시도 - 이 때 새로 보내진 메시지는 (이미 기존에 쓰기 작업이 성공적으로 복제되었으므로) 이미 메시지를 받은 바 있는 새 리더 브로커로 전달되게 됨
- 결국 중복 저장이 가능
enable.idempotence
설정을true
로 잡는 것은 바로 이러한 사태를 방지하기 때문- 위 기능이 활성화되면 프로듀서는 레코드를 보낼 때마다 순차적인 번호를 붙여서 보냄
- 브로커가 동일한 번호를 가진 레코드를 2개 이상 받을 경우 하나만 저장
- 프로듀서는 별다른 문제를 발생시키지않는
DupulicateSequenceException
을 받음 - 위 기능을 설정하려면 이하로 설정해야 함 (아닐 경우
ConfigException
발생) max.in.flight.requests.per.connection
≤ 5retries
≥ 1acks=all
This post is licensed under CC BY 4.0 by the author.