[카프카(Kafka) 어플리케이션 제작 ] #1. 프로듀서
이번에는 Kafka 라이브러리를 이용하여 어플리케이션을 만들어보자.
본인은 Intellij IDE 환경에서 SpringBoot 2.1 플래폼을 기반으로 Maven 빌드를 사용한다.
프로듀서나 컨슈머를 사용하기 위해서는 kafka-clients를 이용하므로 pom.xml에 종속성을 추가한다.
org.apache.kafka kafka-clients 2.1.0
프로듀서
카프카 프로듀서 내부 동작 및 프로듀서 어플리케이션에서 카프카 큐로 메시지가 전달되는 과정을 알아보자.
프로듀서 내부 동작
프로듀서 어플리케이션
프로듀서 어플리케이션 작성의 경우 추상화 계층에 메서드를 노출하는 프로듀서 API를 사용한다.
카프카 프로듀서 API 사용 순서
bootstrap.servers |
|
key.serializer |
|
value.serializer |
|
프로듀서 객체 및 프로듀서 레코드 객체 생성
토픽에 레코드를 전송하는 ProducerRecord 객체는 토픽이름, 파티션 번호, 타입스탬프, 키, 값 등을 포함한다.
토픽이름과 메시지 데이터 값은 필수 파라미터이며, 파티션 번호나 타임스탬프, 키, 값 들은 선택 파라미터이다.
동기메시징은 브로커에서 회신을 RecordMetadata를 보내고, 비동기 메시징은 브로커에서 즉각 응답 처리만 하며, 프로듀서에서는 콜백 인터페이스를 수행하여 처리한다.
동기메시징으로도 잠깐 테스트를 하였는데 메시지 사이즈 100byte 기준으로 1만 TPS정도 나온다.
사용자 정의 파티션 생성
대부분의 경우 기본 파티션을 사용하는 경우 충분하나, 하나의 키에 대한 데이터의 비중이 매우 큰 경우 별도의 파티션의 할당이 필요할 수 있다.
예) K라는 키에 전체데이터의 30%가 있는 경우 N파티션을 할당에 다른키가 N파티션에 할당되지 않게 만들어 공간이 소진되거나 속도가 느려지지 않도록 한다.
카프카는 사용자 정의 파티션을 생성할 수 있도록 파티셔너 인터페이스를 제공한다.
package com.example.kafkatest; import org.apache.kafka.clients.producer.Partitioner; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.PartitionInfo; import java.util.List; import java.util.Map; public class CustomPartition implements Partitioner { public int partition( String topicName, Object key, byte[] keybBytes, Object value, byte[] valueBytes, Cluster cluster ) { ListpartitionInfos = cluster.partitionsForTopic(topicName); int numPartitions = partitionInfos.size(); //로직여기에 return 0; } @Override public void close(){} @Override public void configure(Map map){} }
buffer.memory |
|
ack |
|
batch.size |
|
linger.ms |
|
compression.type |
|
retres |
|
partitioner.class |
|
timeout.ms |
|
프로듀서 전체 코드
package com.example.kafkatest; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import java.util.Properties; import java.util.UUID; import java.util.concurrent.Future; @SpringBootApplication public class KafkaProducerApplication { private static final Logger log = LoggerFactory.getLogger(KafkaTestApplication.class); public static void main(String[] args) { SpringApplication.run(KafkaProducerApplication.class, args); publish(); } private static Properties setProducerProperty(){ Properties producerProps = new Properties(); producerProps.put("bootstrap.servers","192.168.100.173:9092"); producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); producerProps.put("ack","all"); producerProps.put("retries", 1); /* 배치사이즈가 작으면 대량전송시 유실됨 최소 1000이상 잡는게 효율적 */ producerProps.put("batch.size", 1); producerProps.put("linger.ms", 1); producerProps.put("buffer.memory", 24568545); return producerProps; } private static void publish(){ KafkaProducerproducer = new KafkaProducer(setProducerProperty()); log.info("Pubulish to topic :\t" + KafkaStatic.DEFAULT_TOPIC); log.info("Pubulish Start"); for (int i=0; i<10; i++ ) { ProducerRecord data = new ProducerRecord (KafkaStatic.DEFAULT_TOPIC, i+"-----"+UUID.randomUUID().toString()); Future recordMetadata = producer.send(data); } producer.close(); log.info("Pubulish End"); } }
package com.example.kafkatest; public class KafkaStatic { public static final String DEFAULT_TOPIC = "hirang"; }
프로듀서 콘솔 로그
어플리케이션을 구동하면 아래와 같이 Kafka 설정 관련 정보를 볼 수 있다.
. ____ _ __ _ _ /\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \ ( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \ \\/ ___)| |_)| | | | | || (_| | ) ) ) ) ' |____| .__|_| |_|_| |_\__, | / / / / =========|_|==============|___/=/_/_/_/ :: Spring Boot :: (v2.1.3.RELEASE) 2019-03-07 11:21:18.822 INFO 376 --- [ main] c.e.kafkatest.KafkaProducerApplication : Starting KafkaProducerApplication on ts-hirang with PID 376 (D:\workspace\github\kafka-test\target\classes started by DESKTOP in D:\workspace\github\kafka-test) 2019-03-07 11:21:18.824 INFO 376 --- [ main] c.e.kafkatest.KafkaProducerApplication : No active profile set, falling back to default profiles: default 2019-03-07 11:21:19.398 INFO 376 --- [ main] c.e.kafkatest.KafkaProducerApplication : Started KafkaProducerApplication in 0.795 seconds (JVM running for 1.635) 2019-03-07 11:21:19.409 INFO 376 --- [ main] o.a.k.clients.producer.ProducerConfig : ProducerConfig values: acks = 1 batch.size = 1 bootstrap.servers = [192.168.100.173:9092] buffer.memory = 24568545 client.id = compression.type = none connections.max.idle.ms = 540000 enable.idempotence = false interceptor.classes = [] key.serializer = class org.apache.kafka.common.serialization.StringSerializer linger.ms = 1 max.block.ms = 60000 max.in.flight.requests.per.connection = 5 max.request.size = 1048576 metadata.max.age.ms = 300000 metric.reporters = [] metrics.num.samples = 2 metrics.recording.level = INFO metrics.sample.window.ms = 30000 partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner receive.buffer.bytes = 32768 reconnect.backoff.max.ms = 1000 reconnect.backoff.ms = 50 request.timeout.ms = 30000 retries = 1 retry.backoff.ms = 100 sasl.client.callback.handler.class = null sasl.jaas.config = null sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.min.time.before.relogin = 60000 sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05 sasl.kerberos.ticket.renew.window.factor = 0.8 sasl.login.callback.handler.class = null sasl.login.class = null sasl.login.refresh.buffer.seconds = 300 sasl.login.refresh.min.period.seconds = 60 sasl.login.refresh.window.factor = 0.8 sasl.login.refresh.window.jitter = 0.05 sasl.mechanism = GSSAPI security.protocol = PLAINTEXT send.buffer.bytes = 131072 ssl.cipher.suites = null ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] ssl.endpoint.identification.algorithm = https ssl.key.password = null ssl.keymanager.algorithm = SunX509 ssl.keystore.location = null ssl.keystore.password = null ssl.keystore.type = JKS ssl.protocol = TLS ssl.provider = null ssl.secure.random.implementation = null ssl.trustmanager.algorithm = PKIX ssl.truststore.location = null ssl.truststore.password = null ssl.truststore.type = JKS transaction.timeout.ms = 60000 transactional.id = null value.serializer = class org.apache.kafka.common.serialization.StringSerializer 2019-03-07 11:21:19.456 WARN 376 --- [ main] o.a.k.clients.producer.ProducerConfig : The configuration 'ack' was supplied but isn't a known config. 2019-03-07 11:21:19.459 INFO 376 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka version : 2.0.1 2019-03-07 11:21:19.460 INFO 376 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka commitId : fa14705e51bd2ce5 2019-03-07 11:21:19.460 INFO 376 --- [ main] c.e.kafkatest.KafkaTestApplication : Pubulish to topic : hirang 2019-03-07 11:21:19.461 INFO 376 --- [ main] c.e.kafkatest.KafkaTestApplication : Pubulish Start 2019-03-07 11:21:19.526 INFO 376 --- [ad | producer-1] org.apache.kafka.clients.Metadata : Cluster ID: 6ejl7iMZQASnPqRCdv13wA 2019-03-07 11:21:19.534 INFO 376 --- [ main] o.a.k.clients.producer.KafkaProducer : [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms. 2019-03-07 11:21:19.577 INFO 376 --- [ main] c.e.kafkatest.KafkaTestApplication : Pubulish End
실행 결과
프로듀서 모범 사례
데이터 유효성 검증을 하자!
프로듀서 어플리케이션을 제작할 때 데이터 스키마의 정합성, null이 아닌 키항목 등 유효성 검증을 간과하기 쉽다.
이런 유효성 검증을 하지 않으면 메시지 데이터가 올바르게 파티션에 할당되지 않으므로, 브로커에 대한 부하 분산에 영향을 준다. (순서를 보장해야하는 1파티션이면 상관없을듯...)
예외처리를 하자!
모든 어플리케이션의 예외처리는 상황에 따라 무조건 필요하다. (너무 남발해서도 문제지만..)
프로듀서에서 예외 상황에 따라 적절한 흐름 제어를 해야한다.
예) 게임 로그 데이터의 임시 큐로 Kafka를 사용할 경우 아이템 거래 사기 발생 시 계정이나 기사단 블럭 등 예외 관련 흐름 제어....
재시도 횟수!
카프카 클러스터 연결 타임아웃 예외 발생 시 메시지 손실이 날 수 있으니 재시도 횟수를 설정한다.
부트스트랩 URL 수!
카프카 브로커 설정은 반드시 두 개 이상으로 설정한다.
프로덕션 환경에서 최소 카프카 클러스터 구성은 카프카 3노드, 주키퍼 3노드를 사용하기 때문에 세 개를 적어주면 된다.
잘못된 파티셔닝 방식의 사용 방지!
사용자 정의 파티션을 잘못 사용하게 되면 토픽의 모든 파티션으로 균일하게 분배되지 않을 수 있고, 최적화된 병렬처리가 불가능 하도록 만들 수 있다. 사용자 정의 파티션 사용시 라운드 로빈을 기본으로 넣자.
메시지의 임시 보존!
카프카 클러스터 장애로 인하여 재시도 횟수를 넘었을 시 메시지 소실이 발생할 수 있으므로 메시지를 다시 전송할 있도록 디스크나, 데이터베이스를 이용하자.