Kafka

[카프카(Kafka) 어플리케이션 제작 ] #2. 컨슈머

피랑이 2019. 3. 21. 17:44

카프카(Kafka)의 이해

카프카(Kafka) 설치 및 클러스터 구성

[카프카(Kafka) 어플리케이션 제작 ] #1. 프로듀서

[카프카(Kafka) 어플리케이션 제작 ] #2. 컨슈머


이전 글에서는 프로듀서 내부 동작 확인 및 어플리케이션을 제작하였다. 

이번에는 컨슈머 어플리케이션을 제작해본다. 

컨슈머 

카프카 컨슈머 내부 동작 및 컨슈머 어플리케이션에서 메시지 소비하는 과정을 알아보자.

컨슈머 내부 동작

컨슈머의 전체적인 내부 동작을 이해하면 컨슈머 어플리케이션을 디버깅할 때 도움이 많이 되며, 올바른 결정을 하도록 도와준다.

카프카 컨슈머의 역활

 토픽 구독

  • 컨슈머 동작의 시작은 토픽의 구독임 

 오프셋 위치

  • 카프카는 다른 큐와는 다르게 메시지 오프셋을 저장 안함
  • 오프셋은 각자의 컨슈머들이 유지해야함(컨슈머 API를 사용)

 재연/되감기/메시지 스킵

  • 상황에 따라 커스텀하게 오프셋을 변경할 수 있음
  • 재연/되감기/메시지 스킵 가능
 하트비트
  • 컨슈머의 지정된 파티션의 멤버쉽과 소유권을 확인하기 위해 하트비트를 이용하여 주기적으로  체크
  • 하트비트를 수신하지 못하면 컨슈머 그룹의 컨슈머 재할당(리벨런싱)이 일어난다. 

 오프셋 커밋

  •  리벨런싱이 일어날 때 이미 읽은 오프셋을 다시 읽을 수 있으므로 오프셋 커밋을 함

 역직렬화

  •  프로듀서에서는 카프카에 메시지를 보낼 때 어떤 직렬화를 했는지 명시하고 컨슈머에서는 역직렬화를 명시함 (프로듀서 글의  key.serializer 참고)


메시지 수신(Subscribe)을 위한 동작 순서

1. 토픽 구독 ↓
2. 카프카 서버 조사(폴 루프) : 서버 조정, 파티션 리벨런스, 컨슈머 하트피트 체크 등↓
3. 새로운 레코드가 있는 지 체크하고 있으면 가져옴↓
4. 역직렬화 ↓
5. 데이터 유효성 검증 ↓
6. 다른 시스템에 이관 및 저장 

컨슈머 어플리케이션 

프로듀서 API와 마찬가지로 컨슈머도 풍부한 API 세트를 제공한다. 

컨슈머 기본 설정 및 코드

 bootstrap.servers

  • 브로커 주소 목록
  • hostname:port 형식
  • 프로듀서 설정과 동일

 key.deserializer

  • 프로듀서에서 key.serializer와 동일한 클래스로 deserializer 한다.
  • 다른 클래스로 deserializer 하면 예외 발생
  • ByteArraySerializer, StringSerializer, IntegerSerializer 클래스 선택

 value.deserializer

  • key.deserializer와 비슷하며 메시지 값 정보에 대한 역직렬화를 명시

 group.id

  • 컨슈머 그룹 정의
  • 이전 버전에서는 필수가 아니었지만 최근 버전에서는 필수로 변경 사용하지 않으면 아래와 같은 에러 발생(Attempt to join group failed due to fatal error: The configured groupId is invalid)
        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers","192.168.100.173:9092");
        producerProps.put("group.id", "hirang_test02");
        producerProps.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        producerProps.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return producerProps;
        KafkaConsumer consumer = new KafkaConsumer(setConsumerProperty());

구독

컨슈머는 메시지 데이터를 받기 위해 토픽을 구독한다. (KafkaConsumer.subscribe 사용)

구독 메소드는 입력 파라미터에 따라 다양한 구독이 가능하다. 

subscribe(Collection<String> topics)

: 원하는 토픽 목록을 전달 / 기본 리밸런서 사용

subscribe(Pattern pattern, ConsumerRebalanceListener listener)

: 원하는 토픽을 찾기 위해 정규식을 전달 / 정규식에 대응하는 새로운 토픽의 추가가 삭제 발생시 리밸런서 트리거됨

subscribe(Collection<String> topics, ConsumerRebalanceListener listener)

: 토픽의 목록을 전달 / 리밸런서 트리거됨


오프셋 커밋 처리

오프셋에 대한 커밋이 일어나는 방법은 3가지가 존재하며 각각 특성이 존재한다. 

 자동커밋

  •  컨슈머의 기본 설정값 
  • enable.auto.commit=true 
  • auto.commit.interval.ms=1000 이 값을 길게 잡으면 장애 발생시 중복 읽기가 발생할 수 있음 
  • 예) 커밋 주기가 10초인데 7초가 지난 후 컨슈머 장애가 발생하면 이전 10초 전에 커밋한 오프셋을 가져오므로 중복 발생

 현재 오프셋 커밋

  • 상황에 따라 필요할 때 커밋 제어시 사용
  • enable.auto.commit=false
  • commitSync() 메서드를 사용해 커밋할 컨슈머 오프셋을 호출한다. 
  • ConsumerRecord의 인스턴스를 처리 후 사용 권장하며 그렇게 안하면 컨슈머 장애기 발생할 경우 레코드 손실 발생

 비동기 커밋

  •  동기 방식의 커밋은 Ack 수신이 없는 경우, 컨슈머는 대기 상태가 되므로 결과적으로 처리 속도가 좋지 못하다. 
  • 비동기도 메시지 중복은 발생할 수 있다. 
  • 예) 메시지 오프셋 10을 오프셋 5 이전에 커밋 했다면, 카프카는 5부터 10까지 다시 읽으므로 중복 발생


추가 설정

설정 작업 이전에 데이터를 처리하기 위해 컨슈머가 얼마의 시간이 필요한지 확인 및 테스트가 필요하다.

 enable.auto.commit

  • true/false
  • 오프셋 자동 커밋

 fetch.min.bytes

  • 데이터 읽기 요청에 대한 카프카 서버의 회신을 요구하는 데이터 바이트의 최소 크기 

 request.timeout.ms

  • 응답을 기다리는 최대 시간 

 auto.offset.reset

  • 유효한 오프셋이 없을 때  자동처리됨

 lastest : 파티션에서 가장 최근 메시지부터 시작

 earliest : 파티션 맨 처음부터 시작

 none : 예외가 발생됨

 session.timeout.ms

  • "컨슈머 동작 중이다"라고 알리기 위한 하트비트 전송 주기
  • 리밸런서 트리거 하는걸 막기 위함

 max.partition.fetch.bytes

  • 파티션마다 할당할 최대 데이터 크기
  • ConsumerRecord 객체에 대해 컨슈머가 필요로 하는 메모리는 파티션수 * 설정값보다 커야함
  • 예) 파티션 10개 1개의 컨슈머이고, max.partiton.fetch.bytes가 2MB면 10 * 2MB = 20MB가 메모리로 잡혀 있어야함


컨슈머 전체 코드
package com.example.kafkatest;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Future;

@SpringBootApplication
public class KafkaConsumerApplication {
    private static final Logger log = LoggerFactory.getLogger(KafkaTestApplication.class);
    public static void main(String[] args) {
        SpringApplication.run(KafkaConsumerApplication.class, args);
        subscribe();
    }

    private static Properties setConsumerProperty(){
        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers","192.168.100.173:9092");
        producerProps.put("group.id", "hirang_test02");
        producerProps.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        producerProps.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        producerProps.put("enable.auto.commit", "true");
        producerProps.put("auto.commit.interval.ms", "1000");
        producerProps.put("session.timeout.ms", "30000");
        return producerProps;
    }

    private static void subscribe(){
        String topic = "hirang";
        List topicList = new ArrayList<>();
        topicList.add(topic);
        KafkaConsumer consumer = new KafkaConsumer(setConsumerProperty());
        consumer.subscribe(topicList);
        log.info("Subscribed to topic :\t" + KafkaStatic.DEFAULT_TOPIC);
        int i = 0;
        try {
            while (true) {
                ConsumerRecords records = consumer.poll(500);
                for (ConsumerRecord record : records)
                    log.info("offset = " + record.offset() + "\tkey =" + record.key() + "\tvalue =" + record.value());

                //TODO : Do processing for data here
                consumer.commitAsync(new OffsetCommitCallback() {
                    public void onComplete(Map map, Exception e) {

                    }
                });
            }
        } catch (Exception ex) {
            //TODO : Log Exception Here
        } finally {
            try {
                consumer.commitSync();

            } finally {
                consumer.close();
            }
        }

    }
}

컨슈머 콘솔 로그

읽은 메시지 데이터를 로그로 확인하였다. 

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v2.1.3.RELEASE)

2019-03-08 11:30:52.369  INFO 4556 --- [           main] c.e.kafkatest.KafkaConsumerApplication   : Starting KafkaConsumerApplication on ts-hirang with PID 4556 (D:\workspace\github\kafka-test\target\classes started by DESKTOP in D:\workspace\github\kafka-test)
2019-03-08 11:30:52.379  INFO 4556 --- [           main] c.e.kafkatest.KafkaConsumerApplication   : No active profile set, falling back to default profiles: default
2019-03-08 11:30:53.178  INFO 4556 --- [           main] c.e.kafkatest.KafkaConsumerApplication   : Started KafkaConsumerApplication in 1.766 seconds (JVM running for 3.564)
2019-03-08 11:30:53.192  INFO 4556 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
	auto.commit.interval.ms = 1000
	auto.offset.reset = latest
	bootstrap.servers = [192.168.100.173:9092]
	check.crcs = true
	client.id = 
	connections.max.idle.ms = 540000
	default.api.timeout.ms = 60000
	enable.auto.commit = true
	exclude.internal.topics = true
	fetch.max.bytes = 52428800
	fetch.max.wait.ms = 500
	fetch.min.bytes = 1
	group.id = hirang_test02
	heartbeat.interval.ms = 3000
	interceptor.classes = []
	internal.leave.group.on.close = true
	isolation.level = read_uncommitted
	key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
	max.partition.fetch.bytes = 1048576
	max.poll.interval.ms = 300000
	max.poll.records = 500
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
	receive.buffer.bytes = 65536
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retry.backoff.ms = 100
	sasl.client.callback.handler.class = null
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	send.buffer.bytes = 131072
	session.timeout.ms = 30000
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
	ssl.endpoint.identification.algorithm = https
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLS
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer

2019-03-08 11:30:53.238  INFO 4556 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version : 2.0.1
2019-03-08 11:30:53.239  INFO 4556 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId : fa14705e51bd2ce5
2019-03-08 11:30:53.240  INFO 4556 --- [           main] c.e.kafkatest.KafkaTestApplication       : Subscribed to topic :	hirang
2019-03-08 11:30:53.318  INFO 4556 --- [           main] org.apache.kafka.clients.Metadata        : Cluster ID: 6ejl7iMZQASnPqRCdv13wA
2019-03-08 11:30:53.319  INFO 4556 --- [           main] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=hirang_test02] Discovered group coordinator 192.168.100.173:9092 (id: 2147483646 rack: null)
2019-03-08 11:30:53.321  INFO 4556 --- [           main] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-1, groupId=hirang_test02] Revoking previously assigned partitions []
2019-03-08 11:30:53.321  INFO 4556 --- [           main] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=hirang_test02] (Re-)joining group
2019-03-08 11:30:53.347  INFO 4556 --- [           main] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=hirang_test02] Successfully joined group with generation 12
2019-03-08 11:30:53.348  INFO 4556 --- [           main] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-1, groupId=hirang_test02] Setting newly assigned partitions [hirang-0]
2019-03-08 11:31:03.104  INFO 4556 --- [           main] c.e.kafkatest.KafkaTestApplication       : offset = 277494	key =null	value =0-----87d9bcc5-3b75-4032-9427-e02b75a9eed7
2019-03-08 11:31:03.107  INFO 4556 --- [           main] c.e.kafkatest.KafkaTestApplication       : offset = 277495	key =null	value =1-----9c6e8e14-dd8e-4ddf-b810-4cf1f3c80e5c
2019-03-08 11:31:03.108  INFO 4556 --- [           main] c.e.kafkatest.KafkaTestApplication       : offset = 277496	key =null	value =2-----5f8eac28-06ed-4c53-a0c0-a15721a6ede2
2019-03-08 11:31:03.108  INFO 4556 --- [           main] c.e.kafkatest.KafkaTestApplication       : offset = 277497	key =null	value =3-----e6dd3658-5bc2-4bdb-ab7b-b127f8ca176a
2019-03-08 11:31:03.108  INFO 4556 --- [           main] c.e.kafkatest.KafkaTestApplication       : offset = 277498	key =null	value =4-----e077a534-d95e-4aca-adb3-1c2c1a29fce1
2019-03-08 11:31:03.108  INFO 4556 --- [           main] c.e.kafkatest.KafkaTestApplication       : offset = 277499	key =null	value =5-----5b38908b-ec60-43e8-98cc-76fa66451483
2019-03-08 11:31:03.112  INFO 4556 --- [           main] c.e.kafkatest.KafkaTestApplication       : offset = 277500	key =null	value =6-----d049b157-9370-433e-9cee-7101918eb449
2019-03-08 11:31:03.112  INFO 4556 --- [           main] c.e.kafkatest.KafkaTestApplication       : offset = 277501	key =null	value =7-----8786e6a2-c757-4a3f-be59-130d5c393bfc
2019-03-08 11:31:03.116  INFO 4556 --- [           main] c.e.kafkatest.KafkaTestApplication       : offset = 277502	key =null	value =8-----30b82719-abc5-4e1e-81d1-d8e47e53cb7d
2019-03-08 11:31:03.122  INFO 4556 --- [           main] c.e.kafkatest.KafkaTestApplication       : offset = 277503	key =null	value =9-----687f0bdb-3280-415f-9fb8-8d5734380755

컨슈머 모범 사례

카프카 컨슈머의 내부 동작과 토픽의 메시지 소비(Subscribe)하는 방법을 알아보았다. 
컨슈머 구성 요소를 정의할 때 좀 더 바람직한 가이드라인에 대해서 알아보자.

예외처리를 하자!
모든 어플리케이션은 예외처리... 

리벨런스 관리!
새로운 컨슈머가 컨슈머 그룹에 합류할 때마다, 혹은 예전 컨슈머가 종료되면 파티션 리밸런스가 트리거 된다. 
(컨슈머 그룹의 컨슈머 개수와 파티션이 같을 때 새로운 컨슈머 추가는 리밸런스가 안일어남)
컨슈머가 파티션 소유권을 잃을 때마다 가장 최근에 수신한 오프셋을 반드시 커밋해 줘야 한다. 

제때 오프셋 커밋하기!
메시지 오프셋에 대한 커밋은 처리 상황에 따라 잘 제때 수행해야 한다. 
그렇지 않으면 중복 및 소실이 발생할 수 있다. 
오프셋 주기에 따른 성능과 중복에 대한 문제간의 트레이드 오프가 필요한다. 
예) 장애 발생시 데이터 중복 처리에 대한 심각한 문제가 발생할 경우 오프셋을 커밋하는 시간을 가능한 짧게 한다. 

자동 오프셋 커밋!
중복 처리에 대한 문제가 없거나, 컨슈머가 자동으로 오프셋을 커밋하도록 관리하기를 바랄경우 사용한다. 
일반적으로 대부분 사용

마치며

카프카의 프로듀서 및 컨슈머에 대한 내부 동작에 대해서 이해하였고 어플리케이션도 만들어 보았다. 

프로덕션 환경에 적용할 때 컨슈머 그룹안에서 오프셋 커밋 전략,하트비트, 리벨런스 등 여러 성능 테스트가 필요하다.