'2019/03/07'에 해당되는 글 2건

  1. 2019.03.07 Docker : Dockerfile 편
  2. 2019.03.07 [카프카(Kafka) 어플리케이션 제작 ] #1. 프로듀서 (2)

Docker : Dockerfile 편

Docker 2019. 3. 7. 18:29

Dockerfile 개요



지금까지 도커 이미지를 내려받아 컨테이너를 생성하고 아파치를 설치 후 도커 컨테이너에 접근하는 실습 과정과 아파치가 설치 된 상태까지의 컨테이너를 이미지화하는 내용을 학습하였다.


// 우분투 이미지 다운로드 # docker pull ubuntu:14.04 // 도커 컨테이너 생성 # docker create -i -t --name -p 80:80 ubuntutest docker.io/ubuntu:14:04 b0c51e1ead4b1e4234537ec00394837144ce83f64c2d3c2e1eb7cbabcec8af41 // 컨테이너 활성화 # docker start ubuntutest // 컨테이너 접근 # docker attach ubuntutest // 패키지 업데이트 # apt-get update // 아파치 설치 # apt-get install apache2 -y // 아파치까지 설치 된 상태의 컨테이너를 이미지 화 # docker commit ubuntutest ubuntutest_img



Dockerfile(이하 도커파일)은 위와 같은 일련의 과정들을 기록하고 실행 할 수 있는 일종의 스크립트이다. 도커파일을 실행 할 수 있는 build명령어를 제공하고 있으며 도커파일을 build 할 때 패키지, 콘솔 명령어, Shell Script 등도 함께 포함하여 실행 할 수 있다.


Dockerfile은 콘솔명령어, Shell Script등 기록된 내용을 그대로 실행하는 

일종의 설계도라고 보면 이해하기 쉽다.




Dockerfile 생성



Ubuntu 14.04에 아파치가 설치된 상태의 이미지를 Build 할 수 있는 도커파일을 만들어 보도록 하겠다. 처음부터 도커파일을 작성하기는 쉽지 않으므로 다른사람이 만들어둔 도커파일을 활용하도록 하자. 먼저 다운로드에 앞서 도커허브(https://hub.docker.com/)에 접속 해보자. 도커 허브에 접속하면 바로 검색창이 상단에 보이는데 우리가 찾고자 하는 ubuntu 기반의 아파치 설치를 위해 ubuntu apache라고 검색 해보자.





필자는 업로더 pamtrak06의 ubuntu14.04-apache2 도커파일을 참고하였다. pamtrak06/ubuntu14.04-apache2를 클릭하면 바로 해당 업로더의 페이지로 이동한다.





해당 페이지로 이동하면 pamtrak06 이라는 유저가 업로드한 도커파일뿐만 아니라 이미지도 pull명령어 또는 run명령어를 통해 내려 받을 수 있다. 우리는 도커파일이 필요하므로 Dockerfile 탭을 눌러 스크립트를 확인해보자.





캡쳐된 스크립트의 내용은 아래와 같다.

FROM ubuntu:14.04

MAINTAINER pamtrak06 < pamtrak06@gmail.com >

RUN apt-get update && apt-get install -y apache2 apache2-threaded-dev

# Configure localhost in Apache
RUN echo "ServerName localhost" >> /etc/apache2/apache2.conf

# Define default command
CMD ["apachectl", "-D", "FOREGROUND"]

# Expose ports 80/443... : to be override for needs
EXPOSE 80

#USER nobody


해당 텍스트를 복사하여 /home/testuser로 경로이동 후 vi에디터를 통해 Dockerfile을 생성하도록 한다.


여기서 저장하기 전 수정해야 하는 부분이 있는데 아파치를 설치 후 외부에서 접근 가능한 포트를 열어줘야 하므로 'EXPOSE 80'을 추가해준다. 'USER' 구문은 이미 root로 실행하고 있으므로 주석처리 하거나 root로 바꿔준다. -t 옵션을 붙이면 설정한대로 apache_img로 생성 되지만 붙이지 않으면 임의의 16진수로 생성되니 이미지명을 지정하려면 -t 옵션을 꼭 넣기 바란다.


# docker build -t apache_img /home/testuser

Step 1/6 : FROM ubuntu:14.04 Step 2/6 : MAINTAINER pamtrak06 <pamtrak06@gmail.com> ---> Using cache ---> 2c7545bf3112 Step 3/6 : RUN apt-get update && apt-get install -y apache2 apache2-threaded-dev ---> Running in 62fd9a272ce4 Ign http://archive.ubuntu.com trusty InRelease Get:1 http://security.ubuntu.com trusty-security InRelease [65.9 kB] Get:2 http://archive.ubuntu.com trusty-updates InRelease [65.9 kB] Get:3 http://archive.ubuntu.com trusty-backports InRelease [65.9 kB] ... Processing triggers for libc-bin (2.19-0ubuntu6.14) ... Processing triggers for ca-certificates (20170717~14.04.2) ... Updating certificates in /etc/ssl/certs... 148 added, 0 removed; done. Running hooks in /etc/ca-certificates/update.d....done. Processing triggers for sgml-base (1.26+nmu4ubuntu1) ... Processing triggers for ureadahead (0.100.0-16) ... ---> 303f9b111bd3 Removing intermediate container 62fd9a272ce4 Step 4/6 : RUN echo "ServerName localhost" >> /etc/apache2/apache2.conf ---> Running in 21b3d68727c2 ---> ff21c8331d36 Removing intermediate container 21b3d68727c2 Step 5/6 : CMD apachectl -D FOREGROUND ---> Running in 93503a25c881 ---> d17b03a01d1f Removing intermediate container 93503a25c881 Step 6/6 : EXPOSE 80 ---> Running in f86591442099 ---> d307433464d8 Removing intermediate container f86591442099

Successfully built 786550292665


빌드가 성공적으로 완료 되었다.




그럼 이제 각 Step별 처리과정에 대해서 알아보도록 하자.


Step 1/6 : FROM ubuntu:14.04 → ubuntu:14.04 이미지를 기반으로 만들겠다는 구문 Step 2/6 : MAINTAINER pamtrak06 <pamtrak06@gmail.com> → 최초 생성자는 pamtrak06이고 email 주소는 pamtrak06@gmail.com 이다. Step 3/6 : RUN apt-get update && apt-get install -y apache2 apache2-threaded-dev → 콘솔명령어 apt-get update로 package 최신화 시킨 후 apache2를 묻지도 따지지도 않고(-y옵션) multi-threaded server 버전으로 다운로드 하라는 뜻

Step 4/6 : RUN echo "ServerName localhost" >> /etc/apache2/apache2.conf → 도커 컨테이너 내부의 apache2.conf에 ServerName을 localhost로 지정 한다는 뜻 (apache2 최초 설치 시 ServerName이 지정 되어있지 않음) Step 5/6 : CMD apachectl -D FOREGROUND → 컨테이너를 올릴때 자동으로 아파치를 기동한다는 명령어 Step 6/6 : EXPOSE 80 → 80번 포트를 외부에서 접근 가능하게 하겠다는 구문



그럼 이제 생성된 이미지를 확인하기 위해 docker images 명령으로 목록을 확인해보자


# docker images REPOSITORY TAG IMAGE ID CREATED SIZE apache_img first d307433464d8 6 minutes ago 390 MB layer_test second 127ab2a39585 13 days ago 188 MB layer_test first 48f47ef8257d 13 days ago 188 MB docker.io/ubuntu 14.04 5dbc3f318ea5 6 weeks ago 188 MB


아까 도커 build명령을 사용하여 빌드할때 설정했던 이름으로 이미지가 잘 생성 되었다. 이제 컨테이너를 생성할 차례인데 이미 도커 파일에서 EXPOSE 80이라는 설정으로 80포트를 오픈 해두었다 이제 해당 포트와 자동으로 매핑 시켜주는 -P 옵션을 추가 해주어야 한다. -d는 detached 옵션으로 백그라운드에서 실행 된다는 뜻이다.



# docker run -d -P --name apache_test apache_img:first 35adceda902976455b18631fe0677d5b3d247700c42ff2791383af40385fe068 # docker ps -a CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES 35adceda9029 apache_img:first "apachectl -D FORE..." 28 seconds ago Up 28 seconds 0.0.0.0:32768->80/tcp apache_test d33a60d75c45 layer_test:first "/bin/bash" 13 days ago Exited (255) 26 hours ago layer_test2 c73025587f2b ubuntu:14.04 "/bin/bash" 13 days ago Exited (0) 13 days ago layer_test 0c18bda4235f ubuntu:14.04 "/bin/bash" 13 days ago Exited (0) 13 days ago nifty_fermat


컨테이너가 잘 생성되었다. 이제 지난 Docker : 컨테이너 편과 마찬가지로 HostOS에서 GuestOS의 아파치로 접근이 가능한지 확인 후 이번 편을 마치도록 하겠다.


는 무언가 잘못 되었다!!


외않되!!


# docker ps -a CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES 35adceda9029 apache_img:first "apachectl -D FORE..." 28 seconds ago Up 28 seconds 0.0.0.0:32768->80/tcp apache_test d33a60d75c45 layer_test:first "/bin/bash" 13 days ago Exited (255) 26 hours ago layer_test2 c73025587f2b ubuntu:14.04 "/bin/bash" 13 days ago Exited (0) 13 days ago layer_test 0c18bda4235f ubuntu:14.04 "/bin/bash" 13 days ago Exited (0) 13 days ago nifty_fermat


어찌된 영문인지 PORTS 항목을 자세히 보면 포트가 32768로 설정이 되어있다. Dockerfile에서 EXPOSE 80이라는 옵션을 주었기에 -P 옵션은 자동매핑이 가능해서 80은 80포트와 443은 443포트끼리 자동 매핑 되는줄 알았는데 그게 아니었다. 이래서 뇌피셜이 무섭다.

--publish , -pPublish a container’s port(s) to the host
--publish-all , -PPublish all exposed ports to random ports

무려 랜덤 매핑이다.


으응..?


때문에 직접 포트를 지정해주지 않으면 HostOS에서 GuestOS의 동일포트로 넘겨주기 어렵다.

아래와 같이 다시 -p 옵션을 주어 넘겨줄 포트와 넘겨받을 포트를 지정해주도록 하자.


#docker stop 35adceda9029 35adceda9029 # docker rm 35adceda9029 35adceda9029 # docker run -d -p 80:80 --name apache_test apache_img:first b748f27ede243331660ab7ea129a3052b26d416599f85a09ff61cae84efb703f # docker ps -a CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES b748f27ede24 apache_img:first "apachectl -D FORE..." 18 seconds ago Up 17 seconds 0.0.0.0:80->80/tcp apache_test d33a60d75c45 layer_test:first "/bin/bash" 13 days ago Exited (255) 26 hours ago layer_test2 c73025587f2b ubuntu:14.04 "/bin/bash" 13 days ago Exited (0) 13 days ago layer_test 0c18bda4235f ubuntu:14.04 "/bin/bash" 13 days ago Exited (0) 13 days ago nifty_fermat



설정이 잘 되었다. 이제 HostOS에서 GuestOS 80포트를 다시 호출해보도록 하자. 아래와 같은 화면이 나온다면 접근에 성공한것이다.





여기까지 도커파일을 생성하고 build명령을 통해 이미지 생성 및 컨테이너 생성, 80포트 접근(아파치)까지 진행 해봤다. 도커파일은 고용량 이미지를 보내기 부담스러울 때 텍스트로 대체하여 다른 사람에게 전달 할 수도 있고 이미지를 생성 하기 전 여러가지 옵션으로 설정을 변경 할 수도 있다. 때문에 도커파일은 도커 이미지와는 다르게 여러가지 커스텀한 옵션으로 다양한 환경에서 적절히 사용할 수 있는 아주 유용한 스크립트이다. 다음 편에서는 도커파일을 이용한 실습 및 활용에 대해 설명하겠다.




Docker : Dockerfile 편

끝.

'Docker' 카테고리의 다른 글

Docker : 컨테이너 오케스트레이션 개요 편  (0) 2019.04.19
Docker : Dockerfile 실습 편  (0) 2019.03.22
Docker : Dockerfile 편  (0) 2019.03.07
Docker : 이미지 편  (0) 2019.02.22
Docker : 컨테이너 편  (1) 2019.02.15
Docker : 설치 편  (0) 2019.01.31
Posted by DevStream

댓글을 달아 주세요


위의 링크 글에서는 Kafka 개요 및 설치, 명령어를 이용하여 토픽 생성, 토픽 메시지 Publish, Subscribe에 대해서 설명하였다. 


이번에는 Kafka 라이브러리를 이용하여 어플리케이션을 만들어보자.

본인은 Intellij IDE 환경에서 SpringBoot 2.1 플래폼을 기반으로 Maven 빌드를 사용한다. 

 

프로듀서나 컨슈머를 사용하기 위해서는 kafka-clients를 이용하므로 pom.xml에 종속성을 추가한다. 


    
    org.apache.kafka
    kafka-clients
    2.1.0

프로듀서

카프카 프로듀서 내부 동작 및 프로듀서 어플리케이션에서 카프카 큐로 메시지가 전달되는 과정을 알아보자.

프로듀서 내부 동작

메시지 게시(Publish)를 위한 동작 순서
1. 프로듀서 API 호출 ↓
2. Key-Value 형태로 이뤄진 Kafka 구성 정보를 전달 받음 ↓
3. 메시징 데이터를 바이트 배열로 직렬화 ↓
4. 전송할 파티션 결정(옵션) ↓
5. 메시지를 보내기 위해 브로커 결정 ↓
6. API에 쓰기 요청 ↓
7. 응답 수신 (응답이 수신되명 성공으로 간주)
   오류 코드 수신 (예외를 발생시키거나 설정에 따른 재시도 수행)

메시지 Publish와 별개로 카프카 프로듀서가 책임지고 있는 역활에 대해서 정리해본다. 

카프카 프로커 URL 부트스트랩
프로듀서는 카프카 클러스터에 대한 메타데이터를 가져오기 위해 최소 하나의 브로커에 연결한다. 
보통 다수의 브로커에 연결한다. 

데이터 직렬화
브로커에게 메시지를 보내기 전 TCP 기반 데이터 송수신을 위하여 데이터 객체를 바이트 직렬화한다. 

토픽 파티션 결정
일반적으로 메시지 데이터 객체의 키 해시를 이용하여 어떤 파티션으로 데이터가 전송되어야 하는지 결정한다. 
아니면 커스텀하게 로직을 만들어서 파티션을 결정할 수 있도록 사용자 정의 파티션 코드를 작성할 수 있다.

처리 실패/재시도 
프로듀서 API 설정을 통해 재시도 횟수나 예외 처리가 가능하다. 
예외 종류에 따른 데이터 흐름을 결정할 수 있다.

배치 처리 
배치 처리는 입출력 횟수를 줄이고 프로듀서 메모리를 최적화한다.  
보통 메세지 수를 결정하여 처리하며, 전반적인 처리 속도는 배치에서의 메시지 수에 비례해 증가한다. 

프로듀서 어플리케이션 

프로듀서 어플리케이션 작성의 경우 추상화 계층에 메서드를 노출하는 프로듀서 API를 사용한다. 


카프카 프로듀서 API 사용 순서

1. 기본 설정 정보 ↓
2. 프로듀서 객체 생성 ↓
3. 프로듀서 레코드 객체 생성 ↓
4. 사용자 정의 파티션 생성(옵션) ↓
5. 추가 설정 

기본 설정 정보

 bootstrap.servers

  • 브로커 주소 목록
  • hostname:port 형식
  • 한 개 이상 설정 가능 (브로커 접속 실패할 수 있으므로 최소 두개 지정 권장)

 key.serializer

  • 브로커는 메시지를 전달받으면 메시지 키 정보가 어떠한 직렬화가 되어 있다고 가정하므로 어떤 직렬화를 써는지 명시해야함 
  • 카프카는 ByteArraySerializer, StringSerializer, IntegerSerializer 세가지 내장된 클래스 제공

 value.serializer

  • key.serializer와 비슷하며 메시지 값 정보에 대한 직렬화를 명시


프로듀서 객체 및 프로듀서 레코드 객체 생성

토픽에 레코드를 전송하는 ProducerRecord 객체는 토픽이름, 파티션 번호, 타입스탬프, 키, 값 등을 포함한다. 

토픽이름과 메시지 데이터 값은 필수 파라미터이며, 파티션 번호나 타임스탬프, 키, 값 들은 선택 파라미터이다. 

동기메시징은 브로커에서 회신을 RecordMetadata를 보내고, 비동기 메시징은 브로커에서 즉각 응답 처리만 하며, 프로듀서에서는 콜백 인터페이스를 수행하여 처리한다.

동기메시징으로도 잠깐 테스트를 하였는데 메시지 사이즈 100byte 기준으로 1만 TPS정도 나온다. 


사용자 정의 파티션 생성

대부분의 경우 기본 파티션을 사용하는 경우 충분하나, 하나의 키에 대한 데이터의 비중이 매우 큰 경우 별도의 파티션의 할당이 필요할 수 있다. 

예) K라는 키에 전체데이터의 30%가 있는 경우 N파티션을 할당에 다른키가 N파티션에 할당되지 않게 만들어 공간이 소진되거나 속도가 느려지지 않도록 한다. 

카프카는 사용자 정의 파티션을 생성할 수 있도록 파티셔너 인터페이스를 제공한다. 

package com.example.kafkatest;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;

import java.util.List;
import java.util.Map;

public class CustomPartition implements Partitioner {
    public int partition(
            String topicName,
            Object key,
            byte[] keybBytes,
            Object value,
            byte[] valueBytes,
            Cluster cluster
    )
    {
        List partitionInfos = cluster.partitionsForTopic(topicName);
        int numPartitions = partitionInfos.size();
        //로직여기에
        return 0;
    }

    @Override
    public void close(){}

    @Override
    public void configure(Map map){}
}
추가 설정 정보

 buffer.memory

  • 카프카 브로커로 메시지 전송 전에 사용할 최대 버퍼 메모리 크기
  • 설정 한계에 도달하면 프로듀서는 예외를 발생시키기 전에 max.block.ms 시간 동안 대기
  • batch.size가 크다면 프로듀서에 더 많은 메모리를 할당해야 함
  • request.timeout.ms를 사용해 시간 초과 설정을 적용하는 좋음

 ack

  • 0, 1, all 사용
  • 0 : ack를 기다리지 않음 (fire and forget)
  • 1 : 리더가 메시지를 기록하자마자 ack를 받음
  • all : 리더가 복제하는 팔로워까지 완료된 경우 ack를 받음
  • 카프카(Kafka)의 이해 Replication - leader & follower 참조

 batch.size

  • 크기 많큼 배치 처리를 허용
  • 대량으로 보낼경우 1000이상으로 설정하는 걸 권장
  • 1~10으로 할 경우 메시지 유실이 발생하는데 정확한 부부은 확인이 필요 (유실 테스트까지만 함)

 linger.ms

  • 브로커로 전송전에 프로듀서가 추가 메시지를 기다리는 시간

 compression.type

  •  프로듀서는 압축을 사용하지 않고 메시지를 전송이 Default임
  • GZIP, Snappy, LZA 형식의 압축이 가능하며, 배치 처리가 많을수록 압축 효과가 좋다. 
  • 되도록 사용하길 권장 Logstash에서 압축과 비압축 차이가 컸다

 retres

  • 메시지 전송이 실패했을시 예외를 발생시키기전 재전송 시도 값 

 partitioner.class

  •  사용자 정의 파티션 생성 시 허용

 timeout.ms

  •  프로듀서에서 오류를 보내기 전에 팔로워의 Ack를 리더가 기다리는 시간


프로듀서 전체 코드

package com.example.kafkatest;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.Future;

@SpringBootApplication
public class KafkaProducerApplication {
    private static final Logger log = LoggerFactory.getLogger(KafkaTestApplication.class);

    public static void main(String[] args) {
        SpringApplication.run(KafkaProducerApplication.class, args);
        publish();
    }

    private static Properties setProducerProperty(){
        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers","192.168.100.173:9092");
        producerProps.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("ack","all");
        producerProps.put("retries", 1);
        /*
            배치사이즈가 작으면 대량전송시 유실됨 최소 1000이상 잡는게 효율적
        */
        producerProps.put("batch.size", 1);
        producerProps.put("linger.ms", 1);
        producerProps.put("buffer.memory", 24568545);
        return producerProps;
    }

    private static void publish(){
        KafkaProducer producer = new KafkaProducer(setProducerProperty());
        log.info("Pubulish to topic :\t" + KafkaStatic.DEFAULT_TOPIC);
        log.info("Pubulish Start");
        for (int i=0; i<10; i++  ) {
            ProducerRecord data = new ProducerRecord(KafkaStatic.DEFAULT_TOPIC, i+"-----"+UUID.randomUUID().toString());
            Future recordMetadata = producer.send(data);
        }
        producer.close();
        log.info("Pubulish End");
    }
}
package com.example.kafkatest;

public class KafkaStatic {
    public static final String DEFAULT_TOPIC = "hirang";
}

프로듀서 콘솔 로그

어플리케이션을 구동하면 아래와 같이 Kafka 설정 관련 정보를 볼 수 있다. 

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v2.1.3.RELEASE)

2019-03-07 11:21:18.822  INFO 376 --- [           main] c.e.kafkatest.KafkaProducerApplication   : Starting KafkaProducerApplication on ts-hirang with PID 376 (D:\workspace\github\kafka-test\target\classes started by DESKTOP in D:\workspace\github\kafka-test)
2019-03-07 11:21:18.824  INFO 376 --- [           main] c.e.kafkatest.KafkaProducerApplication   : No active profile set, falling back to default profiles: default
2019-03-07 11:21:19.398  INFO 376 --- [           main] c.e.kafkatest.KafkaProducerApplication   : Started KafkaProducerApplication in 0.795 seconds (JVM running for 1.635)
2019-03-07 11:21:19.409  INFO 376 --- [           main] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values: 
	acks = 1
	batch.size = 1
	bootstrap.servers = [192.168.100.173:9092]
	buffer.memory = 24568545
	client.id = 
	compression.type = none
	connections.max.idle.ms = 540000
	enable.idempotence = false
	interceptor.classes = []
	key.serializer = class org.apache.kafka.common.serialization.StringSerializer
	linger.ms = 1
	max.block.ms = 60000
	max.in.flight.requests.per.connection = 5
	max.request.size = 1048576
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
	receive.buffer.bytes = 32768
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retries = 1
	retry.backoff.ms = 100
	sasl.client.callback.handler.class = null
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	send.buffer.bytes = 131072
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
	ssl.endpoint.identification.algorithm = https
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLS
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	transaction.timeout.ms = 60000
	transactional.id = null
	value.serializer = class org.apache.kafka.common.serialization.StringSerializer

2019-03-07 11:21:19.456  WARN 376 --- [           main] o.a.k.clients.producer.ProducerConfig    : The configuration 'ack' was supplied but isn't a known config.
2019-03-07 11:21:19.459  INFO 376 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version : 2.0.1
2019-03-07 11:21:19.460  INFO 376 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId : fa14705e51bd2ce5
2019-03-07 11:21:19.460  INFO 376 --- [           main] c.e.kafkatest.KafkaTestApplication       : Pubulish to topic :	hirang
2019-03-07 11:21:19.461  INFO 376 --- [           main] c.e.kafkatest.KafkaTestApplication       : Pubulish Start
2019-03-07 11:21:19.526  INFO 376 --- [ad | producer-1] org.apache.kafka.clients.Metadata        : Cluster ID: 6ejl7iMZQASnPqRCdv13wA
2019-03-07 11:21:19.534  INFO 376 --- [           main] o.a.k.clients.producer.KafkaProducer     : [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
2019-03-07 11:21:19.577  INFO 376 --- [           main] c.e.kafkatest.KafkaTestApplication       : Pubulish End

실행 결과

프로듀서 모범 사례

카프카 프로듀서의 내부 동작과 일반적인 토픽에 메시지 게시(Publish)하는 방법을 알아보았다.
프로듀서 구성 요소를 정의할 때 좀 더 바람직한 가이드라인에 대해서 알아보자.

데이터 유효성 검증을 하자!

프로듀서 어플리케이션을 제작할 때 데이터 스키마의 정합성, null이 아닌 키항목 등 유효성 검증을 간과하기 쉽다. 

이런 유효성 검증을 하지 않으면 메시지 데이터가 올바르게 파티션에 할당되지 않으므로, 브로커에 대한 부하 분산에 영향을 준다. (순서를 보장해야하는 1파티션이면 상관없을듯...)


예외처리를 하자!

모든 어플리케이션의 예외처리는 상황에 따라 무조건 필요하다. (너무 남발해서도 문제지만..)

프로듀서에서 예외 상황에 따라 적절한 흐름 제어를 해야한다.

예) 게임 로그 데이터의 임시 큐로 Kafka를 사용할 경우 아이템 거래 사기 발생 시 계정이나 기사단 블럭 등 예외 관련 흐름 제어....


재시도 횟수!

카프카 클러스터 연결 타임아웃 예외 발생 시 메시지 손실이 날 수 있으니 재시도 횟수를 설정한다. 


부트스트랩 URL 수!

카프카 브로커 설정은 반드시 두 개 이상으로 설정한다. 

프로덕션 환경에서 최소 카프카 클러스터 구성은 카프카 3노드, 주키퍼 3노드를 사용하기 때문에 세 개를 적어주면 된다. 


잘못된 파티셔닝 방식의 사용 방지!

사용자 정의 파티션을 잘못 사용하게 되면 토픽의 모든 파티션으로 균일하게 분배되지 않을 수 있고, 최적화된 병렬처리가 불가능 하도록 만들 수 있다.  사용자 정의 파티션 사용시 라운드 로빈을 기본으로 넣자.


메시지의 임시 보존!

카프카 클러스터 장애로 인하여 재시도 횟수를 넘었을 시 메시지 소실이 발생할 수 있으므로 메시지를 다시 전송할 있도록 디스크나, 데이터베이스를 이용하자. 

마치며

카프카 프로듀서 내부 동작을 이해하고 간단하게 어플리케이션을 만들어 보았다. 
다음에는 컨슈머 어플리케이션에 대해 만들어본다.


Posted by 사용자 피랑이

댓글을 달아 주세요

  1. AndersonChoi 2019.08.30 16:41 신고  댓글주소  수정/삭제  댓글쓰기

    감사합니다. 잘 정리해주셨네요. 많은도움되었습니다.
    특히 추가 설정 정보설명이 유용했습니다!

  2. 셈프 2021.01.19 22:54  댓글주소  수정/삭제  댓글쓰기

    max.block.ms 너무 짧게 잡으면 프로듀싱 시 대기 시간 때문에 메모리 이슈가 있을수 있습니다. 30초에서 60초(60000) 정도 필요합니다.
    1초로 잡았다가 초당 2~30만건 카프카에 적재시 에러가 발생하네요