Post

카프카 프로듀서

카프카 프로듀서

3장. 카프카 프로듀서: 카프카에 메시지 쓰기

시작하기

카프카를 큐로 사용하든, 메시지 버스로 사용하든, 데이터 저장 플랫폼으로 사용하든 간에 카프카에 데이터를 쓸 때는 프로듀서, 읽어올 때는 컨슈머를 사용한다.

이 장에서는 이하의 내용을 다룬다.

  • 프로듀서의 디자인과 주요 구성 요소의 전체적인 모습을 확인
  • KafkaProducerKafkaRecord 객체 생성 방법
  • 카프카에 레코드를 어떻게 전송하는지
  • 레코드 전송 시에 발생하는 오류를 어떻게 처리하는지
  • 카프카 프로듀서의 작동을 제어하기 위한 주요 설정 옵션
  • 파티션 할당 방식을 정의하는 파티셔너
  • 객체의 직렬화 방식을 정의하는 시리얼라이저

3.1 프로듀서 개요

  • 다양한 목적으로 카프카를 사용하게 되는데, 목적이 다양한 만큼 요구사항이 다양
    • 메시지 유실을 허용하는가?
    • 메시지 중복을 허용하는가?
    • 반드시 지켜야할 Latency 와 Throughput 값
  • 프로듀서 요소 개괄 (그림 3-1, p 51)
  • 프로듀서 API 자체는 매우 단순하지만, 데이터 전송 시에는 생각보다 많은 작업이 이루어짐
    1. ProducerRecord 객체를 생성
    2. 토픽과 밸류는 필수, 키와 파티션 지정은 선택사항
    3. ProducerRecord 를 전송하는 API 호출
    4. 프로듀서가 가장 먼저 하는 일은 키와 값 객체가 네트워크에서 전송될 수 있도록 바이트 배열로 변환
    5. 파티션을 지정하지 않았다면?
    6. 파티셔너에게 데이터를 보냄
    7. 파티셔너는 파티션을 결정하는 역할을 수행
    8. 파티션 결정의 기준 = 보통 ProducerRecord 객체의 키 값
    9. 파티셔너가 파티션을 결정하여 전송될 토픽과 파티션이 확정
    10. 프로듀서는 해당 레코드를 같은 토픽 파티션으로 전송될 레코드를 모은 레코드 배치에 추가
    11. 별도의 스레드가 레코드 배치를 카프카 브로커에 전송
    12. 브로커는 메시지를 받고 프로듀서에게 응답을 돌려줌
    13. RecordMetadata 객체를 응답으로 돌려줌
    14. 토픽, 파티션, 해당 파티션 안에서의 오프셋 정보
    15. 프로듀서가 에러를 수신했을 경우
    16. 메시지 쓰기를 포기하고 사용자에게 에러를 리턴하기까지 몇 번 더 재전송을 시도할 수 있음

3.2 카프카 프로듀서 생성하기

  • 카프카에 메시지를 쓰려면 원하는 속성을 지정해서 프로듀서 객체를 생성
  • 지정해야 하는 필수 속성 값
    • bootstrap.servers
      • 카프카 클러스터와 첫 연결을 생성하기 위해 프로듀서가 사용할 브로커의 host:port 목록
      • 모든 브로커의 주소를 포함할 필요는 없음
        • 프로듀서가 첫 연결을 생성한 뒤 추가 정보를 받아오게 되어 있기 때문
      • 브로커가 작동을 정지하는 경우를 대비하여 최소 2개 이상의 브로커 정보를 기입
    • key.serializer
      • 카프카에 쓸 레코드의 키의 값을 직렬화하기 위해 사용하는 serializer 클래스 이름
        • org.apache.kafka.common.serialization.Serializer 의 구현체
      • 카프카 브로커는 메시지 키값, 밸류값으로 된 바이트 배열을 받음
      • 프로듀서 인터페이스는 제네릭스를 이용하여 키, 밸류의 타입을 사용
        • 따라 프로듀서 입장에서는 이 객체를 어떻게 바이트 배열로 바꾸는지 알아야 함
      • 카프카 client 패키지에는 자주 사용되는 타입의 시리얼라이저들이 다수 구현되어 있음
        • 자주 사용되는 타입을 사용할 경우 구현할 필요 없음
          • StringSerializer
          • ByteArraySerializer
          • IntegerSerializer
      • 키 값 없이 밸류 값만 전송할 때에도 key.serializer 설정은 하지만 VoidSerializer 를 사용해서 키 타입으로 Void 타입을 설정할 수 있다. (???)
    • value.serializer
      • 카프카에 쓸 레코드의 밸류 값을 직렬화하기 위해 사용하는 Serializer 의 클래스 이름
      • key.serializer 와 대부분 동일
  • 프로듀서 메시지 전송 방법
    • fire and forget
      • 메시지를 서버에 전송하고 성공 혹은 실패 여부에 관심을 두지 않음
      • 카카는 전송 실패 시 해당 메시지를 자동으로 재전송 시도하기 때문에 대부분의 메시지는 정상적으로 전송됨
      • 다만, 재시도를 할 수 없는 오류가 발생하거나, 타임아웃이 발생한다면 메시지가 유실
        • 애플리케이션은 해당 내역에 대해서 아무런 정보 혹은 예외를 받을 수 없음
    • synchronous send
      • 카프카는 기술적으로 항상 비동기로 전송됨
        • send() 메시지를 호출하면 Future 객체를 리턴
      • 해당 방법은 Future.get() 메소드를 호출해서 작업이 완료될 때까지 기다렸다가 실제 성공 여부를 확인
    • asynchrounous send
      • 콜백 함수를 인자로 send() 함수를 호출
      • 카프카 브로커로부터 응답을 받는 시점에서 자동으로 콜백 함수가 호출 됨

3.3 카프카로 메시지 전달하기

1
2
3
4
5
6
val record = ProducerRecord<String, String>("io.customer.country", "Precision Products", "France")
try {
  producer.send(record)
} catch (e: Exception) {
  e.printStacktrace()
}
  • send() 까지의 과정 ProducerRecord
    1. 프로듀서에 전달할 ** 객체를 생성
    2. 생성자가 여러개 있음
    3. 위 예제에서는 토픽, 키, 밸류 값을 사용 (모두 문자열 String 타입)
    4. 키와 밸류 타입은 key.serializer, value.serializer 와 타입이 동일해야 함
    5. 프로듀서의 send() 함수 호출 (1번에서 생성한 ProducerRecord 객체를 인자로 하여)
    6. 그림 3-1 에서 보았듯이 메시지는 버퍼(??)에 저장되었다가 별도 스레드에 의해 브로커로 전달
    7. RecordMetadata 를 포함한 자바 Future 타입의 객체를 응답으로 돌려줌
    8. 위 예제에서는 b 의 응답 객체를 활용하지 않으므로 무시하였음
    9. 이 경우 메시지 전송의 성공 여부를 알아낼 방법은 없음
    10. 메시지가 조용히 누락되어도 (유실) 상관없는 경우 사용될 수 있음
    11. 브로커에 메시지를 전송할 때 발생하는 에러 혹은 브로커 자체에서 발생한 에러를 무시하더라도 프로듀서가 카프카로 메시지를 보내기 전에 에러가 발생할 경우 예외 발생 가능
    12. 메시지를 직렬화하는데 실패할 경우: SerializationException
    13. 버퍼가 가득 찰 경우: TimeoutException
    14. 전송 작업을 수행하는 스레드에 인터럽트가 걸리는 경우: InterruptException

3.3.1 동기적으로 메시지 전송하기

동기적으로 메시지를 전송하는 방법은 매우 단순

  • 카프카 브로커가 쓰기 요청에 에러 응답이 발생하거나, 재전송 횟수가 소진되었을 때 발생되는 예외를 받아서 처리할 수 있음
  • 주요한 균형점은 성능
    • 카프카 클러스터에 작업이 어느정도 몰리느냐에 따라서 브로커는 쓰기 지연 발생 (2ms ~ 몇 초)
    • 이 시간 동안 메시지를 전송할 경우 전송을 요청하는 스레드는 이 시간 동안 아무 일도 안하면서 기다림 (다른 메시지를 전송할 수 없음)
  • 결과적으로 성능이 크게 낮아지기 때문에 동기적 전송은 실제 서비스(혹은 애플리케이션)에서 사용하지 않는 경우가 많음
1
2
3
4
5
6
val record = ProducerRecord<String, String>("io.customer.country", "Precision Products", "France")
try {
  producer.send(record).get() // get() 호출 시 Future 의 결과를 기다림 - 1
} catch (e: Exception) {
  e.printStacktrace() // - 2
}
  1. 카프카로부터 응답이 올 때까지 대기하기 위해 Future.get() 함수를 사용
  2. 레코드가 카프카로 성공적으로 전송되지 않으면 예외를 발생
  3. 예외가 발생하지 않으면 RecordMetadata 객체를 리턴
  4. 카프카에 메시지를 전송하기 전이나 전송하는 도중에 에러가 발생하는 경우 예외가 발생
  • KafkaProducer 에서 발생하는 에러
    • 재시도 가능한 에러
      • 메시지를 다시 전송함으로써 해결되는 에러를 의미
        • 연결 에러 : 연결이 회복되면 해결
        • 메시지를 전송받은 브로커가 해당 파티션의 리더가 아닐 경우 : 해당 파티션에 새 리더가 선출되고 클라이언트 메타데이터가 업데이트 되면 해결
      • 위와 같은 에러가 발생했을 때 자동으로 재시도하도록 KafkaProducer 를 설정할 수 있음
        • 이 경우 재전송 횟수가 소진되고서도 에러가 해결되지 않는 경우에 한해 재시도 가능한 예외 발생
    • 재시도 불가능한 에러
      • 메시지 크기가 너무 클 경우 등
      • 이러한 경우 KafkaProducer 는 재시도를 하지 않고 예외를 발생시킴

3.3.2 비동기적으로 메시지 전송하기

  • 대부분의 카프카 전송의 경우 굳이 응답이 필요 없음
    • 응답으로 RecordMetadata 를 돌려주는데 대부분의 애플리케이션은 이 정보가 필요없기 때문
  • 반대로 메시지 전송이 완전히 실패했을 경우에는 해당 내용을 알 필요가 있음
  • 메시지를 비동기적으로 전송하고도 에러를 처리하는 경우를 위해 KafkaProducerKafkaRecord 를 전송할 때 Callback 을 지정할 수 있음
1
2
3
4
5
6
7
8
9
10
class DemoProducerCallback : Callback { // 1
  override fun onCompletion(metadata: RecordMetadata, e: Exception?) {
    if (e != null) {
      e.printStacktrace() // 2
    }
  }
}

val record = ProducerRecord<String, String>("io.customer.country", "Precision Products", "USA")
producer.send(record, DemoProducerCallback()) // 3
  1. 콜백을 사용하기 위해 org.apache.kafka.clients.producer.Callback 인터페이스를 구현
  2. 인터페이스는 onCompletion() 단 하나의 메소드만 가지고 있음
  3. 카프카가 에러를 응답한다면 onCompletion()Exception 인자에는 null 이 아닌 객체가 전달됨
  4. 해당 로직에서 오류처리를 진행하면 됨
  5. KafkaProducer 가 메시지를 전송할 때 Callback 을 사용하기 위해 Callback 객체를 매개변수로 전달

3.4 프로듀서 설정하기

프로듀서는 3.2 항에서 살펴보았던 필수적인 설정 정보 외에도 굉장히 많은 수의 설정값을 가지고 있다. 대부분 합리적인 기본값을 가지고 있어서 일일히 설정을 할 필요는 없지만, 몇몇 설정값들의 경우 메모리 사용량이나 성능, 신뢰성 등에 상당한 영향을 미친다.

3.4.1. client.id

프로듀서와 해당 프로듀서를 사용하는 애플리케이션을 구분하는 논리적 식별자

  • 임의의 문자열 사용 가능
  • 브로커는 해당 값을 이하의 내용으로 활용
    • 프로듀서가 보내온 메시지를 서로 구분하기 위해 사용
    • 로그 메시지를 출력할 때 사용
    • 성능 메트릭 값을 집계할 때 사용
    • 클라이언트 별로 사용량을 할당할 때 사용
  • 이 값을 잘 선택하는 것은 문제가 발생했을 때 트러블 슈팅을 용이하게 함
    • IP 123.123.123.123 에서 인증 실패가 자주 발생하고 있네?
    • “주문 확인 서비스” 가 인증에 실패하고 있는 듯한데, 로라한테 한 번 봐달라고 해줄래?

3.4.2 acks

프로듀서가 임의의 쓰기 작업이 성공했다고 판별하기 위해 얼마나 많은 파티션 레플리카가 해당 레코드를 받아야하는지 결정하는 설정값

  • 기본값 = 리더가 해당 레코드를 받은 뒤 쓰기 작업이 성공했다고 응답하는 것
  • acks = 0
    • 프로듀서는 메시지가 성공적으로 전달되었다라고 간주하고 브로커의 응답을 기다리지 않음
    • 브로커가 메시지를 받지 못했을 경우, 프로듀서는 해당 상황에 대해 알 방법이 없고, 메시지는 그대로 유실
    • 프로듀서가 서버로부터 응답을 기다리지 않는 만큼 네트워크가 허용하는 한 빠르게 메시지를 보낼 수 있음.
      • 높은 처리량이 필요할 때 이 설정값을 사용하면 됨
  • acks = 1
    • 프로듀서는 리더 레플리카가 메시지를 받는 순간 브로커로부터 성공했다는 응답을 받음
    • 만약 리더에 메시지를 쓸수 없다면? (리더에 크래시가 났는데 새 리더가 선출되기 전 등)
      • 프로듀서는 에러 응답을 받게 되며, 데이터 유실을 피하기 위해 메시지 재전송을 시도하게 됨
    • 하지만 리더에 크래시가 난 상태에서 해당 메시지가 복제가 안 된 채로 새 리더가 선출될 경우에는 여전히 메시지가 유실 될 수 있음
  • acks = all
    • 프로듀서는 메시지가 모두 인-싱크 레플리카 에 전달된 뒤에야 브로커로부터 성공했다는 응답을 받음
    • 가장 안전한 형태
      • 최소 2개 이상의 브로커가 해당 메시지를 가지고 있으며
      • 위 내용은 크래시가 났을 때에도 메시지가 유실되지 않기 때문
    • acks = 1 인 경우 단순히 브로커 하나가 메시지를 받는 것보다 더 기다려야 하기 때문에 지연시간은 더 길어지게 됨
  • 참고
    • 프로듀서의 acks 값을 내려서 신뢰성을 낮추면 그만큼 레코드를 빠르게 보낼 수 있음
    • 즉, 신뢰성과 프로듀서 지연 사이에는 트레이드 오프 관계가 있다는 이야기
    • 그러나 세 가지 acks 값 모두 레코드가 생성되어 컨슈머가 읽을 수 있을 때까지의 시간을 의미하는 종단 지연 (end to end latency) 의 경우 세 값이 모두 똑같음
      • 카프카는 일관성을 유지하기 위해서 모든 인-싱크 레플리카가 복제가 완료된 뒤에야 컨슈머가 레코드를 읽어 갈 수 있게 하기 때문
    • 따라서 프로듀서 지연이 아니라 주로 종단 지연이 고려되어야 한다면?
      • acks 값을 절충해야 할 것은 없음
      • 가장 신뢰성 있는 설정을 택하더라도 종단 지연은 항상 똑같기 때문

3.4.2 메시지 전달 시간

send() 를 호출했을 때 성공 혹은 실패하기까지 얼마나 시간이 걸리는가?

  • 이 시간은 카프카가 성공적으로 응답을 내려보내 줄 때까지 사용자가 기다릴 수 있는 시간
  • 요청 실패를 인정하고 포기할 때까지 기다릴 수 있는 시간이기도 함

2.1 버전부터 개발진은 ProducerRecord를 보낼 때 걸리는 시간을 두 구간으로 나누어 따로 처리할 수 있도록 함

  • send() 에 대한 비동기 호출이 이뤄진 시간부터 결과를 리턴할 때까지 걸리는 시간
    • 이 시간동안 send()를 호출한 스레드는 블록됨
  • send() 에 대한 비동기 호출이 성공적으로 리턴한 시각부터 (성공/실패에 관계없이) 콜백이 호출될 때까지 걸리는 시간
    • 이 시간은 이하의 시간과 동일
      • ProducerRecord 가 전송을 위해 배치에 추가된 시점에서부터 카프카가 성공 응답을 보내거나
      • 재시도 불가능한 실패가 일어나거나
      • 아니면 전송을 위해 할당된 시간이 소진될 때까지의 시간
  • 참고
    • send() 를 동기적으로 호출할 경우, 메시지를 보내는 스레드는 두 구간에 대해 연속적으로 블록되기 때문에 각각의 구간이 어느정도 걸렸는지 알 수 없음
  • 카프카 프로듀서 내부에서의 메시지 전달 시간을 작업별로 나눈 개념도 (그림 3-2, p60)
    1. max.block.ms
    2. 아래의 경우에 대해 얼마나 오랫동안 블록되는지 결정하는 매개변수
    3. send() 를 호출했을 때
    4. partitionsFor() 를 호출해서 명시적으로 메타데이터를 요청했을 때
    5. send() 함수는 프로듀서의 전송 버퍼가 가득 차거나 메타데이터가 아직 사용 가능하지 않을 때 블록됨
    6. 이 상태에서 max.block.ms 만큼 시간이 흐르면 예외가 발생한다.
    7. delivery.timeout.ms
    8. 이 설정은 레코드 전송 준비가 완료된 시점 (send() 가 문제없이 리턴되고 레코드가 배치에 저장된 시점) 에서부터 브로커의 응답을 받거나 전송을 포기하게 되는 시점까지의 제한시간을 결정.
    9. (그림 3-2 참고) linger.ms 보다 커야 함
    10. 위 제한 조건을 벗어난 설정으로 KafkaProducer 를 생성하면 예외 발생
    11. 메시지는 delivery.timeout.ms 보다 빨리 전송될 수 있으며 실제로도 그렇다
    12. 만약 프로듀서가 재시도를 하는 도중에 delivery.timeout.ms 를 넘어간다면?
    13. 마지막으로 재시도 하기 전에 브로커가 리턴한 에러에 해당하는 예외와 함께 콜백이 호출됨
    14. 레코드 배치가 전송을 기다리는 와중에 delivery.timeout.ms 를 넘어간다면?
    15. 타임아웃 예외와 함께 콜백이 호출됨.
    16. 참고
    17. 사용자 입장에서 메시지 전송에 기다릴 수 있는 만큼 delivery.timeout.ms 값을 최대값으로 설정할 수도 있다.
    18. delivery.timeout.ms 값을 몇 분 정도로 설정하고 retries 의 기본값(사실상 무한으로 두는 것)을 그대로 둔다면 프로듀서는 재시도할 시간이 고갈될 때까지 (혹은 전송이 성공할 떄까지) 계속해서 재전송을 할 것
    19. 위 방식은 재시도 관련 설정을 할 때 훨씬 더 합리적인 방법
    20. 참고2
    21. 재시도 관련 설정을 튜닝하는 일반적인 방식?
    22. 브로커가 크래시났을 때 리더 선출에 대략 30초가 걸리므로 재시도 한도를 안전하게 120초로 유지하자
    23. 위와 같이 머리속으로 생각한 것을 재시도 횟수와 재시도 사이의 시간 간격으로 옮기려고 하는 대신에 그냥 delivery.timeout.ms 를 120초로 설정하면 되는 것
    24. request.timeout.ms
    25. 프로듀서가 데이터를 전송할 때 서버로부터 응답을 받기 위해 얼마나 기다릴 것인지 결정
    26. 각각의 쓰기 요청 후 전송을 포기하기까지의 대기하는 시간
    27. 이 시간은 재시도 시간, 실제 전송 이전에 소요되는 시간 등을 포함하지 않음
    28. 응답없이 타임아웃이 발생할 경우 프로듀서는 재전송을 시도하거나 아니면 TimeoutException 과 함께 콜백을 호출
    29. retries, retry.backoff.ms
    30. 프로듀서가 서버로부터 에러 메시지를 받았을 때 이것이 일시적인 에러 (파티션에 리더가 없는 경우) 일 수도 있음
    31. retries 매개변수는 프로듀서가 메시지 전송을 포기하고 에러를 발생시킬 때까지 메시지를 재전송하는 횟수를 결정한다.
    32. 기본적으로 프로듀서는 각각의 재시도 사이에 100ms 동안 대기
    33. retry.backoff.ms 값을 변경해서 이 간격을 조정 가능
    34. 현재 버전의 카프카에서 이 값들을 조정하는 것을 권장하지 않음
    35. 크래시가 난 브로커가 정상으로 돌아오기까지 (즉, 모든 파티션에 대해 새리더가 선출되는데 얼마나 시간이 걸리는지) 의 시간을 테스트 한 뒤 delivery.timeout.ms 매개변수를 잡는 것을 권장
    36. 프로듀서는 모든 에러를 재전송하는 것은 아님
    37. 어떤 에러는 일시적인 에러가 아니기 떄문에 재시도의 대상이 아님
    38. 일반적으로 프로듀서가 알아서 재전송을 처리해주기 때문에 애플리케이션 코드에서는 관련 처리를 수행하는 코드가 필요 없음
    39. 개발자는 재시도 불가능한 에러를 처리하거나 재시도 횟수가 고갈되었을 경우에 대한 처리에만 집중하면 됨
    40. linger.ms
    41. 현 배치를 전송하기 전까지 대기하는 시간을 결정
    42. KafkaProducer 는 현재 배치가 다 차거나, linger.ms 에 설정된 제한 시간이 되었을 때 메시지 배치를 전송
    43. 프로듀서는 메시지 전송에 사용할 수 있는 스레드가 있을 때 곧바로 전송하도록 되어 있음
    44. linger.ms > 0 으로 설정할 경우 프로듀서가 브로커에 메시지 배치를 전송하기 전에 메시지를 추가할 수 있도록 몇 ms 가량 더 기다리도록 할 수 있다.
    45. 이는 지연을 조금 증가시키는 대신에 throughput 을 크게 증가시킨다
    46. 단위 메시지당 추가적으로 드는 시간은 매우 작지만 압축이 되어 있거나 할 경우 훨씬 더 효율적이기 때문
    47. buffer.memory
    48. 메시지를 전송하기 전에 메시지를 대기시키는 버퍼의 크기 (메모리의 양) 을 결정
    49. 애플리케이션이 서버에 전달 가능한 속도보다 더 빠르게 메시지를 전송한다면 버퍼 메모리가 가득 찰 수 있음
    50. 이 경우 추가로 호출되는 send()max.block.ms 동안 블록되어 버퍼 메모리에 공간이 생길 때 까지 기다림
    51. 해당 시간동안 대기를 수행하고도 버퍼 메모리의 공간이 확보되지 않으면 예외를 발생시킴
    52. 대부분의 프로듀서 예외와는 달리 이 타임아웃은 send() 함수에서 발생하지, send() 함수에서 리턴하는 Future 객체에서 발생하지 않음
    53. compression.type
    54. 메시지를 압축하여 브로커로 전송하고 싶을 때 설정
    55. snappy, gzip, lz4, zstd 중 하나로 설정하여 해당 압축 알고리즘으로 메시지를 압축하여 브로커로 전송
    56. 네트워크 사용량(카프카로 메시지를 전송할 때 병목 구간)과 저장 공간을 절약할 수 있음
    57. batch.size
    58. 같은 파티션에 다수의 레코드가 전송될 경우 프로듀서는 이것들을 배치 단위로 모아서 한꺼번에 전송
    59. 각각의 배치에 사용할 메모리 양을 결정 (개수가 아니라 바이트 단위)
    60. 배치가 가득차면 해당 배치에 들어있는 모든 메시지가 한꺼번에 전송
    61. 그러나 프로듀서가 각각의 배치가 가득 찰 떄까지 기다린다는 의미는 아님
    62. 프로듀서는 하나의 메시지만 들어있는 배치도 전송함
    63. 따라서 이 매개변수를 큰 값으로 유지한다고 해서 메시지 전송에 지연이 발생하지는 않음
    64. 반대로 지나치게 작게 설정할 경우 프로듀서가 지나치게 자주 메시지를 전송해야하므로 약간의 오버헤드가 발생
    65. max.in.flight.request.per.connection
    66. 프로듀서가 서버로부터 응답을 받지 못한 상태에서 전송할 수 있는 최대 메시지의 수를 결정
    67. 이 값을 올리게 되면 메모리 사용량이 증가하지만 처리량도 증가
    68. 단일 데이터센터에서 이 값이 2일 때 처리량이 최대를 기록
    69. 기본값이 5를 사용하더라도 비슷한 성능을 보여줌
    70. 참고 : 순서보장
    71. 카프카는 파티션 내에서 메시지의 순서를 보존하게 되어 있음
    72. 특정 순서로 메시지를 보낼경우 브로커가 받아서 파티션에 쓸 때나 컨슈머가 읽어올 떄 해당 순서대로 처리된다는 것
    73. 특정 상황에서는 순서가 매우 중요한 경우가 있음
    74. retries > 0 으로 설정한 상태에서 max.in.flight.request.per.connection > 1 로 설정할 경우 메시지의 순서가 뒤짚어질 수 있음
    75. 브로커가 첫번째 배치를 받아서 쓰려다 실패했는데, 두번째 배치를 쓸 때는 성공한 상황 (두 번째 배치가 in-flight 상태) 에서 다시 첫 번째 배치를 재전송되어 성공한 경우가 이 경우
    76. 성능상의 고려 때문에 in-flight 요청이 최소 2 이상은 되어야 한다는 점 그리고 신뢰성을 보장하기 위해서 재시도 횟수 또한 높아야 한다는 점을 감안하면, 가장 합당한 선택은 enable.idempotence=true 로 설정하는 것
    77. 이 설정은 최대 5 개의 in-flight 요청을 허용하면서도 순서를 보장하고, 재전송이 발생하더라도 중복이 발생하는 것을 방지해 줌
    78. 멱등적 프로듀서 (8장에서 설명)
    79. enable.idempotence
    80. 0.11 버전부터 카프카는 ‘exactly once’ 를 지원하기 시작
    81. 멱등적 프로듀서는 그중에서도 매우 강력한 부분
    82. 신뢰성을 최대화하는 방향으로 프로듀서를 설정했다고 가정
    83. acks=all
    84. delivery.timeout.ms 를 큰 값으로 설정
    85. 이 경우 프로듀서는 at least once (최소한 한 번) 카프카에 메시지를 쓰게 됨 (중복 발생 가능)
    86. 브로커가 프로듀서로부터 레코드를 받아서 로컬 디스크에 쓰고, 다른 브로커에도 복제가 되었다고 가정
    87. 여기서 첫 번째 브로커가 프로듀서로 응답을 보내기 전에 크래시가 났음
    88. 프로듀서는 request.timeout.ms 만큼 기다렸다가 재전송 시도
    89. 이 때 새로 보내진 메시지는 (이미 기존에 쓰기 작업이 성공적으로 복제되었으므로) 이미 메시지를 받은 바 있는 새 리더 브로커로 전달되게 됨
    90. 결국 중복 저장이 가능
    91. enable.idempotence 설정을 true 로 잡는 것은 바로 이러한 사태를 방지하기 때문
    92. 위 기능이 활성화되면 프로듀서는 레코드를 보낼 때마다 순차적인 번호를 붙여서 보냄
    93. 브로커가 동일한 번호를 가진 레코드를 2개 이상 받을 경우 하나만 저장
    94. 프로듀서는 별다른 문제를 발생시키지않는 DupulicateSequenceException 을 받음
    95. 위 기능을 설정하려면 이하로 설정해야 함 (아닐 경우 ConfigException 발생)
    96. max.in.flight.requests.per.connection ≤ 5
    97. retries ≥ 1
    98. acks=all
This post is licensed under CC BY 4.0 by the author.