Infra/Kafka

Apache Kafka (5) - Consumer Configuration

코드파고 2024. 12. 2. 17:25

Apache Kafka Series - Learn Apache Kafka for Beginners v3를 수강하며 기록한 내용입니다.

enable.auto.commit

  • Java Consumer Api에서 오프셋은 정기적으로 커밋된다
  • 디폴트로 at-least once 전략 사용
  • 오프셋은 .poll()을 호출했을 때와 auto.commit,interval.ms 이 경과되었을 때 커밋된다
    • auto.commit.interval.ms 는 지정하지 않는다면 5000ms
  • poll()을 부르기 전에 메시지가 확실히 처리되도록 보장할 것
    • 그렇지 않은 경우에는 enable.auto.commit을 비활성화하고 대부분의 처리를 별도의 스레드로 옮겨야 함
      그리고 적절한 오프셋을 수동으로 설정하여 .commitSync() 또는 .commitAsync()를 주기적으로 호출해야 한다.

enable.auto.commit=false + 오프셋을 외부에 저장

  • seek() API를 사용하여 컨슈머에게 적합한 파티션을 수동으로 지정해 주어야 함
  • DB 테이블을 설계하고, 오프셋을 저장해야 함
  • Rebalance가 발생하는 케이스를 처리해야 함
  • 만약 Idempotent 프로세싱을 구상하지 못할 경우, 데이터 처리 + 오프셋 커밋을 한 트랜잭션으로 묶어야 함.

auto.offset.reset

초기 오프셋이 없을 때 오프셋을 자동으로 설정하는 기능
latest(가장 최신의 오프셋 소비), earliest(가장 처음의 오프셋), none(오프셋 정보가 없다면 소비하지 않음) 중 선택가능

hearbeat.interval.ms(default 3s)

  • heartbeat란? : 컨슈머가 살아있음을 알려주는 신호이며, 브로커에 전송
  • session.timeout.ms 의 1/3을 권장

session.timeout.ms(default: 3.0 이상 45s, 이전은 10s)

broker로 전송
그 전까지 heartbeat 미전송시, 컨슈머는 다운된 것으로 간주

즉 consumer-application 이 다운되었는지 확인하는 메커니즘으로 쓰이기도 함

max.poll.interval.ms(default : 5 min)

  • 두 poll() 사이의 시간 차 - 선언한 시간이 지나면 컨슈머가 다운되었다고 간주

max.poll.records(default: 500)

하나의 poll() 요청에 얼마나 많은 레코드가 담길지 설정

fetch.min.bytes(default:1)

  • 하나의 리퀘스트에 얼마나 많은 데이터를 가져올 지 설정
  • 처리량과 리퀘스트 횟수에 영향을 미침

fetch.max.wait.ms(deafult:500)

  • 카프카 브로커가 fetch.min.bytes를 충족하지 않을 때 최대 대기 가능한 시간
  • 만일 fetch.min.bytes가 1이라고 가정하면, 1이 차지 않을 동안 500 ms를 대기한다는 뜻
    • 500ms의 레이턴시가 생길 수 있음을 의미

max.partition.fetch.bytes(default:1MB)

  • 파티션 별로 서버가 리턴할 최대 데이터의 크기

fetch.max.bytes(default:55MB)

  • 리퀘스트 당 리턴될 최대 데이터
  • 메모리가 충분하다면 늘려서 컨슈머가 한 리퀘스트 당 많은 데이터를 읽을 수 있도록 조정하자

Consumer Replica

2.4 이후 가장 가까운 레플리카에서 데이터를 소비할 수 있도록 바뀌었다.
만약 브로커가 다른 데이터 센터에 있다면, 가까운 데이터 센터의 데이터를 사용 가능

AWS경우 AZ ID로 설정 (2.4+ 버전)

  • rack.id=usw2-az1
  • replica.selector.class = org.apache.kafka.common.replica.RackAwareReplicaSelector 는 필수 설정

consumer client

client.rack를 컨슈머가 설치된 데이터 센터 ID로 설정