Kafka

[카프카(Kafka) 어플리케이션 제작 ] #1. 프로듀서

피랑이 2019. 3. 7. 12:16

위의 링크 글에서는 Kafka 개요 및 설치, 명령어를 이용하여 토픽 생성, 토픽 메시지 Publish, Subscribe에 대해서 설명하였다. 


이번에는 Kafka 라이브러리를 이용하여 어플리케이션을 만들어보자.

본인은 Intellij IDE 환경에서 SpringBoot 2.1 플래폼을 기반으로 Maven 빌드를 사용한다. 

 

프로듀서나 컨슈머를 사용하기 위해서는 kafka-clients를 이용하므로 pom.xml에 종속성을 추가한다. 


    
    org.apache.kafka
    kafka-clients
    2.1.0

프로듀서

카프카 프로듀서 내부 동작 및 프로듀서 어플리케이션에서 카프카 큐로 메시지가 전달되는 과정을 알아보자.

프로듀서 내부 동작

메시지 게시(Publish)를 위한 동작 순서
1. 프로듀서 API 호출 ↓
2. Key-Value 형태로 이뤄진 Kafka 구성 정보를 전달 받음 ↓
3. 메시징 데이터를 바이트 배열로 직렬화 ↓
4. 전송할 파티션 결정(옵션) ↓
5. 메시지를 보내기 위해 브로커 결정 ↓
6. API에 쓰기 요청 ↓
7. 응답 수신 (응답이 수신되명 성공으로 간주)
   오류 코드 수신 (예외를 발생시키거나 설정에 따른 재시도 수행)

메시지 Publish와 별개로 카프카 프로듀서가 책임지고 있는 역활에 대해서 정리해본다. 

카프카 프로커 URL 부트스트랩
프로듀서는 카프카 클러스터에 대한 메타데이터를 가져오기 위해 최소 하나의 브로커에 연결한다. 
보통 다수의 브로커에 연결한다. 

데이터 직렬화
브로커에게 메시지를 보내기 전 TCP 기반 데이터 송수신을 위하여 데이터 객체를 바이트 직렬화한다. 

토픽 파티션 결정
일반적으로 메시지 데이터 객체의 키 해시를 이용하여 어떤 파티션으로 데이터가 전송되어야 하는지 결정한다. 
아니면 커스텀하게 로직을 만들어서 파티션을 결정할 수 있도록 사용자 정의 파티션 코드를 작성할 수 있다.

처리 실패/재시도 
프로듀서 API 설정을 통해 재시도 횟수나 예외 처리가 가능하다. 
예외 종류에 따른 데이터 흐름을 결정할 수 있다.

배치 처리 
배치 처리는 입출력 횟수를 줄이고 프로듀서 메모리를 최적화한다.  
보통 메세지 수를 결정하여 처리하며, 전반적인 처리 속도는 배치에서의 메시지 수에 비례해 증가한다. 

프로듀서 어플리케이션 

프로듀서 어플리케이션 작성의 경우 추상화 계층에 메서드를 노출하는 프로듀서 API를 사용한다. 


카프카 프로듀서 API 사용 순서

1. 기본 설정 정보 ↓
2. 프로듀서 객체 생성 ↓
3. 프로듀서 레코드 객체 생성 ↓
4. 사용자 정의 파티션 생성(옵션) ↓
5. 추가 설정 

기본 설정 정보

 bootstrap.servers

  • 브로커 주소 목록
  • hostname:port 형식
  • 한 개 이상 설정 가능 (브로커 접속 실패할 수 있으므로 최소 두개 지정 권장)

 key.serializer

  • 브로커는 메시지를 전달받으면 메시지 키 정보가 어떠한 직렬화가 되어 있다고 가정하므로 어떤 직렬화를 써는지 명시해야함 
  • 카프카는 ByteArraySerializer, StringSerializer, IntegerSerializer 세가지 내장된 클래스 제공

 value.serializer

  • key.serializer와 비슷하며 메시지 값 정보에 대한 직렬화를 명시


프로듀서 객체 및 프로듀서 레코드 객체 생성

토픽에 레코드를 전송하는 ProducerRecord 객체는 토픽이름, 파티션 번호, 타입스탬프, 키, 값 등을 포함한다. 

토픽이름과 메시지 데이터 값은 필수 파라미터이며, 파티션 번호나 타임스탬프, 키, 값 들은 선택 파라미터이다. 

동기메시징은 브로커에서 회신을 RecordMetadata를 보내고, 비동기 메시징은 브로커에서 즉각 응답 처리만 하며, 프로듀서에서는 콜백 인터페이스를 수행하여 처리한다.

동기메시징으로도 잠깐 테스트를 하였는데 메시지 사이즈 100byte 기준으로 1만 TPS정도 나온다. 


사용자 정의 파티션 생성

대부분의 경우 기본 파티션을 사용하는 경우 충분하나, 하나의 키에 대한 데이터의 비중이 매우 큰 경우 별도의 파티션의 할당이 필요할 수 있다. 

예) K라는 키에 전체데이터의 30%가 있는 경우 N파티션을 할당에 다른키가 N파티션에 할당되지 않게 만들어 공간이 소진되거나 속도가 느려지지 않도록 한다. 

카프카는 사용자 정의 파티션을 생성할 수 있도록 파티셔너 인터페이스를 제공한다. 

package com.example.kafkatest;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;

import java.util.List;
import java.util.Map;

public class CustomPartition implements Partitioner {
    public int partition(
            String topicName,
            Object key,
            byte[] keybBytes,
            Object value,
            byte[] valueBytes,
            Cluster cluster
    )
    {
        List partitionInfos = cluster.partitionsForTopic(topicName);
        int numPartitions = partitionInfos.size();
        //로직여기에
        return 0;
    }

    @Override
    public void close(){}

    @Override
    public void configure(Map map){}
}
추가 설정 정보

 buffer.memory

  • 카프카 브로커로 메시지 전송 전에 사용할 최대 버퍼 메모리 크기
  • 설정 한계에 도달하면 프로듀서는 예외를 발생시키기 전에 max.block.ms 시간 동안 대기
  • batch.size가 크다면 프로듀서에 더 많은 메모리를 할당해야 함
  • request.timeout.ms를 사용해 시간 초과 설정을 적용하는 좋음

 ack

  • 0, 1, all 사용
  • 0 : ack를 기다리지 않음 (fire and forget)
  • 1 : 리더가 메시지를 기록하자마자 ack를 받음
  • all : 리더가 복제하는 팔로워까지 완료된 경우 ack를 받음
  • 카프카(Kafka)의 이해 Replication - leader & follower 참조

 batch.size

  • 크기 많큼 배치 처리를 허용
  • 대량으로 보낼경우 1000이상으로 설정하는 걸 권장
  • 1~10으로 할 경우 메시지 유실이 발생하는데 정확한 부부은 확인이 필요 (유실 테스트까지만 함)

 linger.ms

  • 브로커로 전송전에 프로듀서가 추가 메시지를 기다리는 시간

 compression.type

  •  프로듀서는 압축을 사용하지 않고 메시지를 전송이 Default임
  • GZIP, Snappy, LZA 형식의 압축이 가능하며, 배치 처리가 많을수록 압축 효과가 좋다. 
  • 되도록 사용하길 권장 Logstash에서 압축과 비압축 차이가 컸다

 retres

  • 메시지 전송이 실패했을시 예외를 발생시키기전 재전송 시도 값 

 partitioner.class

  •  사용자 정의 파티션 생성 시 허용

 timeout.ms

  •  프로듀서에서 오류를 보내기 전에 팔로워의 Ack를 리더가 기다리는 시간


프로듀서 전체 코드

package com.example.kafkatest;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.Future;

@SpringBootApplication
public class KafkaProducerApplication {
    private static final Logger log = LoggerFactory.getLogger(KafkaTestApplication.class);

    public static void main(String[] args) {
        SpringApplication.run(KafkaProducerApplication.class, args);
        publish();
    }

    private static Properties setProducerProperty(){
        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers","192.168.100.173:9092");
        producerProps.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("ack","all");
        producerProps.put("retries", 1);
        /*
            배치사이즈가 작으면 대량전송시 유실됨 최소 1000이상 잡는게 효율적
        */
        producerProps.put("batch.size", 1);
        producerProps.put("linger.ms", 1);
        producerProps.put("buffer.memory", 24568545);
        return producerProps;
    }

    private static void publish(){
        KafkaProducer producer = new KafkaProducer(setProducerProperty());
        log.info("Pubulish to topic :\t" + KafkaStatic.DEFAULT_TOPIC);
        log.info("Pubulish Start");
        for (int i=0; i<10; i++  ) {
            ProducerRecord data = new ProducerRecord(KafkaStatic.DEFAULT_TOPIC, i+"-----"+UUID.randomUUID().toString());
            Future recordMetadata = producer.send(data);
        }
        producer.close();
        log.info("Pubulish End");
    }
}
package com.example.kafkatest;

public class KafkaStatic {
    public static final String DEFAULT_TOPIC = "hirang";
}

프로듀서 콘솔 로그

어플리케이션을 구동하면 아래와 같이 Kafka 설정 관련 정보를 볼 수 있다. 

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v2.1.3.RELEASE)

2019-03-07 11:21:18.822  INFO 376 --- [           main] c.e.kafkatest.KafkaProducerApplication   : Starting KafkaProducerApplication on ts-hirang with PID 376 (D:\workspace\github\kafka-test\target\classes started by DESKTOP in D:\workspace\github\kafka-test)
2019-03-07 11:21:18.824  INFO 376 --- [           main] c.e.kafkatest.KafkaProducerApplication   : No active profile set, falling back to default profiles: default
2019-03-07 11:21:19.398  INFO 376 --- [           main] c.e.kafkatest.KafkaProducerApplication   : Started KafkaProducerApplication in 0.795 seconds (JVM running for 1.635)
2019-03-07 11:21:19.409  INFO 376 --- [           main] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values: 
	acks = 1
	batch.size = 1
	bootstrap.servers = [192.168.100.173:9092]
	buffer.memory = 24568545
	client.id = 
	compression.type = none
	connections.max.idle.ms = 540000
	enable.idempotence = false
	interceptor.classes = []
	key.serializer = class org.apache.kafka.common.serialization.StringSerializer
	linger.ms = 1
	max.block.ms = 60000
	max.in.flight.requests.per.connection = 5
	max.request.size = 1048576
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
	receive.buffer.bytes = 32768
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retries = 1
	retry.backoff.ms = 100
	sasl.client.callback.handler.class = null
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	send.buffer.bytes = 131072
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
	ssl.endpoint.identification.algorithm = https
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLS
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	transaction.timeout.ms = 60000
	transactional.id = null
	value.serializer = class org.apache.kafka.common.serialization.StringSerializer

2019-03-07 11:21:19.456  WARN 376 --- [           main] o.a.k.clients.producer.ProducerConfig    : The configuration 'ack' was supplied but isn't a known config.
2019-03-07 11:21:19.459  INFO 376 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version : 2.0.1
2019-03-07 11:21:19.460  INFO 376 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId : fa14705e51bd2ce5
2019-03-07 11:21:19.460  INFO 376 --- [           main] c.e.kafkatest.KafkaTestApplication       : Pubulish to topic :	hirang
2019-03-07 11:21:19.461  INFO 376 --- [           main] c.e.kafkatest.KafkaTestApplication       : Pubulish Start
2019-03-07 11:21:19.526  INFO 376 --- [ad | producer-1] org.apache.kafka.clients.Metadata        : Cluster ID: 6ejl7iMZQASnPqRCdv13wA
2019-03-07 11:21:19.534  INFO 376 --- [           main] o.a.k.clients.producer.KafkaProducer     : [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
2019-03-07 11:21:19.577  INFO 376 --- [           main] c.e.kafkatest.KafkaTestApplication       : Pubulish End

실행 결과

프로듀서 모범 사례

카프카 프로듀서의 내부 동작과 일반적인 토픽에 메시지 게시(Publish)하는 방법을 알아보았다.
프로듀서 구성 요소를 정의할 때 좀 더 바람직한 가이드라인에 대해서 알아보자.

데이터 유효성 검증을 하자!

프로듀서 어플리케이션을 제작할 때 데이터 스키마의 정합성, null이 아닌 키항목 등 유효성 검증을 간과하기 쉽다. 

이런 유효성 검증을 하지 않으면 메시지 데이터가 올바르게 파티션에 할당되지 않으므로, 브로커에 대한 부하 분산에 영향을 준다. (순서를 보장해야하는 1파티션이면 상관없을듯...)


예외처리를 하자!

모든 어플리케이션의 예외처리는 상황에 따라 무조건 필요하다. (너무 남발해서도 문제지만..)

프로듀서에서 예외 상황에 따라 적절한 흐름 제어를 해야한다.

예) 게임 로그 데이터의 임시 큐로 Kafka를 사용할 경우 아이템 거래 사기 발생 시 계정이나 기사단 블럭 등 예외 관련 흐름 제어....


재시도 횟수!

카프카 클러스터 연결 타임아웃 예외 발생 시 메시지 손실이 날 수 있으니 재시도 횟수를 설정한다. 


부트스트랩 URL 수!

카프카 브로커 설정은 반드시 두 개 이상으로 설정한다. 

프로덕션 환경에서 최소 카프카 클러스터 구성은 카프카 3노드, 주키퍼 3노드를 사용하기 때문에 세 개를 적어주면 된다. 


잘못된 파티셔닝 방식의 사용 방지!

사용자 정의 파티션을 잘못 사용하게 되면 토픽의 모든 파티션으로 균일하게 분배되지 않을 수 있고, 최적화된 병렬처리가 불가능 하도록 만들 수 있다.  사용자 정의 파티션 사용시 라운드 로빈을 기본으로 넣자.


메시지의 임시 보존!

카프카 클러스터 장애로 인하여 재시도 횟수를 넘었을 시 메시지 소실이 발생할 수 있으므로 메시지를 다시 전송할 있도록 디스크나, 데이터베이스를 이용하자. 

마치며

카프카 프로듀서 내부 동작을 이해하고 간단하게 어플리케이션을 만들어 보았다. 
다음에는 컨슈머 어플리케이션에 대해 만들어본다.