[카프카(Kafka) 어플리케이션 제작 ] #2. 컨슈머
[카프카(Kafka) 어플리케이션 제작 ] #1. 프로듀서
[카프카(Kafka) 어플리케이션 제작 ] #2. 컨슈머
이전 글에서는 프로듀서 내부 동작 확인 및 어플리케이션을 제작하였다.
이번에는 컨슈머 어플리케이션을 제작해본다.
컨슈머
컨슈머 내부 동작
토픽 구독 |
|
오프셋 위치 |
|
재연/되감기/메시지 스킵 |
|
하트비트 |
|
오프셋 커밋 |
|
역직렬화 |
|
메시지 수신(Subscribe)을 위한 동작 순서
컨슈머 어플리케이션
bootstrap.servers |
|
key.deserializer |
|
value.deserializer |
|
group.id |
|
Properties producerProps = new Properties(); producerProps.put("bootstrap.servers","192.168.100.173:9092"); producerProps.put("group.id", "hirang_test02"); producerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); producerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); return producerProps; KafkaConsumerconsumer = new KafkaConsumer(setConsumerProperty());
구독
컨슈머는 메시지 데이터를 받기 위해 토픽을 구독한다. (KafkaConsumer.subscribe 사용)
구독 메소드는 입력 파라미터에 따라 다양한 구독이 가능하다.
subscribe(Collection<String> topics)
: 원하는 토픽 목록을 전달 / 기본 리밸런서 사용
subscribe(Pattern pattern, ConsumerRebalanceListener listener)
: 원하는 토픽을 찾기 위해 정규식을 전달 / 정규식에 대응하는 새로운 토픽의 추가가 삭제 발생시 리밸런서 트리거됨
subscribe(Collection<String> topics, ConsumerRebalanceListener listener)
: 토픽의 목록을 전달 / 리밸런서 트리거됨
오프셋 커밋 처리
오프셋에 대한 커밋이 일어나는 방법은 3가지가 존재하며 각각 특성이 존재한다.
자동커밋 |
|
현재 오프셋 커밋 |
|
비동기 커밋 |
|
추가 설정
설정 작업 이전에 데이터를 처리하기 위해 컨슈머가 얼마의 시간이 필요한지 확인 및 테스트가 필요하다.
enable.auto.commit |
|
fetch.min.bytes |
|
request.timeout.ms |
|
auto.offset.reset |
lastest : 파티션에서 가장 최근 메시지부터 시작 earliest : 파티션 맨 처음부터 시작 none : 예외가 발생됨 |
session.timeout.ms |
|
max.partition.fetch.bytes |
|
package com.example.kafkatest; import org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.TopicPartition; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.Future; @SpringBootApplication public class KafkaConsumerApplication { private static final Logger log = LoggerFactory.getLogger(KafkaTestApplication.class); public static void main(String[] args) { SpringApplication.run(KafkaConsumerApplication.class, args); subscribe(); } private static Properties setConsumerProperty(){ Properties producerProps = new Properties(); producerProps.put("bootstrap.servers","192.168.100.173:9092"); producerProps.put("group.id", "hirang_test02"); producerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); producerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); producerProps.put("enable.auto.commit", "true"); producerProps.put("auto.commit.interval.ms", "1000"); producerProps.put("session.timeout.ms", "30000"); return producerProps; } private static void subscribe(){ String topic = "hirang"; ListtopicList = new ArrayList<>(); topicList.add(topic); KafkaConsumer consumer = new KafkaConsumer(setConsumerProperty()); consumer.subscribe(topicList); log.info("Subscribed to topic :\t" + KafkaStatic.DEFAULT_TOPIC); int i = 0; try { while (true) { ConsumerRecords records = consumer.poll(500); for (ConsumerRecord record : records) log.info("offset = " + record.offset() + "\tkey =" + record.key() + "\tvalue =" + record.value()); //TODO : Do processing for data here consumer.commitAsync(new OffsetCommitCallback() { public void onComplete(Map map, Exception e) { } }); } } catch (Exception ex) { //TODO : Log Exception Here } finally { try { consumer.commitSync(); } finally { consumer.close(); } } } }
컨슈머 콘솔 로그
읽은 메시지 데이터를 로그로 확인하였다.
. ____ _ __ _ _ /\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \ ( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \ \\/ ___)| |_)| | | | | || (_| | ) ) ) ) ' |____| .__|_| |_|_| |_\__, | / / / / =========|_|==============|___/=/_/_/_/ :: Spring Boot :: (v2.1.3.RELEASE) 2019-03-08 11:30:52.369 INFO 4556 --- [ main] c.e.kafkatest.KafkaConsumerApplication : Starting KafkaConsumerApplication on ts-hirang with PID 4556 (D:\workspace\github\kafka-test\target\classes started by DESKTOP in D:\workspace\github\kafka-test) 2019-03-08 11:30:52.379 INFO 4556 --- [ main] c.e.kafkatest.KafkaConsumerApplication : No active profile set, falling back to default profiles: default 2019-03-08 11:30:53.178 INFO 4556 --- [ main] c.e.kafkatest.KafkaConsumerApplication : Started KafkaConsumerApplication in 1.766 seconds (JVM running for 3.564) 2019-03-08 11:30:53.192 INFO 4556 --- [ main] o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values: auto.commit.interval.ms = 1000 auto.offset.reset = latest bootstrap.servers = [192.168.100.173:9092] check.crcs = true client.id = connections.max.idle.ms = 540000 default.api.timeout.ms = 60000 enable.auto.commit = true exclude.internal.topics = true fetch.max.bytes = 52428800 fetch.max.wait.ms = 500 fetch.min.bytes = 1 group.id = hirang_test02 heartbeat.interval.ms = 3000 interceptor.classes = [] internal.leave.group.on.close = true isolation.level = read_uncommitted key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer max.partition.fetch.bytes = 1048576 max.poll.interval.ms = 300000 max.poll.records = 500 metadata.max.age.ms = 300000 metric.reporters = [] metrics.num.samples = 2 metrics.recording.level = INFO metrics.sample.window.ms = 30000 partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor] receive.buffer.bytes = 65536 reconnect.backoff.max.ms = 1000 reconnect.backoff.ms = 50 request.timeout.ms = 30000 retry.backoff.ms = 100 sasl.client.callback.handler.class = null sasl.jaas.config = null sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.min.time.before.relogin = 60000 sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05 sasl.kerberos.ticket.renew.window.factor = 0.8 sasl.login.callback.handler.class = null sasl.login.class = null sasl.login.refresh.buffer.seconds = 300 sasl.login.refresh.min.period.seconds = 60 sasl.login.refresh.window.factor = 0.8 sasl.login.refresh.window.jitter = 0.05 sasl.mechanism = GSSAPI security.protocol = PLAINTEXT send.buffer.bytes = 131072 session.timeout.ms = 30000 ssl.cipher.suites = null ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] ssl.endpoint.identification.algorithm = https ssl.key.password = null ssl.keymanager.algorithm = SunX509 ssl.keystore.location = null ssl.keystore.password = null ssl.keystore.type = JKS ssl.protocol = TLS ssl.provider = null ssl.secure.random.implementation = null ssl.trustmanager.algorithm = PKIX ssl.truststore.location = null ssl.truststore.password = null ssl.truststore.type = JKS value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer 2019-03-08 11:30:53.238 INFO 4556 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka version : 2.0.1 2019-03-08 11:30:53.239 INFO 4556 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka commitId : fa14705e51bd2ce5 2019-03-08 11:30:53.240 INFO 4556 --- [ main] c.e.kafkatest.KafkaTestApplication : Subscribed to topic : hirang 2019-03-08 11:30:53.318 INFO 4556 --- [ main] org.apache.kafka.clients.Metadata : Cluster ID: 6ejl7iMZQASnPqRCdv13wA 2019-03-08 11:30:53.319 INFO 4556 --- [ main] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-1, groupId=hirang_test02] Discovered group coordinator 192.168.100.173:9092 (id: 2147483646 rack: null) 2019-03-08 11:30:53.321 INFO 4556 --- [ main] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-1, groupId=hirang_test02] Revoking previously assigned partitions [] 2019-03-08 11:30:53.321 INFO 4556 --- [ main] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-1, groupId=hirang_test02] (Re-)joining group 2019-03-08 11:30:53.347 INFO 4556 --- [ main] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-1, groupId=hirang_test02] Successfully joined group with generation 12 2019-03-08 11:30:53.348 INFO 4556 --- [ main] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-1, groupId=hirang_test02] Setting newly assigned partitions [hirang-0] 2019-03-08 11:31:03.104 INFO 4556 --- [ main] c.e.kafkatest.KafkaTestApplication : offset = 277494 key =null value =0-----87d9bcc5-3b75-4032-9427-e02b75a9eed7 2019-03-08 11:31:03.107 INFO 4556 --- [ main] c.e.kafkatest.KafkaTestApplication : offset = 277495 key =null value =1-----9c6e8e14-dd8e-4ddf-b810-4cf1f3c80e5c 2019-03-08 11:31:03.108 INFO 4556 --- [ main] c.e.kafkatest.KafkaTestApplication : offset = 277496 key =null value =2-----5f8eac28-06ed-4c53-a0c0-a15721a6ede2 2019-03-08 11:31:03.108 INFO 4556 --- [ main] c.e.kafkatest.KafkaTestApplication : offset = 277497 key =null value =3-----e6dd3658-5bc2-4bdb-ab7b-b127f8ca176a 2019-03-08 11:31:03.108 INFO 4556 --- [ main] c.e.kafkatest.KafkaTestApplication : offset = 277498 key =null value =4-----e077a534-d95e-4aca-adb3-1c2c1a29fce1 2019-03-08 11:31:03.108 INFO 4556 --- [ main] c.e.kafkatest.KafkaTestApplication : offset = 277499 key =null value =5-----5b38908b-ec60-43e8-98cc-76fa66451483 2019-03-08 11:31:03.112 INFO 4556 --- [ main] c.e.kafkatest.KafkaTestApplication : offset = 277500 key =null value =6-----d049b157-9370-433e-9cee-7101918eb449 2019-03-08 11:31:03.112 INFO 4556 --- [ main] c.e.kafkatest.KafkaTestApplication : offset = 277501 key =null value =7-----8786e6a2-c757-4a3f-be59-130d5c393bfc 2019-03-08 11:31:03.116 INFO 4556 --- [ main] c.e.kafkatest.KafkaTestApplication : offset = 277502 key =null value =8-----30b82719-abc5-4e1e-81d1-d8e47e53cb7d 2019-03-08 11:31:03.122 INFO 4556 --- [ main] c.e.kafkatest.KafkaTestApplication : offset = 277503 key =null value =9-----687f0bdb-3280-415f-9fb8-8d5734380755
컨슈머 모범 사례
마치며
카프카의 프로듀서 및 컨슈머에 대한 내부 동작에 대해서 이해하였고 어플리케이션도 만들어 보았다.
프로덕션 환경에 적용할 때 컨슈머 그룹안에서 오프셋 커밋 전략,하트비트, 리벨런스 등 여러 성능 테스트가 필요하다.