Infra/Kafka

Apache Kafka (3) - Java

코드파고 2024. 11. 17. 01:35

Apache Kafka Series - Learn Apache Kafka for Beginners v3를 수강하며 기록한 내용입니다.

Settings

jdk 11 필요

gradle dependency 추가

  implementation 'org.apache.kafka:kafka-clients:3.1.0'

Produce

public static void main(String[] args) {  
    log.info("Hello World!");  
    Properties properties = new Properties();  
    properties.setProperty("bootstrap.servers", "127.0.0.1:9092");  

    properties.setProperty("key.serializer", StringSerializer.class.getName());  
    properties.setProperty("value.serializer", StringSerializer.class.getName());  

    KafkaProducer<String, String> producer = new KafkaProducer<>(properties);  

    // topic : first_topic, value : first_value 레코드 발행  
    ProducerRecord<String, String> record = new ProducerRecord<>("first_topic", "first_value");  
    producer.send(record);  

    // 모든 메시지 produce 전까지 block
    producer.flush();  

    // Close Producer  
    producer.close();  
}

Consumer로 메시지 확인 (CLI)

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first_topic --group my_second_application --from-beginning

Callback


// topic : first_topic, value : first_value 레코드 발행  
ProducerRecord<String, String> record = new ProducerRecord<>("first_topic", "new_value");  
// 수신, 혹은 예외 발생 시 실행  
producer.send(record, (md, e) -> {  
    if (e == null) {  
        log.info(  
                "Received New Metadata\n" +  
                        "Topic : {}\n" +  
                        "Partition : {}\n" +  
                        "Offset : {}\n" +  
                        "TimeStamp : {}"  
                ,  
                md.topic(),  
                md.partition(),  
                md.offset(),  
                md.timestamp()  
        );    } else {  
        log.error("Error While Producing : {}", e);  
    }});

Sticky Partitioning

  • 파티션에 메시지를 묶음(batch)로 보내는 기법
  • 묶음 단위로 메시지를 전송한 후 다른 파티션으로 이동
  • default partitioner로 지정되어 있다.

Batch size 조정

properties.setProperty("batch.size", "400");

메시지에 키를 포함하여 전송

key를 설정하지 않을 시, default라면 라운드 로빈 방식으로 모든 파티션에 분산된다. 메시지 순서 보장이 어렵고, 데이터 처리에 일관성이 떨어질 수 있다.

String topic = "first_topic";  
String key = "key" + i;  
String value = "new_value" + i;  

ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);

해당 메시지를 produce할 때, 키에 맞는 파티션으로 메시지가 분류되어 전송된다.

public static void main(String[] args) {  
    log.info("I am Consumer!");  

    String consumerGroupId = "first_application";  
    String topic = "first_topic";  

    Properties properties = new Properties();  
    properties.setProperty("bootstrap.servers", "127.0.0.1:9092");  

    // Consumer config  
    properties.setProperty("key.deserializer", StringDeserializer.class.getName());  
    properties.setProperty("value.deserializer", StringDeserializer.class.getName());  
    properties.setProperty("group.id", consumerGroupId);  

    /**  
     * 토픽을 어디서부터 읽을 건지에 대한 설정값  
     * none : consumer group이 설정되어 있지 않으면 실패  
     * earliest : topic의 처음 부터 읽는다(cli의 --from-beginning)  
     * latest : 현재부터 읽는다  
     */  
    properties.setProperty("auto.offset.reset", "earliest");  

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);  

    //topic 구독  
    consumer.subscribe(Arrays.asList("first_topic", "second_topic"));  

    // 데이터 폴링  
    while (true) {  
        log.info("Polling");  
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));  
        for (ConsumerRecord<String, String> record : records) {  
            log.info("Key : {}, Value : {}", record.key(), record.value());  
            log.info("Partition : {}, Offset : {}", record.partition(), record.offset());  
        }
    }
}

Consumer Graceful Shutdown

  • 만일 Consumer 그룹 내 Consumer가 Shutdown 된다면, 남은 Consumer들이 파티션을 할당받아야 한다..
    • ex) group 1 : 0, 1, 2 & group 2 : 3,4 를 맡는다면 group 2 종료시 group 1이 3,4를 할당받음
Runtime.getRuntime().addShutdownHook(new Thread(() -> {  
    log.info("Shutdown Detected. Exit by Calling consumer.wakeup() ...");  
    consumer.wakeup();  
    // 메인 쓰레드가 코드를 실행할 수 있도록 조인시킴  
    try {  
        mainThread.join();  
    } catch (InterruptedException e) {  
        e.printStackTrace();  
    }}));  

try {  
    //topic 구독  
    consumer.subscribe(Arrays.asList(topic));  
    // 데이터 폴링  
    while (true) {  
        log.info("Polling");  
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));  
        for (ConsumerRecord<String, String> record : records) {  
            log.info("Key : {}, Value : {}", record.key(), record.value());  
            log.info("Partition : {}, Offset : {}", record.partition(), record.offset());  
        }    
    }
} catch (WakeupException e) {  
    log.info("Consumer is Starting to ShutDown..");  
} catch (Exception e) {  
    log.error("Unexcepted exception in the consumer", e);  
} finally {  
    consumer.close();  
    log.info("The consuer is now gracefully Shutdown");  
}

Consumer Rebalancing

Partition이 추가되거나, Group 내 Consumer가 추가될 경우 Consumer의 파티션이 재할당될 수 있다.
재할당 종류들은 다음과 같다

Eager Rebalance

default 전략
모든 Consumer가 Partition에서 관계를 끊고 Consume을 중단 (이를 "stop-the-world"라 칭함)
파티션을 랜덤하게 재할당받는다
하지만 이전에 연결된 partition에 연결되리라는 보장도 없고, consume도 중단해야 해 코스트가 크다.

Cooperative Rebalance

파티션의 부분집합을 만들어 할당해준다.
consuming을 멈출 필요도 없다.

Setting

partition.assignment.strategy 를 설정해주자

RangeAssignor

partition:topic = 1:1로 설정 (불균형 초래 가능)

RoundRobin

라운드 로빈 방식으로 모든 파티션을 Consumer에 균등하게 배분

StickyAssignor

라운드로빈처럼 균등하게 분배되나, 변경이 생길 시 파티션 이동을 최소화한다.

default는 (RangeAssignor, CooperativeStickyAssignor)이다.
RangeAssignor가 빠지면 CooperativeStickyAssignor 가 대신 동작하는 형태

Static Group Membership

기본적으로 컨슈머가 그룹을 떠나면 파티션이 재할당되어야 한다.
그러나group.instance.id를 사용하면,

  • 고정 멤버가 되고
  • 타임아웃 이전에 다시 그룹에 복구하면 Rebalancing을 유발하지 않는다.
  • properties.setProperty("group.instance.id", "value"); // for static assignments

위와 같이 설정 가능하다.

Auto Offset Commit Behavior

Consumer가 읽은 오프셋에 대해 자동 커밋해 주는 기능

enable.auto.commit=true
auto.commit.interval.ms=5000

위 설정을 통해 활성화할 수 있다.