위의 링크 글에서는 Kafka 개요 및 설치, 명령어를 이용하여 토픽 생성, 토픽 메시지 Publish, Subscribe에 대해서 설명하였다. 


이번에는 Kafka 라이브러리를 이용하여 어플리케이션을 만들어보자.

본인은 Intellij IDE 환경에서 SpringBoot 2.1 플래폼을 기반으로 Maven 빌드를 사용한다. 

 

프로듀서나 컨슈머를 사용하기 위해서는 kafka-clients를 이용하므로 pom.xml에 종속성을 추가한다. 


    
    org.apache.kafka
    kafka-clients
    2.1.0

프로듀서

카프카 프로듀서 내부 동작 및 프로듀서 어플리케이션에서 카프카 큐로 메시지가 전달되는 과정을 알아보자.

프로듀서 내부 동작

메시지 게시(Publish)를 위한 동작 순서
1. 프로듀서 API 호출 ↓
2. Key-Value 형태로 이뤄진 Kafka 구성 정보를 전달 받음 ↓
3. 메시징 데이터를 바이트 배열로 직렬화 ↓
4. 전송할 파티션 결정(옵션) ↓
5. 메시지를 보내기 위해 브로커 결정 ↓
6. API에 쓰기 요청 ↓
7. 응답 수신 (응답이 수신되명 성공으로 간주)
   오류 코드 수신 (예외를 발생시키거나 설정에 따른 재시도 수행)

메시지 Publish와 별개로 카프카 프로듀서가 책임지고 있는 역활에 대해서 정리해본다. 

카프카 프로커 URL 부트스트랩
프로듀서는 카프카 클러스터에 대한 메타데이터를 가져오기 위해 최소 하나의 브로커에 연결한다. 
보통 다수의 브로커에 연결한다. 

데이터 직렬화
브로커에게 메시지를 보내기 전 TCP 기반 데이터 송수신을 위하여 데이터 객체를 바이트 직렬화한다. 

토픽 파티션 결정
일반적으로 메시지 데이터 객체의 키 해시를 이용하여 어떤 파티션으로 데이터가 전송되어야 하는지 결정한다. 
아니면 커스텀하게 로직을 만들어서 파티션을 결정할 수 있도록 사용자 정의 파티션 코드를 작성할 수 있다.

처리 실패/재시도 
프로듀서 API 설정을 통해 재시도 횟수나 예외 처리가 가능하다. 
예외 종류에 따른 데이터 흐름을 결정할 수 있다.

배치 처리 
배치 처리는 입출력 횟수를 줄이고 프로듀서 메모리를 최적화한다.  
보통 메세지 수를 결정하여 처리하며, 전반적인 처리 속도는 배치에서의 메시지 수에 비례해 증가한다. 

프로듀서 어플리케이션 

프로듀서 어플리케이션 작성의 경우 추상화 계층에 메서드를 노출하는 프로듀서 API를 사용한다. 


카프카 프로듀서 API 사용 순서

1. 기본 설정 정보 ↓
2. 프로듀서 객체 생성 ↓
3. 프로듀서 레코드 객체 생성 ↓
4. 사용자 정의 파티션 생성(옵션) ↓
5. 추가 설정 

기본 설정 정보

 bootstrap.servers

  • 브로커 주소 목록
  • hostname:port 형식
  • 한 개 이상 설정 가능 (브로커 접속 실패할 수 있으므로 최소 두개 지정 권장)

 key.serializer

  • 브로커는 메시지를 전달받으면 메시지 키 정보가 어떠한 직렬화가 되어 있다고 가정하므로 어떤 직렬화를 써는지 명시해야함 
  • 카프카는 ByteArraySerializer, StringSerializer, IntegerSerializer 세가지 내장된 클래스 제공

 value.serializer

  • key.serializer와 비슷하며 메시지 값 정보에 대한 직렬화를 명시


프로듀서 객체 및 프로듀서 레코드 객체 생성

토픽에 레코드를 전송하는 ProducerRecord 객체는 토픽이름, 파티션 번호, 타입스탬프, 키, 값 등을 포함한다. 

토픽이름과 메시지 데이터 값은 필수 파라미터이며, 파티션 번호나 타임스탬프, 키, 값 들은 선택 파라미터이다. 

동기메시징은 브로커에서 회신을 RecordMetadata를 보내고, 비동기 메시징은 브로커에서 즉각 응답 처리만 하며, 프로듀서에서는 콜백 인터페이스를 수행하여 처리한다.

동기메시징으로도 잠깐 테스트를 하였는데 메시지 사이즈 100byte 기준으로 1만 TPS정도 나온다. 


사용자 정의 파티션 생성

대부분의 경우 기본 파티션을 사용하는 경우 충분하나, 하나의 키에 대한 데이터의 비중이 매우 큰 경우 별도의 파티션의 할당이 필요할 수 있다. 

예) K라는 키에 전체데이터의 30%가 있는 경우 N파티션을 할당에 다른키가 N파티션에 할당되지 않게 만들어 공간이 소진되거나 속도가 느려지지 않도록 한다. 

카프카는 사용자 정의 파티션을 생성할 수 있도록 파티셔너 인터페이스를 제공한다. 

package com.example.kafkatest;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;

import java.util.List;
import java.util.Map;

public class CustomPartition implements Partitioner {
    public int partition(
            String topicName,
            Object key,
            byte[] keybBytes,
            Object value,
            byte[] valueBytes,
            Cluster cluster
    )
    {
        List partitionInfos = cluster.partitionsForTopic(topicName);
        int numPartitions = partitionInfos.size();
        //로직여기에
        return 0;
    }

    @Override
    public void close(){}

    @Override
    public void configure(Map map){}
}
추가 설정 정보

 buffer.memory

  • 카프카 브로커로 메시지 전송 전에 사용할 최대 버퍼 메모리 크기
  • 설정 한계에 도달하면 프로듀서는 예외를 발생시키기 전에 max.block.ms 시간 동안 대기
  • batch.size가 크다면 프로듀서에 더 많은 메모리를 할당해야 함
  • request.timeout.ms를 사용해 시간 초과 설정을 적용하는 좋음

 ack

  • 0, 1, all 사용
  • 0 : ack를 기다리지 않음 (fire and forget)
  • 1 : 리더가 메시지를 기록하자마자 ack를 받음
  • all : 리더가 복제하는 팔로워까지 완료된 경우 ack를 받음
  • 카프카(Kafka)의 이해 Replication - leader & follower 참조

 batch.size

  • 크기 많큼 배치 처리를 허용
  • 대량으로 보낼경우 1000이상으로 설정하는 걸 권장
  • 1~10으로 할 경우 메시지 유실이 발생하는데 정확한 부부은 확인이 필요 (유실 테스트까지만 함)

 linger.ms

  • 브로커로 전송전에 프로듀서가 추가 메시지를 기다리는 시간

 compression.type

  •  프로듀서는 압축을 사용하지 않고 메시지를 전송이 Default임
  • GZIP, Snappy, LZA 형식의 압축이 가능하며, 배치 처리가 많을수록 압축 효과가 좋다. 
  • 되도록 사용하길 권장 Logstash에서 압축과 비압축 차이가 컸다

 retres

  • 메시지 전송이 실패했을시 예외를 발생시키기전 재전송 시도 값 

 partitioner.class

  •  사용자 정의 파티션 생성 시 허용

 timeout.ms

  •  프로듀서에서 오류를 보내기 전에 팔로워의 Ack를 리더가 기다리는 시간


프로듀서 전체 코드

package com.example.kafkatest;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.Future;

@SpringBootApplication
public class KafkaProducerApplication {
    private static final Logger log = LoggerFactory.getLogger(KafkaTestApplication.class);

    public static void main(String[] args) {
        SpringApplication.run(KafkaProducerApplication.class, args);
        publish();
    }

    private static Properties setProducerProperty(){
        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers","192.168.100.173:9092");
        producerProps.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("ack","all");
        producerProps.put("retries", 1);
        /*
            배치사이즈가 작으면 대량전송시 유실됨 최소 1000이상 잡는게 효율적
        */
        producerProps.put("batch.size", 1);
        producerProps.put("linger.ms", 1);
        producerProps.put("buffer.memory", 24568545);
        return producerProps;
    }

    private static void publish(){
        KafkaProducer producer = new KafkaProducer(setProducerProperty());
        log.info("Pubulish to topic :\t" + KafkaStatic.DEFAULT_TOPIC);
        log.info("Pubulish Start");
        for (int i=0; i<10; i++  ) {
            ProducerRecord data = new ProducerRecord(KafkaStatic.DEFAULT_TOPIC, i+"-----"+UUID.randomUUID().toString());
            Future recordMetadata = producer.send(data);
        }
        producer.close();
        log.info("Pubulish End");
    }
}
package com.example.kafkatest;

public class KafkaStatic {
    public static final String DEFAULT_TOPIC = "hirang";
}

프로듀서 콘솔 로그

어플리케이션을 구동하면 아래와 같이 Kafka 설정 관련 정보를 볼 수 있다. 

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v2.1.3.RELEASE)

2019-03-07 11:21:18.822  INFO 376 --- [           main] c.e.kafkatest.KafkaProducerApplication   : Starting KafkaProducerApplication on ts-hirang with PID 376 (D:\workspace\github\kafka-test\target\classes started by DESKTOP in D:\workspace\github\kafka-test)
2019-03-07 11:21:18.824  INFO 376 --- [           main] c.e.kafkatest.KafkaProducerApplication   : No active profile set, falling back to default profiles: default
2019-03-07 11:21:19.398  INFO 376 --- [           main] c.e.kafkatest.KafkaProducerApplication   : Started KafkaProducerApplication in 0.795 seconds (JVM running for 1.635)
2019-03-07 11:21:19.409  INFO 376 --- [           main] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values: 
	acks = 1
	batch.size = 1
	bootstrap.servers = [192.168.100.173:9092]
	buffer.memory = 24568545
	client.id = 
	compression.type = none
	connections.max.idle.ms = 540000
	enable.idempotence = false
	interceptor.classes = []
	key.serializer = class org.apache.kafka.common.serialization.StringSerializer
	linger.ms = 1
	max.block.ms = 60000
	max.in.flight.requests.per.connection = 5
	max.request.size = 1048576
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
	receive.buffer.bytes = 32768
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retries = 1
	retry.backoff.ms = 100
	sasl.client.callback.handler.class = null
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	send.buffer.bytes = 131072
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
	ssl.endpoint.identification.algorithm = https
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLS
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	transaction.timeout.ms = 60000
	transactional.id = null
	value.serializer = class org.apache.kafka.common.serialization.StringSerializer

2019-03-07 11:21:19.456  WARN 376 --- [           main] o.a.k.clients.producer.ProducerConfig    : The configuration 'ack' was supplied but isn't a known config.
2019-03-07 11:21:19.459  INFO 376 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version : 2.0.1
2019-03-07 11:21:19.460  INFO 376 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId : fa14705e51bd2ce5
2019-03-07 11:21:19.460  INFO 376 --- [           main] c.e.kafkatest.KafkaTestApplication       : Pubulish to topic :	hirang
2019-03-07 11:21:19.461  INFO 376 --- [           main] c.e.kafkatest.KafkaTestApplication       : Pubulish Start
2019-03-07 11:21:19.526  INFO 376 --- [ad | producer-1] org.apache.kafka.clients.Metadata        : Cluster ID: 6ejl7iMZQASnPqRCdv13wA
2019-03-07 11:21:19.534  INFO 376 --- [           main] o.a.k.clients.producer.KafkaProducer     : [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
2019-03-07 11:21:19.577  INFO 376 --- [           main] c.e.kafkatest.KafkaTestApplication       : Pubulish End

실행 결과

프로듀서 모범 사례

카프카 프로듀서의 내부 동작과 일반적인 토픽에 메시지 게시(Publish)하는 방법을 알아보았다.
프로듀서 구성 요소를 정의할 때 좀 더 바람직한 가이드라인에 대해서 알아보자.

데이터 유효성 검증을 하자!

프로듀서 어플리케이션을 제작할 때 데이터 스키마의 정합성, null이 아닌 키항목 등 유효성 검증을 간과하기 쉽다. 

이런 유효성 검증을 하지 않으면 메시지 데이터가 올바르게 파티션에 할당되지 않으므로, 브로커에 대한 부하 분산에 영향을 준다. (순서를 보장해야하는 1파티션이면 상관없을듯...)


예외처리를 하자!

모든 어플리케이션의 예외처리는 상황에 따라 무조건 필요하다. (너무 남발해서도 문제지만..)

프로듀서에서 예외 상황에 따라 적절한 흐름 제어를 해야한다.

예) 게임 로그 데이터의 임시 큐로 Kafka를 사용할 경우 아이템 거래 사기 발생 시 계정이나 기사단 블럭 등 예외 관련 흐름 제어....


재시도 횟수!

카프카 클러스터 연결 타임아웃 예외 발생 시 메시지 손실이 날 수 있으니 재시도 횟수를 설정한다. 


부트스트랩 URL 수!

카프카 브로커 설정은 반드시 두 개 이상으로 설정한다. 

프로덕션 환경에서 최소 카프카 클러스터 구성은 카프카 3노드, 주키퍼 3노드를 사용하기 때문에 세 개를 적어주면 된다. 


잘못된 파티셔닝 방식의 사용 방지!

사용자 정의 파티션을 잘못 사용하게 되면 토픽의 모든 파티션으로 균일하게 분배되지 않을 수 있고, 최적화된 병렬처리가 불가능 하도록 만들 수 있다.  사용자 정의 파티션 사용시 라운드 로빈을 기본으로 넣자.


메시지의 임시 보존!

카프카 클러스터 장애로 인하여 재시도 횟수를 넘었을 시 메시지 소실이 발생할 수 있으므로 메시지를 다시 전송할 있도록 디스크나, 데이터베이스를 이용하자. 

마치며

카프카 프로듀서 내부 동작을 이해하고 간단하게 어플리케이션을 만들어 보았다. 
다음에는 컨슈머 어플리케이션에 대해 만들어본다.


Posted by 사용자 피랑이

댓글을 달아 주세요

  1. AndersonChoi 2019.08.30 16:41 신고  댓글주소  수정/삭제  댓글쓰기

    감사합니다. 잘 정리해주셨네요. 많은도움되었습니다.
    특히 추가 설정 정보설명이 유용했습니다!

카프카(Kafka)의 이해

Kafka 2019. 1. 17. 11:55


대용량 게임로그 수집을 위해 Elastic Stack을 도입하게 되었는데, 중간에 버퍼역할(메시지큐)을 하는 Kafka에 대서 알아보려고 한다.


메시지큐? 

메시지 지향 미들웨어(Message Oriented Middleware: MOM)은 비동기 메시지를 사용하는 다른 응용프로그램 사이의 데이터 송수신을 의미하는데 MOM을 구현한 시스템을 메시지큐(Message Queue:MQ)라 한다.


카프카란? 

분산형 스트리밍 플랫폼(A distributed streaming platform)이다. LinkedIn에서 여러 구직 및 채용 정보들을 한곳에서 처리(발행/구독)할 수 있는 플래폼으로 개발이 시작 되었다고 한다.

(발행/구독: pub-sub은 메시지를 특정 수신자에게 직접적으로 보내주는 시스템이 아니고, 메시지를 받기를 원하는 사람이 해당 토픽(topic)을 구독함으로써 메시지를 읽어 올 수 있다.)


카프카의 특징 

대용량 실시간 로그처리에 특화되어 설계된 메시징 시스템으로 TPS가 매우 우수하고,

메시지를 메모리에 저장하는 기존 메시징 시스템과는 달리 파일에 저장을 하는데 그로 인해 카프카를 재시작해도 메시지 유실 우려가 감소된다.

기본 메시징 시스템(rabbitMQ, ActiveMQ)에서는 브로커(Broker)가 컨슈머(consumer)에게 메시지를 push해 주는 방식인데, 카프카는 컨슈머(Consumer)가 브로커(Broker)로부터 메시지를 직접 가져가는 PULL 방식으로 동작하기 때문에 컨슈머는 자신의 처리 능력만큼의 메시지만 가져와 최적의 성능을 낼 수 있다. 대용량처리에 특화 되었다는 것은 아마도 이러한 구조로 설계가 되어 가능하게 된게 아닌가 싶다.


여기서 한가지 의문이 든다. 일반적으로 파일보다 메모리가 성능이 우수한데 왜 카프카가 성능이 좋은 것일까? 그 이유는 카프카의 파일 시스템을 활용한 고성능 디자인에 있다. 일반적으로 하드디스크는 메모리보다 수백배 느리지만 하드디스크의 순차적 읽기에 대한 성능은 메모리보다 크게 떨어지지 않는다고 한다. 


컨슈머(Consumer)와 브로커(Broker)에 대해서는 카프카 구성요소에 대한 설명에서 좀 더 자세히 알아보자.


카프카의 구성요소 

카프카에는 다음과 같이 여러 구성요소가 있다.

topic, partition, offset

producer, consumer, consumer group

broker, zookeeper

replication

구성요소 하나씩 살펴보도록 하자.


Topic, Partition : 카프카에 저장되는 메시지는 topic으로 분류되고, topic은 여러개의 patition으로 나눠질수 있다. partition안에는 message의 상대적 위치를 내타내는 offset이 있는데 이 offet정보를 이용해 이전에 가져간 메시지의 위치 정보를 알 수 있고 동시에 들어오는 많은 데이터를 여러개의 파티션에 나누어 저장하기 때문에 병렬로 빠르게 처리할 수 있다.


Producer, Consumer : 말대로 Producer는 생산(메시지를 Write)하는 주체, Consumer는 소비(메시지를 Read)하는 주체이다. Producer와 Consumer간에는 상호 존재 여부를 알지 못한채 자신에게 주어진 역할만 처리 하게 된다. (위 그림에서 보면 Writes가 Producer)


Consumer Group : Producer에서 생산(Write)한 메시지는 여러개의 파티션에 저장을 하는데, 그렇다면 소비하는(Consumer)하는 쪽에서도 여러 소비자가 메시지를 읽어가는것이 훨씬 효율적일 것이다. 하나의 목표를 위해 소비를 하는 그룹, 즉 하나의 토픽을 읽어가기 위한 Counsumer들을 Consumer Group라고 한다.


하지만 이 Consumer Group에는 한가지 룰이 있다. Topic의 파티션은 그 Consumer Group과 1:n 매칭. 즉, 자신이 읽고 있는 파티션에는 같은 그룹내 다른 컨슈머가 읽을 수 없다. (파티션에는 동일한 Consumer Group을 위한 하나의 구멍이 있고, Consumer는 그 구멍에 빨대를 꽂아 읽어간다고 생각하면 쉽게 상상이 될지도….^^;)


위 그림과 같이 하나의 토픽에 4개의 파티션이 있고 컨슈머그룹내 3개의 컨슈머가 있다면 컨슈머1,2,3은 각 파티션 1,2,3에 순차적으로 배치가 될 것이고, offset정보를 이용해 순차적으로 데이터를 읽게 된다. 문제는 파티션4인데 컨슈머 갯수가 파티션 갯수보다 작다면 컨슈머 1,2,3중 하나가 파티션4에 접근하여 데이터를 읽게 된다. 만약 파티션 갯수와 컨슈머 갯수가 동일하개 4개씩이라면 각 컨슈머들은 하나의 파티션에서 데이터를 읽게 될것 이고, 파티션갯수가 4개고 컨슈머 갯수가 5개이면 컨슈머5는 그냥 아무일도 안하게 된다.(일반적으로 파티션 갯수와 컨슈머 갯수는 동일하게 구성하는 것을 추천한다고 함.)


컨슈머그룹이 존재하는 또 다른 이유가 있다. 물론 이러한 구조로 데이터를 병렬로 읽게 되어 빠른처리가 가능하다는 부분도 있겠지만, 특정 컨슈머에 문제가 생겼을 경우 다른 그룹내 컨슈머가 대신 읽을 수 있게 리벨런싱이 되어 장애 상황에서도 문제 없이 대처할 수 있게 된다.


Broker, Zookeeper : broker는 카프카 서버를 칭한다. 동일한 노드내에서 여러개의 broker서버를 띄울 수 있고, Zookeeper는 이러한 분산 메시지큐의 정보를 관리해주는 역할을 한다. 카프카를 띄우기 위해서는 반드시 주키퍼가 실행되어야 한다.


Replication : 카프카에서는 replication 수를 임의로 지정하여 topic를 만들 수 있다. replication-factor에 지정하는데 만약 3으로 하면 replication 수가 3이 된다.


Kafka Cluster에 3개의 broker가 있고 3개의 Topic이 있다고 가정해보자.

Topic-1은 replication-factor 1, Topic-2은 replication-factor 2, Topic-3은 replication-factor 3인 경우이다.


그렇다면 replication은 왜 필요할까? 단지 데이터의 복제 용도라기 보다는 특정 borker에 문제가 생겼을 경우 해당 broker의 역할을 다른 broker에서 즉각적으로 대신 수행 할 수 있게 하기 위한 용도 일 것이다.


Replication - leader & follower


replication을 좀더 자세히 들여다보면, 복제요소중 대표인 leader, 그외 요소인 follower로 나누어진다. topic으로 통하는 모든 데이터의 read/write는 오직 leader에서 이루어지고 follower는 leader와 sync를 유지함으로써 leader에 문제가 생겼을 경우 follower들 중 하나가 leader역할을 하게 되는 것이다.


만약 카프카 클러스터내 broker 2에서 장애가 발생되었다면, broker 2에 있던 Topic-2(leader)의 역할을 대신 수행하기 위해 아래 그림과 같이 broker 1에 있는 Topic(follower)가 leader역할을 하게 될 것이다.


복제된 데이터가 follower들에게 있으니, 메시지의 유실이 없다는 장점이 있지만, 복제를 하기 위한 시간과 네트워크 비용이 들기 때문에 데이터의 중요도에 따라 ack옵션으로 성능과 데이터의 중요도에 따라 다음과 같이 세부설정이 가능하다.


ack (default:1)

0 : 프로듀서는 서버로부터 어떠한 ack도 기다리지 않음. 유실율 높으나 높은 처리량

1 : 리더는 데이터를 기록, 모든 팔로워는 확인하지 않음

-1(또는 all) : 모든 ISR 확인. 무손실


ack값을 설정하여 데이터의 무손실에 더 중요성을 둘 것인지 또는 유실을 어느정도 감수 하더라고 속도에 중요성을 둘 것인지를 셋팅할 수 있다.


지금까지 설명한 모든 구성요소를 그림으로 표현하면 아래 그림과 같다.


Producer에서는 메시지를 입력하고, Consumer에서 메시지를 읽어갈때 Zookeeper에서 broker 및 offset정보를 관리하기 때문에 분산처리가 가능하게 된다.


카프카를 운영하기에 앞서 기본적인 구성요소나 매커니즘에 대해 충분히 이해를 하면 운영 하는데 많은 도움이 될 것이다.


참고자료


Posted by @위너스

댓글을 달아 주세요


사내 시스템의 임시버퍼 용도로 Redis를 도입하여 성능 테스트를 진행하던 중, 버퍼 용량 이슈로 인하여 Redis와 Kafka를 비교하게 되었다. 

우선 결론적으로는 Redis(List)와 Kafka 사용 시 성능 차이는 거의 없었고, Kafka가 보관 용량에 대해선 유리하였다.


이번 주제에서는 Kafka 성능에 대한 정리이므로 Redis & Kafka 비교는 나중에 진행하기로 한다. 

궁금증

Kafka는 대용량 메시지 처리 성능이 좋다고 조금만 리서치 해보면 알 수 있다. 그러면.... 

처리량이 얼마나 될까? 

HDD와 SSD는 차이가 발생할까? 

Producer와 Consumer 수에 따라서 성능 차이가 날까?  

여러 Consumer Group이 같은 토픽을 조회해도 성능 차이가 없다고 하던데 과연 그럴까?

이러한 궁금증이 생겼다.


가장 궁금한 부분은 HDD와 SSD 성능 차이였다. 

어떤 블로그에서는 HDD보다 SSD가 최대 2.7배 성능이 높다고 하고, 어떤 학술 논문집에서는 차이가 거의 없다고 한다.

이번 성능 테스트를 진행하면서 알아보려 한다.


테스트 케이스

  • 하드웨어 스펙 차이에 따른 성능 비교 (CPU, Memory, SSD, HDD)
  • 브로커 증가에 따른 성능 비교
  • Producer 및 Consumer 수에 따른 성능 비교
  • Consumer Group 수에 따른 성능 비교

테스트 환경, 스펙, 설정

테스트 환경 구성

  • Zookpeer 3노드
  • Kafka 6노드
  • Test 장비 4노드

Zookeeper 스펙

클라우드

Azure 표준 F2s

CPU

2Core 

Memory 

4GB 

스토리지 

SSD 32GB(120 iops)

 Bandwidth

 1.5GbE

OS

CentOS Linux release 7.5.1804

Java

java(TM) SE 10.0.1

버전

kafka_2.11-2.0.0

Kafka 스펙

클라우드

Azure 표준 F4s

Azure 표준 F8s

CPU

4Core 

 8Core

Memory 

8GB 

16GB 

스토리지 

HDD 1024GB(500 iops) VS SSD 1024GB(5000 ipos)

Bandwidth

 10GbE

OS

CentOS Linux release 7.5.1804

Java

java(TM) SE 10.0.1

버전

kafka_2.11-2.0.0

 Heap Memory

 Xmx6G -Xms6G

 Xmx10G -Xms10G

토픽 및 파티션 구성

하나의 물리적 노드 당 하나의 파티션으로 정의한다. 

예를 들어 5개의 노드를 사용하는 토픽은 5개의 파티션을 가지고 있다. 

추가로 정확한 throughput을 측정하기 위해 복제는 사용하지 않았다. 


테스트 툴 선택

Kafka는 자체적으로 테스트 툴을 제공하고 있다.

응답시간을 지원하지 않으므로 테스트 평가 기준은 초당 처리량(MB/sec)으로 정의한다. 

아래와 같이 producer, consumer 별로 테스트를 진행하였다. 

 구분

테스트 명령어 

설명 

producer 

 ./kafka_2.11-2.0.0/bin/kafka-producer-perf-test.sh \

--topic test \

--record-size 1000 \

--num-records 20000000 \

--producer-props \

bootstrap.servers=10.10.20.14:9092...... \

--throughput 1000000

topic : 토픽이름

record-size : 테스트 레코드 사이즈(byte) 

num-records : 생성할 메시지수

producer-prop : 프로듀서 옵션 처리

bootstrap.servers : 브로커 리스트

throughput : 최대 메시지 처리량 제한(초당 몇개의 메시지)

 consumer

 ./kafka_2.11-2.0.0/bin/kafka-consumer-perf-test.sh \

--topic test \

--show-detailed-stats \

--group test_group \

--broker-list 10.10.20.14:9092...... \

--reporting-interval 500 \

--messages 1000000

topic : 토픽이름

messages : 소비할 메시지 개수

show-detailed-stats : 통계 지표 리포트

group : consumer group 

broker-list : 브로커 리스트

reporting-interval : 리포팅 간격

kafka 설정

6노드 모두 아래와 같이 동일하게 설정하였다.


...

num.network.threads=3

num.io.threads=8

socket.send.buffer.bytes=102400

socket.receive.buffer.bytes=102400

socket.request.max.bytes=104857600


num.partitions=1 #기본 파티션 개수

num.recovery.threads.per.data.dir=1

offsets.topic.replication.factor=1 #기본 복제 개수

transaction.state.log.replication.factor=1 

transaction.state.log.min.isr=1


log.retention.hours=168 #토픽 보관시간 

log.segment.bytes=1073741824 

log.retention.check.interval.ms=300000




테스트 결과

하드웨어 스펙 차이에 따른 성능 비교

Producer 테스트 (Test 토픽 6 파티션 기준)

  • 1개의 Producer일 경우 스토리지(SSD, HDD) 및 하드웨어 스펙(F4s, F8s) 상관없이 동일한 처리량을 보여준다.
  • F4s 스펙에서는 n개의 Producer가 늘어나면,  처리량은 1/n로 나눠진다. (전체 처리량 같음)
  • F8s 스펙에서는 n개의 Producer가 늘어나더라도 처리량은 1/n 나뉘지 않고 더 높은 성능을 보여준다. (전체 처리량 많아짐)

Comsumer 테스트 (Test 토픽 4 파티션 기준)

  • 1개의 Consumer일 경우 SSD가 HDD보다 약 2.4배 빠르다.
  • F4s , F8s 스펙에서는 성능 차이가 크지 않았다.
  • n개의 Consumer가 늘어나면, SSD 환경 처리량은 1/n으로 나뉘지 않고 더 높은 성능을 보여준다. (전체 처리량 많아짐)
  • n개의 Consumer가 늘어날 경우 HDD 환경 처리량은 1/n으로 나눠진다. (전체 처리량 같음)

하드웨어 사용률

1개의 Producer, Consumer 기준으로 하드웨어 사용률을 확인해 보았다.

Producer는 사용률이 높고, Consumer는 낮다.

 구분

하드웨어 사용률 (HTOP) 

Producer 

 



Consumer

 



결론

 구분

결과 

producer 

Producer가 늘어날수록 하드웨어 사용량이 많아 처리량은 줄어든다. 하드웨어 스펙을 올리면 더 높은 성능을 보여준다.

SSD vs HDD 비교에서 하드웨어 스펙이 낮으면 차이가 별로 없고, 높으면 차이가 난다.(측정 오류 발생 소지)

 consumer

Consumer가 늘어날수록 전체 처리량의 차이는 SSD 많아지고 HDD는 거의 없다..

SSD와 HDD의 처리량은 약2.3배 차이가 난다.

SSD에 비해서 성능 차이가 있을 뿐이지 HDD도 성능이 엄청 좋다. 
HDD 기준 전체처리량은 Producer 초당 10~12만 메시지(1kb)를 적재, Consumer는 초당 22~24만 메시지를 소비한다. 

브로커 수에 따른 성능 비교

테스트 환경은 Azure F4s, SSD에서 진행하고, 브로커가 1 증가하면 파티션도 1 증가한다. 
예를 들어 4 브로커면 테스트 토픽의 4개의 파티션이라고 생각하면 된다. 

Producer 테스트

  • 브로커가 늘어나면 Producer 처리량도 같이 증가
  • 4 브로커 이상에서는 처리량 변화가 없음

Consumer 테스트 (같은 Consumer Group 기준)

  • 브로커가 증가하면 Consumer 처리량도 같이 증가

브로커 수에 따른 처리량 그래프

브로커 수에 따른 결론

브로커가 늘어나면 Producer나 Consumer 모두 처리량이 같이 증가한다. 


브로커 1노드 기준 최대  처리량

스토리지 타입

 1 Producer

 1 Consumer

 hdd

 쓰기 45~52MB/sec

(55649 records/sec)

 읽기 250~260MB/sec

(262208 records/sec)

 ssd

 쓰기 100~110MB/sec

(117094 records/sec)

 읽기 250~260MB/sec

(264654 records/sec)

여러 브로커 기준 최대  처리량

스토리지 타입

 1 Producer

 1 Consumer

 hdd

 쓰기 190~200MB/sec

(206745 records/sec)

 읽기 600~700MB/sec

 ssd

 쓰기 195~200MB/sec

(206582 records/sec)

 읽기 600~700MB/sec




Producer 및 Consumer 수에 따른 성능 비교

이전 테스트의 결과를 보면 확인할 수 있다. 

Producer 및 Consumer 수에 따른 결론

하드웨어 스펙이 좋으면 Producer, Consumer 수가 증가할수록 처리량은 늘어난다. 


Consumer Group 수에 따른 성능 비교

2 broker 기준 (Test 토픽 2파티션)

Consumer Group 수에 따른 결론

컨슈머 그룹이 늘어나도 성능 저하가 없다.




결론

브로커 1노드 기준으로 ssd hdd보다 처리량이 2 높으나 hdd Raid 묶으면 ssd 비슷한 성능이 가능하다. (비용 측면 검토 필요)
Producer와 Consumer를 같이 사용하기 때문에 처리량에 따른 하드웨어 스펙을 검토되어야  한다. 

프로덕션 환경 권장사양






Posted by 사용자 피랑이

댓글을 달아 주세요

  1. AndersonChoi 2019.08.30 16:44 신고  댓글주소  수정/삭제  댓글쓰기

    직접 node를 띄워서 테스트하시느라고 비용(시간, 돈)이 많이 들었을거 같아요.
    소중한 정보 공유주셔서 감사합니다.

  2. Lucsa Lee 2020.05.26 11:41  댓글주소  수정/삭제  댓글쓰기

    고생 많이 하셨습니다. Kafka 공부하는데 큰 도움이 되었습니다. 감사합니다.