카프카(Kafka)의 이해

카프카(Kafka) 설치 및 클러스터 구성

[카프카(Kafka) 어플리케이션 제작 ] #1. 프로듀서

[카프카(Kafka) 어플리케이션 제작 ] #2. 컨슈머


이전 글에서는 프로듀서 내부 동작 확인 및 어플리케이션을 제작하였다. 

이번에는 컨슈머 어플리케이션을 제작해본다. 

컨슈머 

카프카 컨슈머 내부 동작 및 컨슈머 어플리케이션에서 메시지 소비하는 과정을 알아보자.

컨슈머 내부 동작

컨슈머의 전체적인 내부 동작을 이해하면 컨슈머 어플리케이션을 디버깅할 때 도움이 많이 되며, 올바른 결정을 하도록 도와준다.

카프카 컨슈머의 역활

 토픽 구독

  • 컨슈머 동작의 시작은 토픽의 구독임 

 오프셋 위치

  • 카프카는 다른 큐와는 다르게 메시지 오프셋을 저장 안함
  • 오프셋은 각자의 컨슈머들이 유지해야함(컨슈머 API를 사용)

 재연/되감기/메시지 스킵

  • 상황에 따라 커스텀하게 오프셋을 변경할 수 있음
  • 재연/되감기/메시지 스킵 가능
 하트비트
  • 컨슈머의 지정된 파티션의 멤버쉽과 소유권을 확인하기 위해 하트비트를 이용하여 주기적으로  체크
  • 하트비트를 수신하지 못하면 컨슈머 그룹의 컨슈머 재할당(리벨런싱)이 일어난다. 

 오프셋 커밋

  •  리벨런싱이 일어날 때 이미 읽은 오프셋을 다시 읽을 수 있으므로 오프셋 커밋을 함

 역직렬화

  •  프로듀서에서는 카프카에 메시지를 보낼 때 어떤 직렬화를 했는지 명시하고 컨슈머에서는 역직렬화를 명시함 (프로듀서 글의  key.serializer 참고)


메시지 수신(Subscribe)을 위한 동작 순서

1. 토픽 구독 ↓
2. 카프카 서버 조사(폴 루프) : 서버 조정, 파티션 리벨런스, 컨슈머 하트피트 체크 등↓
3. 새로운 레코드가 있는 지 체크하고 있으면 가져옴↓
4. 역직렬화 ↓
5. 데이터 유효성 검증 ↓
6. 다른 시스템에 이관 및 저장 

컨슈머 어플리케이션 

프로듀서 API와 마찬가지로 컨슈머도 풍부한 API 세트를 제공한다. 

컨슈머 기본 설정 및 코드

 bootstrap.servers

  • 브로커 주소 목록
  • hostname:port 형식
  • 프로듀서 설정과 동일

 key.deserializer

  • 프로듀서에서 key.serializer와 동일한 클래스로 deserializer 한다.
  • 다른 클래스로 deserializer 하면 예외 발생
  • ByteArraySerializer, StringSerializer, IntegerSerializer 클래스 선택

 value.deserializer

  • key.deserializer와 비슷하며 메시지 값 정보에 대한 역직렬화를 명시

 group.id

  • 컨슈머 그룹 정의
  • 이전 버전에서는 필수가 아니었지만 최근 버전에서는 필수로 변경 사용하지 않으면 아래와 같은 에러 발생(Attempt to join group failed due to fatal error: The configured groupId is invalid)
        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers","192.168.100.173:9092");
        producerProps.put("group.id", "hirang_test02");
        producerProps.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        producerProps.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return producerProps;
        KafkaConsumer consumer = new KafkaConsumer(setConsumerProperty());

구독

컨슈머는 메시지 데이터를 받기 위해 토픽을 구독한다. (KafkaConsumer.subscribe 사용)

구독 메소드는 입력 파라미터에 따라 다양한 구독이 가능하다. 

subscribe(Collection<String> topics)

: 원하는 토픽 목록을 전달 / 기본 리밸런서 사용

subscribe(Pattern pattern, ConsumerRebalanceListener listener)

: 원하는 토픽을 찾기 위해 정규식을 전달 / 정규식에 대응하는 새로운 토픽의 추가가 삭제 발생시 리밸런서 트리거됨

subscribe(Collection<String> topics, ConsumerRebalanceListener listener)

: 토픽의 목록을 전달 / 리밸런서 트리거됨


오프셋 커밋 처리

오프셋에 대한 커밋이 일어나는 방법은 3가지가 존재하며 각각 특성이 존재한다. 

 자동커밋

  •  컨슈머의 기본 설정값 
  • enable.auto.commit=true 
  • auto.commit.interval.ms=1000 이 값을 길게 잡으면 장애 발생시 중복 읽기가 발생할 수 있음 
  • 예) 커밋 주기가 10초인데 7초가 지난 후 컨슈머 장애가 발생하면 이전 10초 전에 커밋한 오프셋을 가져오므로 중복 발생

 현재 오프셋 커밋

  • 상황에 따라 필요할 때 커밋 제어시 사용
  • enable.auto.commit=false
  • commitSync() 메서드를 사용해 커밋할 컨슈머 오프셋을 호출한다. 
  • ConsumerRecord의 인스턴스를 처리 후 사용 권장하며 그렇게 안하면 컨슈머 장애기 발생할 경우 레코드 손실 발생

 비동기 커밋

  •  동기 방식의 커밋은 Ack 수신이 없는 경우, 컨슈머는 대기 상태가 되므로 결과적으로 처리 속도가 좋지 못하다. 
  • 비동기도 메시지 중복은 발생할 수 있다. 
  • 예) 메시지 오프셋 10을 오프셋 5 이전에 커밋 했다면, 카프카는 5부터 10까지 다시 읽으므로 중복 발생


추가 설정

설정 작업 이전에 데이터를 처리하기 위해 컨슈머가 얼마의 시간이 필요한지 확인 및 테스트가 필요하다.

 enable.auto.commit

  • true/false
  • 오프셋 자동 커밋

 fetch.min.bytes

  • 데이터 읽기 요청에 대한 카프카 서버의 회신을 요구하는 데이터 바이트의 최소 크기 

 request.timeout.ms

  • 응답을 기다리는 최대 시간 

 auto.offset.reset

  • 유효한 오프셋이 없을 때  자동처리됨

 lastest : 파티션에서 가장 최근 메시지부터 시작

 earliest : 파티션 맨 처음부터 시작

 none : 예외가 발생됨

 session.timeout.ms

  • "컨슈머 동작 중이다"라고 알리기 위한 하트비트 전송 주기
  • 리밸런서 트리거 하는걸 막기 위함

 max.partition.fetch.bytes

  • 파티션마다 할당할 최대 데이터 크기
  • ConsumerRecord 객체에 대해 컨슈머가 필요로 하는 메모리는 파티션수 * 설정값보다 커야함
  • 예) 파티션 10개 1개의 컨슈머이고, max.partiton.fetch.bytes가 2MB면 10 * 2MB = 20MB가 메모리로 잡혀 있어야함


컨슈머 전체 코드
package com.example.kafkatest;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Future;

@SpringBootApplication
public class KafkaConsumerApplication {
    private static final Logger log = LoggerFactory.getLogger(KafkaTestApplication.class);
    public static void main(String[] args) {
        SpringApplication.run(KafkaConsumerApplication.class, args);
        subscribe();
    }

    private static Properties setConsumerProperty(){
        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers","192.168.100.173:9092");
        producerProps.put("group.id", "hirang_test02");
        producerProps.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        producerProps.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        producerProps.put("enable.auto.commit", "true");
        producerProps.put("auto.commit.interval.ms", "1000");
        producerProps.put("session.timeout.ms", "30000");
        return producerProps;
    }

    private static void subscribe(){
        String topic = "hirang";
        List topicList = new ArrayList<>();
        topicList.add(topic);
        KafkaConsumer consumer = new KafkaConsumer(setConsumerProperty());
        consumer.subscribe(topicList);
        log.info("Subscribed to topic :\t" + KafkaStatic.DEFAULT_TOPIC);
        int i = 0;
        try {
            while (true) {
                ConsumerRecords records = consumer.poll(500);
                for (ConsumerRecord record : records)
                    log.info("offset = " + record.offset() + "\tkey =" + record.key() + "\tvalue =" + record.value());

                //TODO : Do processing for data here
                consumer.commitAsync(new OffsetCommitCallback() {
                    public void onComplete(Map map, Exception e) {

                    }
                });
            }
        } catch (Exception ex) {
            //TODO : Log Exception Here
        } finally {
            try {
                consumer.commitSync();

            } finally {
                consumer.close();
            }
        }

    }
}

컨슈머 콘솔 로그

읽은 메시지 데이터를 로그로 확인하였다. 

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v2.1.3.RELEASE)

2019-03-08 11:30:52.369  INFO 4556 --- [           main] c.e.kafkatest.KafkaConsumerApplication   : Starting KafkaConsumerApplication on ts-hirang with PID 4556 (D:\workspace\github\kafka-test\target\classes started by DESKTOP in D:\workspace\github\kafka-test)
2019-03-08 11:30:52.379  INFO 4556 --- [           main] c.e.kafkatest.KafkaConsumerApplication   : No active profile set, falling back to default profiles: default
2019-03-08 11:30:53.178  INFO 4556 --- [           main] c.e.kafkatest.KafkaConsumerApplication   : Started KafkaConsumerApplication in 1.766 seconds (JVM running for 3.564)
2019-03-08 11:30:53.192  INFO 4556 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
	auto.commit.interval.ms = 1000
	auto.offset.reset = latest
	bootstrap.servers = [192.168.100.173:9092]
	check.crcs = true
	client.id = 
	connections.max.idle.ms = 540000
	default.api.timeout.ms = 60000
	enable.auto.commit = true
	exclude.internal.topics = true
	fetch.max.bytes = 52428800
	fetch.max.wait.ms = 500
	fetch.min.bytes = 1
	group.id = hirang_test02
	heartbeat.interval.ms = 3000
	interceptor.classes = []
	internal.leave.group.on.close = true
	isolation.level = read_uncommitted
	key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
	max.partition.fetch.bytes = 1048576
	max.poll.interval.ms = 300000
	max.poll.records = 500
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
	receive.buffer.bytes = 65536
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retry.backoff.ms = 100
	sasl.client.callback.handler.class = null
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	send.buffer.bytes = 131072
	session.timeout.ms = 30000
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
	ssl.endpoint.identification.algorithm = https
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLS
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer

2019-03-08 11:30:53.238  INFO 4556 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version : 2.0.1
2019-03-08 11:30:53.239  INFO 4556 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId : fa14705e51bd2ce5
2019-03-08 11:30:53.240  INFO 4556 --- [           main] c.e.kafkatest.KafkaTestApplication       : Subscribed to topic :	hirang
2019-03-08 11:30:53.318  INFO 4556 --- [           main] org.apache.kafka.clients.Metadata        : Cluster ID: 6ejl7iMZQASnPqRCdv13wA
2019-03-08 11:30:53.319  INFO 4556 --- [           main] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=hirang_test02] Discovered group coordinator 192.168.100.173:9092 (id: 2147483646 rack: null)
2019-03-08 11:30:53.321  INFO 4556 --- [           main] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-1, groupId=hirang_test02] Revoking previously assigned partitions []
2019-03-08 11:30:53.321  INFO 4556 --- [           main] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=hirang_test02] (Re-)joining group
2019-03-08 11:30:53.347  INFO 4556 --- [           main] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=hirang_test02] Successfully joined group with generation 12
2019-03-08 11:30:53.348  INFO 4556 --- [           main] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-1, groupId=hirang_test02] Setting newly assigned partitions [hirang-0]
2019-03-08 11:31:03.104  INFO 4556 --- [           main] c.e.kafkatest.KafkaTestApplication       : offset = 277494	key =null	value =0-----87d9bcc5-3b75-4032-9427-e02b75a9eed7
2019-03-08 11:31:03.107  INFO 4556 --- [           main] c.e.kafkatest.KafkaTestApplication       : offset = 277495	key =null	value =1-----9c6e8e14-dd8e-4ddf-b810-4cf1f3c80e5c
2019-03-08 11:31:03.108  INFO 4556 --- [           main] c.e.kafkatest.KafkaTestApplication       : offset = 277496	key =null	value =2-----5f8eac28-06ed-4c53-a0c0-a15721a6ede2
2019-03-08 11:31:03.108  INFO 4556 --- [           main] c.e.kafkatest.KafkaTestApplication       : offset = 277497	key =null	value =3-----e6dd3658-5bc2-4bdb-ab7b-b127f8ca176a
2019-03-08 11:31:03.108  INFO 4556 --- [           main] c.e.kafkatest.KafkaTestApplication       : offset = 277498	key =null	value =4-----e077a534-d95e-4aca-adb3-1c2c1a29fce1
2019-03-08 11:31:03.108  INFO 4556 --- [           main] c.e.kafkatest.KafkaTestApplication       : offset = 277499	key =null	value =5-----5b38908b-ec60-43e8-98cc-76fa66451483
2019-03-08 11:31:03.112  INFO 4556 --- [           main] c.e.kafkatest.KafkaTestApplication       : offset = 277500	key =null	value =6-----d049b157-9370-433e-9cee-7101918eb449
2019-03-08 11:31:03.112  INFO 4556 --- [           main] c.e.kafkatest.KafkaTestApplication       : offset = 277501	key =null	value =7-----8786e6a2-c757-4a3f-be59-130d5c393bfc
2019-03-08 11:31:03.116  INFO 4556 --- [           main] c.e.kafkatest.KafkaTestApplication       : offset = 277502	key =null	value =8-----30b82719-abc5-4e1e-81d1-d8e47e53cb7d
2019-03-08 11:31:03.122  INFO 4556 --- [           main] c.e.kafkatest.KafkaTestApplication       : offset = 277503	key =null	value =9-----687f0bdb-3280-415f-9fb8-8d5734380755

컨슈머 모범 사례

카프카 컨슈머의 내부 동작과 토픽의 메시지 소비(Subscribe)하는 방법을 알아보았다. 
컨슈머 구성 요소를 정의할 때 좀 더 바람직한 가이드라인에 대해서 알아보자.

예외처리를 하자!
모든 어플리케이션은 예외처리... 

리벨런스 관리!
새로운 컨슈머가 컨슈머 그룹에 합류할 때마다, 혹은 예전 컨슈머가 종료되면 파티션 리밸런스가 트리거 된다. 
(컨슈머 그룹의 컨슈머 개수와 파티션이 같을 때 새로운 컨슈머 추가는 리밸런스가 안일어남)
컨슈머가 파티션 소유권을 잃을 때마다 가장 최근에 수신한 오프셋을 반드시 커밋해 줘야 한다. 

제때 오프셋 커밋하기!
메시지 오프셋에 대한 커밋은 처리 상황에 따라 잘 제때 수행해야 한다. 
그렇지 않으면 중복 및 소실이 발생할 수 있다. 
오프셋 주기에 따른 성능과 중복에 대한 문제간의 트레이드 오프가 필요한다. 
예) 장애 발생시 데이터 중복 처리에 대한 심각한 문제가 발생할 경우 오프셋을 커밋하는 시간을 가능한 짧게 한다. 

자동 오프셋 커밋!
중복 처리에 대한 문제가 없거나, 컨슈머가 자동으로 오프셋을 커밋하도록 관리하기를 바랄경우 사용한다. 
일반적으로 대부분 사용

마치며

카프카의 프로듀서 및 컨슈머에 대한 내부 동작에 대해서 이해하였고 어플리케이션도 만들어 보았다. 

프로덕션 환경에 적용할 때 컨슈머 그룹안에서 오프셋 커밋 전략,하트비트, 리벨런스 등 여러 성능 테스트가 필요하다. 

Posted by 사용자 피랑이

댓글을 달아 주세요


위의 링크 글에서는 Kafka 개요 및 설치, 명령어를 이용하여 토픽 생성, 토픽 메시지 Publish, Subscribe에 대해서 설명하였다. 


이번에는 Kafka 라이브러리를 이용하여 어플리케이션을 만들어보자.

본인은 Intellij IDE 환경에서 SpringBoot 2.1 플래폼을 기반으로 Maven 빌드를 사용한다. 

 

프로듀서나 컨슈머를 사용하기 위해서는 kafka-clients를 이용하므로 pom.xml에 종속성을 추가한다. 


    
    org.apache.kafka
    kafka-clients
    2.1.0

프로듀서

카프카 프로듀서 내부 동작 및 프로듀서 어플리케이션에서 카프카 큐로 메시지가 전달되는 과정을 알아보자.

프로듀서 내부 동작

메시지 게시(Publish)를 위한 동작 순서
1. 프로듀서 API 호출 ↓
2. Key-Value 형태로 이뤄진 Kafka 구성 정보를 전달 받음 ↓
3. 메시징 데이터를 바이트 배열로 직렬화 ↓
4. 전송할 파티션 결정(옵션) ↓
5. 메시지를 보내기 위해 브로커 결정 ↓
6. API에 쓰기 요청 ↓
7. 응답 수신 (응답이 수신되명 성공으로 간주)
   오류 코드 수신 (예외를 발생시키거나 설정에 따른 재시도 수행)

메시지 Publish와 별개로 카프카 프로듀서가 책임지고 있는 역활에 대해서 정리해본다. 

카프카 프로커 URL 부트스트랩
프로듀서는 카프카 클러스터에 대한 메타데이터를 가져오기 위해 최소 하나의 브로커에 연결한다. 
보통 다수의 브로커에 연결한다. 

데이터 직렬화
브로커에게 메시지를 보내기 전 TCP 기반 데이터 송수신을 위하여 데이터 객체를 바이트 직렬화한다. 

토픽 파티션 결정
일반적으로 메시지 데이터 객체의 키 해시를 이용하여 어떤 파티션으로 데이터가 전송되어야 하는지 결정한다. 
아니면 커스텀하게 로직을 만들어서 파티션을 결정할 수 있도록 사용자 정의 파티션 코드를 작성할 수 있다.

처리 실패/재시도 
프로듀서 API 설정을 통해 재시도 횟수나 예외 처리가 가능하다. 
예외 종류에 따른 데이터 흐름을 결정할 수 있다.

배치 처리 
배치 처리는 입출력 횟수를 줄이고 프로듀서 메모리를 최적화한다.  
보통 메세지 수를 결정하여 처리하며, 전반적인 처리 속도는 배치에서의 메시지 수에 비례해 증가한다. 

프로듀서 어플리케이션 

프로듀서 어플리케이션 작성의 경우 추상화 계층에 메서드를 노출하는 프로듀서 API를 사용한다. 


카프카 프로듀서 API 사용 순서

1. 기본 설정 정보 ↓
2. 프로듀서 객체 생성 ↓
3. 프로듀서 레코드 객체 생성 ↓
4. 사용자 정의 파티션 생성(옵션) ↓
5. 추가 설정 

기본 설정 정보

 bootstrap.servers

  • 브로커 주소 목록
  • hostname:port 형식
  • 한 개 이상 설정 가능 (브로커 접속 실패할 수 있으므로 최소 두개 지정 권장)

 key.serializer

  • 브로커는 메시지를 전달받으면 메시지 키 정보가 어떠한 직렬화가 되어 있다고 가정하므로 어떤 직렬화를 써는지 명시해야함 
  • 카프카는 ByteArraySerializer, StringSerializer, IntegerSerializer 세가지 내장된 클래스 제공

 value.serializer

  • key.serializer와 비슷하며 메시지 값 정보에 대한 직렬화를 명시


프로듀서 객체 및 프로듀서 레코드 객체 생성

토픽에 레코드를 전송하는 ProducerRecord 객체는 토픽이름, 파티션 번호, 타입스탬프, 키, 값 등을 포함한다. 

토픽이름과 메시지 데이터 값은 필수 파라미터이며, 파티션 번호나 타임스탬프, 키, 값 들은 선택 파라미터이다. 

동기메시징은 브로커에서 회신을 RecordMetadata를 보내고, 비동기 메시징은 브로커에서 즉각 응답 처리만 하며, 프로듀서에서는 콜백 인터페이스를 수행하여 처리한다.

동기메시징으로도 잠깐 테스트를 하였는데 메시지 사이즈 100byte 기준으로 1만 TPS정도 나온다. 


사용자 정의 파티션 생성

대부분의 경우 기본 파티션을 사용하는 경우 충분하나, 하나의 키에 대한 데이터의 비중이 매우 큰 경우 별도의 파티션의 할당이 필요할 수 있다. 

예) K라는 키에 전체데이터의 30%가 있는 경우 N파티션을 할당에 다른키가 N파티션에 할당되지 않게 만들어 공간이 소진되거나 속도가 느려지지 않도록 한다. 

카프카는 사용자 정의 파티션을 생성할 수 있도록 파티셔너 인터페이스를 제공한다. 

package com.example.kafkatest;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;

import java.util.List;
import java.util.Map;

public class CustomPartition implements Partitioner {
    public int partition(
            String topicName,
            Object key,
            byte[] keybBytes,
            Object value,
            byte[] valueBytes,
            Cluster cluster
    )
    {
        List partitionInfos = cluster.partitionsForTopic(topicName);
        int numPartitions = partitionInfos.size();
        //로직여기에
        return 0;
    }

    @Override
    public void close(){}

    @Override
    public void configure(Map map){}
}
추가 설정 정보

 buffer.memory

  • 카프카 브로커로 메시지 전송 전에 사용할 최대 버퍼 메모리 크기
  • 설정 한계에 도달하면 프로듀서는 예외를 발생시키기 전에 max.block.ms 시간 동안 대기
  • batch.size가 크다면 프로듀서에 더 많은 메모리를 할당해야 함
  • request.timeout.ms를 사용해 시간 초과 설정을 적용하는 좋음

 ack

  • 0, 1, all 사용
  • 0 : ack를 기다리지 않음 (fire and forget)
  • 1 : 리더가 메시지를 기록하자마자 ack를 받음
  • all : 리더가 복제하는 팔로워까지 완료된 경우 ack를 받음
  • 카프카(Kafka)의 이해 Replication - leader & follower 참조

 batch.size

  • 크기 많큼 배치 처리를 허용
  • 대량으로 보낼경우 1000이상으로 설정하는 걸 권장
  • 1~10으로 할 경우 메시지 유실이 발생하는데 정확한 부부은 확인이 필요 (유실 테스트까지만 함)

 linger.ms

  • 브로커로 전송전에 프로듀서가 추가 메시지를 기다리는 시간

 compression.type

  •  프로듀서는 압축을 사용하지 않고 메시지를 전송이 Default임
  • GZIP, Snappy, LZA 형식의 압축이 가능하며, 배치 처리가 많을수록 압축 효과가 좋다. 
  • 되도록 사용하길 권장 Logstash에서 압축과 비압축 차이가 컸다

 retres

  • 메시지 전송이 실패했을시 예외를 발생시키기전 재전송 시도 값 

 partitioner.class

  •  사용자 정의 파티션 생성 시 허용

 timeout.ms

  •  프로듀서에서 오류를 보내기 전에 팔로워의 Ack를 리더가 기다리는 시간


프로듀서 전체 코드

package com.example.kafkatest;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.Future;

@SpringBootApplication
public class KafkaProducerApplication {
    private static final Logger log = LoggerFactory.getLogger(KafkaTestApplication.class);

    public static void main(String[] args) {
        SpringApplication.run(KafkaProducerApplication.class, args);
        publish();
    }

    private static Properties setProducerProperty(){
        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers","192.168.100.173:9092");
        producerProps.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("ack","all");
        producerProps.put("retries", 1);
        /*
            배치사이즈가 작으면 대량전송시 유실됨 최소 1000이상 잡는게 효율적
        */
        producerProps.put("batch.size", 1);
        producerProps.put("linger.ms", 1);
        producerProps.put("buffer.memory", 24568545);
        return producerProps;
    }

    private static void publish(){
        KafkaProducer producer = new KafkaProducer(setProducerProperty());
        log.info("Pubulish to topic :\t" + KafkaStatic.DEFAULT_TOPIC);
        log.info("Pubulish Start");
        for (int i=0; i<10; i++  ) {
            ProducerRecord data = new ProducerRecord(KafkaStatic.DEFAULT_TOPIC, i+"-----"+UUID.randomUUID().toString());
            Future recordMetadata = producer.send(data);
        }
        producer.close();
        log.info("Pubulish End");
    }
}
package com.example.kafkatest;

public class KafkaStatic {
    public static final String DEFAULT_TOPIC = "hirang";
}

프로듀서 콘솔 로그

어플리케이션을 구동하면 아래와 같이 Kafka 설정 관련 정보를 볼 수 있다. 

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v2.1.3.RELEASE)

2019-03-07 11:21:18.822  INFO 376 --- [           main] c.e.kafkatest.KafkaProducerApplication   : Starting KafkaProducerApplication on ts-hirang with PID 376 (D:\workspace\github\kafka-test\target\classes started by DESKTOP in D:\workspace\github\kafka-test)
2019-03-07 11:21:18.824  INFO 376 --- [           main] c.e.kafkatest.KafkaProducerApplication   : No active profile set, falling back to default profiles: default
2019-03-07 11:21:19.398  INFO 376 --- [           main] c.e.kafkatest.KafkaProducerApplication   : Started KafkaProducerApplication in 0.795 seconds (JVM running for 1.635)
2019-03-07 11:21:19.409  INFO 376 --- [           main] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values: 
	acks = 1
	batch.size = 1
	bootstrap.servers = [192.168.100.173:9092]
	buffer.memory = 24568545
	client.id = 
	compression.type = none
	connections.max.idle.ms = 540000
	enable.idempotence = false
	interceptor.classes = []
	key.serializer = class org.apache.kafka.common.serialization.StringSerializer
	linger.ms = 1
	max.block.ms = 60000
	max.in.flight.requests.per.connection = 5
	max.request.size = 1048576
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
	receive.buffer.bytes = 32768
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retries = 1
	retry.backoff.ms = 100
	sasl.client.callback.handler.class = null
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	send.buffer.bytes = 131072
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
	ssl.endpoint.identification.algorithm = https
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLS
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	transaction.timeout.ms = 60000
	transactional.id = null
	value.serializer = class org.apache.kafka.common.serialization.StringSerializer

2019-03-07 11:21:19.456  WARN 376 --- [           main] o.a.k.clients.producer.ProducerConfig    : The configuration 'ack' was supplied but isn't a known config.
2019-03-07 11:21:19.459  INFO 376 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version : 2.0.1
2019-03-07 11:21:19.460  INFO 376 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId : fa14705e51bd2ce5
2019-03-07 11:21:19.460  INFO 376 --- [           main] c.e.kafkatest.KafkaTestApplication       : Pubulish to topic :	hirang
2019-03-07 11:21:19.461  INFO 376 --- [           main] c.e.kafkatest.KafkaTestApplication       : Pubulish Start
2019-03-07 11:21:19.526  INFO 376 --- [ad | producer-1] org.apache.kafka.clients.Metadata        : Cluster ID: 6ejl7iMZQASnPqRCdv13wA
2019-03-07 11:21:19.534  INFO 376 --- [           main] o.a.k.clients.producer.KafkaProducer     : [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
2019-03-07 11:21:19.577  INFO 376 --- [           main] c.e.kafkatest.KafkaTestApplication       : Pubulish End

실행 결과

프로듀서 모범 사례

카프카 프로듀서의 내부 동작과 일반적인 토픽에 메시지 게시(Publish)하는 방법을 알아보았다.
프로듀서 구성 요소를 정의할 때 좀 더 바람직한 가이드라인에 대해서 알아보자.

데이터 유효성 검증을 하자!

프로듀서 어플리케이션을 제작할 때 데이터 스키마의 정합성, null이 아닌 키항목 등 유효성 검증을 간과하기 쉽다. 

이런 유효성 검증을 하지 않으면 메시지 데이터가 올바르게 파티션에 할당되지 않으므로, 브로커에 대한 부하 분산에 영향을 준다. (순서를 보장해야하는 1파티션이면 상관없을듯...)


예외처리를 하자!

모든 어플리케이션의 예외처리는 상황에 따라 무조건 필요하다. (너무 남발해서도 문제지만..)

프로듀서에서 예외 상황에 따라 적절한 흐름 제어를 해야한다.

예) 게임 로그 데이터의 임시 큐로 Kafka를 사용할 경우 아이템 거래 사기 발생 시 계정이나 기사단 블럭 등 예외 관련 흐름 제어....


재시도 횟수!

카프카 클러스터 연결 타임아웃 예외 발생 시 메시지 손실이 날 수 있으니 재시도 횟수를 설정한다. 


부트스트랩 URL 수!

카프카 브로커 설정은 반드시 두 개 이상으로 설정한다. 

프로덕션 환경에서 최소 카프카 클러스터 구성은 카프카 3노드, 주키퍼 3노드를 사용하기 때문에 세 개를 적어주면 된다. 


잘못된 파티셔닝 방식의 사용 방지!

사용자 정의 파티션을 잘못 사용하게 되면 토픽의 모든 파티션으로 균일하게 분배되지 않을 수 있고, 최적화된 병렬처리가 불가능 하도록 만들 수 있다.  사용자 정의 파티션 사용시 라운드 로빈을 기본으로 넣자.


메시지의 임시 보존!

카프카 클러스터 장애로 인하여 재시도 횟수를 넘었을 시 메시지 소실이 발생할 수 있으므로 메시지를 다시 전송할 있도록 디스크나, 데이터베이스를 이용하자. 

마치며

카프카 프로듀서 내부 동작을 이해하고 간단하게 어플리케이션을 만들어 보았다. 
다음에는 컨슈머 어플리케이션에 대해 만들어본다.


Posted by 사용자 피랑이

댓글을 달아 주세요

  1. AndersonChoi 2019.08.30 16:41 신고  댓글주소  수정/삭제  댓글쓰기

    감사합니다. 잘 정리해주셨네요. 많은도움되었습니다.
    특히 추가 설정 정보설명이 유용했습니다!

  2. 셈프 2021.01.19 22:54  댓글주소  수정/삭제  댓글쓰기

    max.block.ms 너무 짧게 잡으면 프로듀싱 시 대기 시간 때문에 메모리 이슈가 있을수 있습니다. 30초에서 60초(60000) 정도 필요합니다.
    1초로 잡았다가 초당 2~30만건 카프카에 적재시 에러가 발생하네요

Kafka를 설치하고 간단하게 클러스터 환경을 구성하여 Producer 및 Consumer 테스트를 해보자.


먼저 가상머신을 이용해 다음 그림과 같이 클러스터 환경을 구성하여 테스트를 진행할 것이다.

이 글에서는 3대의 서버를 생성하였지만 1대의 서버로 포트를 다르게 하여 구성할 수도 있다. 또한 Kafka와 Zookeeper서버를 동일한 장비에 구축하였지만 실무에서는 별도로 구축하는 것이 좋다.


Kafka 다운로드 및 설치

다운로드(각 버전에 대해서는 https://kafka.apache.org/downloads를 참고하자.)

$ wget http://apache.mirror.cdnetworks.com/kafka/2.1.0/kafka_2.11-2.1.0.tgz

압축해제 및 경로이동

$ tar -zxvf kafka_2.11-2.1.0.tgz
$ cd kafka_2.11-2.1.0

Kafka의 동작은 Zookeeper에 의해 관리가 되기 때문에 Zookeeper 없이는 Kafka를 구동할 수 없다. 이 때문에 Kafka를 다운로드 하면 Zookeeper도 함께 들어있다. 물론 별도로 최신버전의 Zookeeper를 다운받아 사용해도 되지만, Kafka에 들어있는 Zookeeper는 Kafka버전과 잘 동작하는 검증된 버전이므로 패키지 안에 있는 Zookeeper의 사용을 권장한다.


Zookeeper 설정

각 인스턴스에 설치된 Kafka의 config/zookeeper.properties 파일은 하나의 Zookeeper를 실행하는데 쓰이는 설정 파일이다. 이 말은 zookeeper1.properties, zookeeper2.properties, zookeeper3.properties 이런식으로 여러개의 설정파일을 만들고 하나의 장비에서 다중으로 실행할 수 있다는 의미이다. 설정파일을 다음과 같이 3대의 서버에 동일하게 추가하자.


$ vi config/zookeeper.properties

# the directory where the snapshot is stored.

dataDir=/tmp/zookeeper

# the port at which the clients will connect

clientPort=2181

# disable the per-ip limit on the number of connections since this is a non-production config

maxClientCnxns=0


initLimit=5

syncLimmit=2


server.1=192.168.137.101:2888:3888

server.2=192.168.137.102:2888:3888

server.3=192.168.137.103:2888:3888

새로 추가한 설정값은 클러스터를 구성하는데 필요한 설정 값들안데 여기서 주의할 점은 모든 Zookeeper 서버들은 동일한 변수 값을 가지고 있어야 한다.


initLimit

팔로워가 리더와 초기에 연결하는 시간에 대한 타임아웃

syncLimit

팔로워가 리더와 동기화 하는데에 대한 타임아웃. 즉 이 틱 시간안에 팔로워가 리더와 동기화가 되지 않는다면 제거 된다.


이 두값은 dafault 기본값이 없기 때문에 반드시 설정해야 하는 값이다.


그리고 server.1,2,3의 숫자는 인스턴스 ID이다. ID는 dataDir=/tmp/zookeeper 폴더에 myid파일에 명시가 되어야 한다.

/tmp/zookeeper 디렉토리가 없다면 생성하고 myid 파일을 생성하여 각각 서버의 고유 ID값을 부여해야 한다. 

(그 외 자세한 설정정보를 알고 싶다면 키퍼 가이드문서를 참고하자.)


1 서버 (192.168.137.101)

$ mkdir /tmp/zookeeper
$ echo 1 > /tmp/zookeeper/myid

2 서버 (192.168.137.102)

$ mkdir /tmp/zookeeper
$ echo 2 > /tmp/zookeeper/myid

3 서버 (192.168.137.103)

$ mkdir /tmp/zookeeper $ echo 3 > /tmp/zookeeper/myid

이제 Zookeeper를 구동하기 위한 설정은 끝~


Kafka 설정

Kafka의 config/server.properties 파일은 하나의 Kafka를 실행하는데 쓰이는 설정 파일이다. Zookeeper와 마찬가지로 여러개의 설정파일을 만들고 다중 실행을 할 수 있다.

설정파일 config/server.properties에 3대 서버 각 환경에 맞는 정보를 입력해 준다.


$ vi config/server.properties

1 서버 (192.168.137.101)

broker.id=1

listeners=PLAINTEXT://:9092

advertised.listeners=PLAINTEXT://192.168.137.101:9092

zookeeper.connect=192.168.137.101:2181, 192.168.137.102:2181, 192.168.137.103:2181


2 서버 (192.168.137.102)

broker.id=2

listeners=PLAINTEXT://:9092

advertised.listeners=PLAINTEXT://192.168.137.102:9092

zookeeper.connect=192.168.137.101:2181, 192.168.137.102:2181, 192.168.137.103:2181


3 서버 (192.168.137.103)

broker.id=3

listeners=PLAINTEXT://:9092

advertised.listeners=PLAINTEXT://192.168.137.103:9092

zookeeper.connect=192.168.137.101:2181, 192.168.137.102:2181, 192.168.137.103:2181

별도 파일(/tmp/zookeeper/myid)에 인스턴스 ID를 명시해야 하는 Zookeeper와는 달리 Kafka는 설정파일안에 broker.id라는 항목이 있다.

그리고 zookeeper.connet라는 항목에는 Zookeeper인스턴스들의 정보를 입력해준다.


Kafka를 구동하기 위한 설정은 끝났다. 클러스터 구성을 위한 인스턴스들의 정보를 입력해 주는것이 거의 대부분의 설정이다.

그외 설정 파일에 대한 상세한 내용은 공식 홈페이지의 Broker Configs를 참고하길 바란다. default설정값을 확인하고 변경하고자 하는 값들은 설정파일에 명시를 해주면 된다.


Zookeeper 및 Kafka 서버 구동

Kafka를 구동하기 위해 먼저 Zookeeper를 구동 한다음 이후 Kafka를 구동해야 한다.


$ bin/zookeeper-server-start.sh config/zookeeper.properties
$ bin/kafka-server-start.sh config/server.properties

3대의 서버에 Zookeeper와 Kafka가 정상적으로 구동이 되었다면 다음과 같이 starting 메시지를 확인할 수 있을 것이다.


이제 기본적인 클러스터 환경을 구성하고 서버를 구동시켰다.


Kafka에서는 bin폴더 아래 제공되는 스크립트 파일을 이용해 Topic을 관리하고 Producer,Consumer를 테스트 해볼수 있다.


기본적인 몇가지 스크립트 명령을 이용해 Topic을 생성해 보고 Producer 메시지를 저장하고 Consumer 메시지를 읽어오는것을 확인해 보도록 하자.


Topic 관리


1.Topic생성

GameLog, GameLog2, GameLog3 세개의 Topic을 생성해보자.(replication-factor:3, partitions : 1)


$ bin/kafka-topics.sh --create --zookeeper 192.168.137.101:2181, 192.168.137.102:2181, 192.168.137.103:2181 --replication-factor 3 --partitions 1 --topic GameLog
$ bin/kafka-topics.sh --create --zookeeper 192.168.137.101:2181, 192.168.137.102:2181, 192.168.137.103:2181 --replication-factor 3 --partitions 1 --topic GameLog2
$ bin/kafka-topics.sh --create --zookeeper 192.168.137.101:2181, 192.168.137.102:2181, 192.168.137.103:2181 --replication-factor 3 --partitions 1 --topic GameLog3


2.Topic 리스트 확인

$ bin/kafka-topics.sh --list --zookeeper 192.168.137.101:2181, 192.168.137.102:2181, 192.168.0.103:2181


3.Topic 삭제

$ bin/kafka-topics.sh --delete --zookeeper 192.168.137.101:2181, 192.168.137.102:2181, 192.168.137.103:2181 --topic GameLog3


4.Topic 상세 정보 확인

$ bin/kafka-topics.sh --describe --zookeeper 192.168.137.101:2181, 192.168.137.102:2181, 192.168.137.103:2181

토픽을 생성했으면 해당 토픽에 메시지를 생산하고 소비하는것을 직접 확인해보자.


메시지 생산 및 소비


1.Producer 메시지 생산하기

$ bin/kafka-console-producer.sh --broker-list 192.168.137.101:9092,192.168.137.102:9092,192.168.137.103:9092 --topic GameLog


2.Consumer 메시지 소비하기

$ bin/kafka-console-consumer.sh --bootstrap-server 192.168.137.101:9092,192.168.137.102:9092,192.168.137.103:9092 --topic GameLog --from-beginning

from-beginning 옵션은 해당 topic의 맨 처음 메시지부터 소비하겠다는 의미이다.

그림에서 보는것과 같이 Producer에서 메시지를 입력하면 Consumer에서 해당 메시지를 읽어오는 것을 확인 할 수 있다.


지금까지 Kafka 클러스터를 직접 구축하여 메시지 생산하고 소비하는 것을 간단하게 해보았다.


Posted by @위너스

댓글을 달아 주세요

  1. 2021.01.25 11:47  댓글주소  수정/삭제  댓글쓰기

    비밀댓글입니다

  2. 2021.02.24 07:42  댓글주소  수정/삭제  댓글쓰기

    비밀댓글입니다

  3. 궁금이 2021.03.11 17:45  댓글주소  수정/삭제  댓글쓰기

    1,2 서버 (192.168.137.101)
    2 서버 (192.168.137.102)

    이렇게 이중화해서 사용하고 자 합니다. 설명 좀 부탁합니다

카프카(Kafka)의 이해

Kafka 2019. 1. 17. 11:55


대용량 게임로그 수집을 위해 Elastic Stack을 도입하게 되었는데, 중간에 버퍼역할(메시지큐)을 하는 Kafka에 대서 알아보려고 한다.


메시지큐? 

메시지 지향 미들웨어(Message Oriented Middleware: MOM)은 비동기 메시지를 사용하는 다른 응용프로그램 사이의 데이터 송수신을 의미하는데 MOM을 구현한 시스템을 메시지큐(Message Queue:MQ)라 한다.


카프카란? 

분산형 스트리밍 플랫폼(A distributed streaming platform)이다. LinkedIn에서 여러 구직 및 채용 정보들을 한곳에서 처리(발행/구독)할 수 있는 플래폼으로 개발이 시작 되었다고 한다.

(발행/구독: pub-sub은 메시지를 특정 수신자에게 직접적으로 보내주는 시스템이 아니고, 메시지를 받기를 원하는 사람이 해당 토픽(topic)을 구독함으로써 메시지를 읽어 올 수 있다.)


카프카의 특징 

대용량 실시간 로그처리에 특화되어 설계된 메시징 시스템으로 TPS가 매우 우수하고,

메시지를 메모리에 저장하는 기존 메시징 시스템과는 달리 파일에 저장을 하는데 그로 인해 카프카를 재시작해도 메시지 유실 우려가 감소된다.

기본 메시징 시스템(rabbitMQ, ActiveMQ)에서는 브로커(Broker)가 컨슈머(consumer)에게 메시지를 push해 주는 방식인데, 카프카는 컨슈머(Consumer)가 브로커(Broker)로부터 메시지를 직접 가져가는 PULL 방식으로 동작하기 때문에 컨슈머는 자신의 처리 능력만큼의 메시지만 가져와 최적의 성능을 낼 수 있다. 대용량처리에 특화 되었다는 것은 아마도 이러한 구조로 설계가 되어 가능하게 된게 아닌가 싶다.


여기서 한가지 의문이 든다. 일반적으로 파일보다 메모리가 성능이 우수한데 왜 카프카가 성능이 좋은 것일까? 그 이유는 카프카의 파일 시스템을 활용한 고성능 디자인에 있다. 일반적으로 하드디스크는 메모리보다 수백배 느리지만 하드디스크의 순차적 읽기에 대한 성능은 메모리보다 크게 떨어지지 않는다고 한다. 


컨슈머(Consumer)와 브로커(Broker)에 대해서는 카프카 구성요소에 대한 설명에서 좀 더 자세히 알아보자.


카프카의 구성요소 

카프카에는 다음과 같이 여러 구성요소가 있다.

topic, partition, offset

producer, consumer, consumer group

broker, zookeeper

replication

구성요소 하나씩 살펴보도록 하자.


Topic, Partition : 카프카에 저장되는 메시지는 topic으로 분류되고, topic은 여러개의 patition으로 나눠질수 있다. partition안에는 message의 상대적 위치를 내타내는 offset이 있는데 이 offet정보를 이용해 이전에 가져간 메시지의 위치 정보를 알 수 있고 동시에 들어오는 많은 데이터를 여러개의 파티션에 나누어 저장하기 때문에 병렬로 빠르게 처리할 수 있다.


Producer, Consumer : 말대로 Producer는 생산(메시지를 Write)하는 주체, Consumer는 소비(메시지를 Read)하는 주체이다. Producer와 Consumer간에는 상호 존재 여부를 알지 못한채 자신에게 주어진 역할만 처리 하게 된다. (위 그림에서 보면 Writes가 Producer)


Consumer Group : Producer에서 생산(Write)한 메시지는 여러개의 파티션에 저장을 하는데, 그렇다면 소비하는(Consumer)하는 쪽에서도 여러 소비자가 메시지를 읽어가는것이 훨씬 효율적일 것이다. 하나의 목표를 위해 소비를 하는 그룹, 즉 하나의 토픽을 읽어가기 위한 Counsumer들을 Consumer Group라고 한다.


하지만 이 Consumer Group에는 한가지 룰이 있다. Topic의 파티션은 그 Consumer Group과 1:n 매칭. 즉, 자신이 읽고 있는 파티션에는 같은 그룹내 다른 컨슈머가 읽을 수 없다. (파티션에는 동일한 Consumer Group을 위한 하나의 구멍이 있고, Consumer는 그 구멍에 빨대를 꽂아 읽어간다고 생각하면 쉽게 상상이 될지도….^^;)


위 그림과 같이 하나의 토픽에 4개의 파티션이 있고 컨슈머그룹내 3개의 컨슈머가 있다면 컨슈머1,2,3은 각 파티션 1,2,3에 순차적으로 배치가 될 것이고, offset정보를 이용해 순차적으로 데이터를 읽게 된다. 문제는 파티션4인데 컨슈머 갯수가 파티션 갯수보다 작다면 컨슈머 1,2,3중 하나가 파티션4에 접근하여 데이터를 읽게 된다. 만약 파티션 갯수와 컨슈머 갯수가 동일하개 4개씩이라면 각 컨슈머들은 하나의 파티션에서 데이터를 읽게 될것 이고, 파티션갯수가 4개고 컨슈머 갯수가 5개이면 컨슈머5는 그냥 아무일도 안하게 된다.(일반적으로 파티션 갯수와 컨슈머 갯수는 동일하게 구성하는 것을 추천한다고 함.)


컨슈머그룹이 존재하는 또 다른 이유가 있다. 물론 이러한 구조로 데이터를 병렬로 읽게 되어 빠른처리가 가능하다는 부분도 있겠지만, 특정 컨슈머에 문제가 생겼을 경우 다른 그룹내 컨슈머가 대신 읽을 수 있게 리벨런싱이 되어 장애 상황에서도 문제 없이 대처할 수 있게 된다.


Broker, Zookeeper : broker는 카프카 서버를 칭한다. 동일한 노드내에서 여러개의 broker서버를 띄울 수 있고, Zookeeper는 이러한 분산 메시지큐의 정보를 관리해주는 역할을 한다. 카프카를 띄우기 위해서는 반드시 주키퍼가 실행되어야 한다.


Replication : 카프카에서는 replication 수를 임의로 지정하여 topic를 만들 수 있다. replication-factor에 지정하는데 만약 3으로 하면 replication 수가 3이 된다.


Kafka Cluster에 3개의 broker가 있고 3개의 Topic이 있다고 가정해보자.

Topic-1은 replication-factor 1, Topic-2은 replication-factor 2, Topic-3은 replication-factor 3인 경우이다.


그렇다면 replication은 왜 필요할까? 단지 데이터의 복제 용도라기 보다는 특정 borker에 문제가 생겼을 경우 해당 broker의 역할을 다른 broker에서 즉각적으로 대신 수행 할 수 있게 하기 위한 용도 일 것이다.


Replication - leader & follower


replication을 좀더 자세히 들여다보면, 복제요소중 대표인 leader, 그외 요소인 follower로 나누어진다. topic으로 통하는 모든 데이터의 read/write는 오직 leader에서 이루어지고 follower는 leader와 sync를 유지함으로써 leader에 문제가 생겼을 경우 follower들 중 하나가 leader역할을 하게 되는 것이다.


만약 카프카 클러스터내 broker 2에서 장애가 발생되었다면, broker 2에 있던 Topic-2(leader)의 역할을 대신 수행하기 위해 아래 그림과 같이 broker 1에 있는 Topic(follower)가 leader역할을 하게 될 것이다.


복제된 데이터가 follower들에게 있으니, 메시지의 유실이 없다는 장점이 있지만, 복제를 하기 위한 시간과 네트워크 비용이 들기 때문에 데이터의 중요도에 따라 ack옵션으로 성능과 데이터의 중요도에 따라 다음과 같이 세부설정이 가능하다.


ack (default:1)

0 : 프로듀서는 서버로부터 어떠한 ack도 기다리지 않음. 유실율 높으나 높은 처리량

1 : 리더는 데이터를 기록, 모든 팔로워는 확인하지 않음

-1(또는 all) : 모든 ISR 확인. 무손실


ack값을 설정하여 데이터의 무손실에 더 중요성을 둘 것인지 또는 유실을 어느정도 감수 하더라고 속도에 중요성을 둘 것인지를 셋팅할 수 있다.


지금까지 설명한 모든 구성요소를 그림으로 표현하면 아래 그림과 같다.


Producer에서는 메시지를 입력하고, Consumer에서 메시지를 읽어갈때 Zookeeper에서 broker 및 offset정보를 관리하기 때문에 분산처리가 가능하게 된다.


카프카를 운영하기에 앞서 기본적인 구성요소나 매커니즘에 대해 충분히 이해를 하면 운영 하는데 많은 도움이 될 것이다.


참고자료


Posted by @위너스

댓글을 달아 주세요