Apache Kafka (3) - Java
Apache Kafka Series - Learn Apache Kafka for Beginners v3를 수강하며 기록한 내용입니다.
Settings
jdk 11 필요
gradle dependency 추가
implementation 'org.apache.kafka:kafka-clients:3.1.0'
Produce
public static void main(String[] args) {
log.info("Hello World!");
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
properties.setProperty("key.serializer", StringSerializer.class.getName());
properties.setProperty("value.serializer", StringSerializer.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
// topic : first_topic, value : first_value 레코드 발행
ProducerRecord<String, String> record = new ProducerRecord<>("first_topic", "first_value");
producer.send(record);
// 모든 메시지 produce 전까지 block
producer.flush();
// Close Producer
producer.close();
}
Consumer로 메시지 확인 (CLI)
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first_topic --group my_second_application --from-beginning
Callback
// topic : first_topic, value : first_value 레코드 발행
ProducerRecord<String, String> record = new ProducerRecord<>("first_topic", "new_value");
// 수신, 혹은 예외 발생 시 실행
producer.send(record, (md, e) -> {
if (e == null) {
log.info(
"Received New Metadata\n" +
"Topic : {}\n" +
"Partition : {}\n" +
"Offset : {}\n" +
"TimeStamp : {}"
,
md.topic(),
md.partition(),
md.offset(),
md.timestamp()
); } else {
log.error("Error While Producing : {}", e);
}});
Sticky Partitioning
- 파티션에 메시지를 묶음(batch)로 보내는 기법
- 묶음 단위로 메시지를 전송한 후 다른 파티션으로 이동
- default partitioner로 지정되어 있다.
Batch size 조정
properties.setProperty("batch.size", "400");
메시지에 키를 포함하여 전송
key를 설정하지 않을 시, default라면 라운드 로빈 방식으로 모든 파티션에 분산된다. 메시지 순서 보장이 어렵고, 데이터 처리에 일관성이 떨어질 수 있다.
String topic = "first_topic";
String key = "key" + i;
String value = "new_value" + i;
ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
해당 메시지를 produce할 때, 키에 맞는 파티션으로 메시지가 분류되어 전송된다.
public static void main(String[] args) {
log.info("I am Consumer!");
String consumerGroupId = "first_application";
String topic = "first_topic";
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
// Consumer config
properties.setProperty("key.deserializer", StringDeserializer.class.getName());
properties.setProperty("value.deserializer", StringDeserializer.class.getName());
properties.setProperty("group.id", consumerGroupId);
/**
* 토픽을 어디서부터 읽을 건지에 대한 설정값
* none : consumer group이 설정되어 있지 않으면 실패
* earliest : topic의 처음 부터 읽는다(cli의 --from-beginning)
* latest : 현재부터 읽는다
*/
properties.setProperty("auto.offset.reset", "earliest");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
//topic 구독
consumer.subscribe(Arrays.asList("first_topic", "second_topic"));
// 데이터 폴링
while (true) {
log.info("Polling");
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
log.info("Key : {}, Value : {}", record.key(), record.value());
log.info("Partition : {}, Offset : {}", record.partition(), record.offset());
}
}
}
Consumer Graceful Shutdown
- 만일 Consumer 그룹 내 Consumer가 Shutdown 된다면, 남은 Consumer들이 파티션을 할당받아야 한다..
- ex) group 1 : 0, 1, 2 & group 2 : 3,4 를 맡는다면 group 2 종료시 group 1이 3,4를 할당받음
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
log.info("Shutdown Detected. Exit by Calling consumer.wakeup() ...");
consumer.wakeup();
// 메인 쓰레드가 코드를 실행할 수 있도록 조인시킴
try {
mainThread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}}));
try {
//topic 구독
consumer.subscribe(Arrays.asList(topic));
// 데이터 폴링
while (true) {
log.info("Polling");
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
log.info("Key : {}, Value : {}", record.key(), record.value());
log.info("Partition : {}, Offset : {}", record.partition(), record.offset());
}
}
} catch (WakeupException e) {
log.info("Consumer is Starting to ShutDown..");
} catch (Exception e) {
log.error("Unexcepted exception in the consumer", e);
} finally {
consumer.close();
log.info("The consuer is now gracefully Shutdown");
}
Consumer Rebalancing
Partition이 추가되거나, Group 내 Consumer가 추가될 경우 Consumer의 파티션이 재할당될 수 있다.
재할당 종류들은 다음과 같다
Eager Rebalance
default 전략
모든 Consumer가 Partition에서 관계를 끊고 Consume을 중단 (이를 "stop-the-world"라 칭함)
파티션을 랜덤하게 재할당받는다
하지만 이전에 연결된 partition에 연결되리라는 보장도 없고, consume도 중단해야 해 코스트가 크다.
Cooperative Rebalance
파티션의 부분집합을 만들어 할당해준다.
consuming을 멈출 필요도 없다.
Setting
partition.assignment.strategy
를 설정해주자
RangeAssignor
partition:topic = 1:1로 설정 (불균형 초래 가능)
RoundRobin
라운드 로빈 방식으로 모든 파티션을 Consumer에 균등하게 배분
StickyAssignor
라운드로빈처럼 균등하게 분배되나, 변경이 생길 시 파티션 이동을 최소화한다.
default는 (RangeAssignor, CooperativeStickyAssignor)이다.
RangeAssignor가 빠지면 CooperativeStickyAssignor 가 대신 동작하는 형태
Static Group Membership
기본적으로 컨슈머가 그룹을 떠나면 파티션이 재할당되어야 한다.
그러나group.instance.id
를 사용하면,
- 고정 멤버가 되고
- 타임아웃 이전에 다시 그룹에 복구하면 Rebalancing을 유발하지 않는다.
properties.setProperty("group.instance.id", "value"); // for static assignments
위와 같이 설정 가능하다.
Auto Offset Commit Behavior
Consumer가 읽은 오프셋에 대해 자동 커밋해 주는 기능
enable.auto.commit=true
auto.commit.interval.ms=5000
위 설정을 통해 활성화할 수 있다.