지난 Docker : Dockerfile 편에서는 도커파일의 개념 설명과 도커파일의 생성, 그리고 도커파일을 이용하여 이미지를 만들고 컨테이너를 실행하여 아파치에 접근해보는 간단한 실습을 진행 하였다. 이번 시간에는 도커를 이용하여 Jenkins Slave Node를 생성하여 빌드해보는 실습을 진행 해보고 이 과정을 도커파일로 만들어보도록 하겠다.



Jenkins 분산빌드환경


서버의 자원이 한정된가난한 상태에서 다수의 사람들이 Jenkins(이하 젠킨스)를 통해 빌드를 하다보면 점점 빌드가 잦아지고 결국 빌드를 대기하는 일종의 빌드 병목현상이 발생한다. 게다가 몸집이 큰 프로젝트 파일을 빌드하기 위해서 상당한 시간이 소요되는 경우도 있어서 빌드를 기다리기보다 수동빌드를 하는게 더 나은 상황이 발생 할 수 있다.


이를 분산빌드환경(Slave Node 추가)을 통하여 해결 해보도록 하자.



먼저 젠킨스 메인 화면의 좌측 Jenkins 관리를 누르고 아래로 스크롤하여 노드관리 메뉴로 진입한다. 신규 노드를 눌러 노드명을 입력하고 Permanent Agent 선택 후 OK를 누르면 아래 상세정보 입력 화면으로 이동한다. 필자는 Slave Node Name을 m7Docker라고 입력하였다. Credentials 에는 Docker 컨테이너에 접근할 계정을 Add를 눌러 입력해주면 된다. 필자는 ID는 root에 비밀번호는 xptmxm123으로 입력 후 저장하였다. 이후 이미지에는 나와있지 않지만 필자는 SSH 포트를 8890으로 변경해주었다 (고급버튼 누르면 포트변경 가능)




Jenkins Build를 위한 Slave Node용 도커 컨테이너 생성


다음은 Ubuntu 16.04를 이용하여 젠킨스 서버에서 도커 컨테이너로 SSH로 접근 후 maven 빌드를 수행하는 컨테이너 하나를 만들어보도록 하겠다. 이미지 생성과 컨테이너 생성은 이미 한번 다룬 내용이므로 너무 자세한 설명은 생략 하고 명령어 위주로 작성하겠다.


먼저 docker run명령을 통해 컨테이너를 생성하자 포트포워딩은 젠킨스 slave설정에서 지정해둔 8890 포트와 도커 컨테이너에 접근할 22번 포트를 서로 연결하였다.


[root@localhost testuser]# docker run -i -t -p 8890:22 --name jenkins_slave docker.io/ubuntu:16.04
root@8d8eeb87e586:/#


이제 이 아무것도 없는 도커 컨테이너를 젠킨스 Slave Node로 활용하기 위해 무엇을 해야할까 한번 정리 해보자.


1. 패키지 업데이트

2. 컨테이너 접근을 위한 ssh 설치

3. 빌드를 위한 maven 설치

4. 소스변경사항을 가져오기 위한 git 설치

5. jdk11이 적용된 프로젝트의 빌드를 위해 openjdk11 설치

6. root 계정을 사용하기 위해 root 패스워드 설정

7. ssh로 root접근이 가능하도록 sshd_config파일의 옵션 수정

8. Jenkins 전역설정의 JAVA_HOME과 MAVEN_HOME의 경로 동기화


이정도가 되겠다. 먼저 패키지 업데이트부터 차근차근 진행 해보자.



1. 패키지 업데이트


root@8d8eeb87e586:/# apt-get update Get:1 http://archive.ubuntu.com/ubuntu xenial InRelease [247 kB] Get:2 http://security.ubuntu.com/ubuntu xenial-security InRelease [109 kB] Get:3 http://archive.ubuntu.com/ubuntu xenial-updates InRelease [109 kB] ... ... Fetched 15.7 MB in 1min 3s (248 kB/s) Reading package lists... Done



2. SSH 설치


root@8d8eeb87e586:/# apt-get install -y openssh-server Reading package lists... Done Building dependency tree Reading state information... Done ... ... Setting up ssh-import-id (5.5-0ubuntu1) ... Processing triggers for libc-bin (2.23-0ubuntu11) ... Processing triggers for ca-certificates (20170717~16.04.2) ... Updating certificates in /etc/ssl/certs... 148 added, 0 removed; done. Running hooks in /etc/ca-certificates/update.d... done. Processing triggers for systemd (229-4ubuntu21.16) ...



3. Maven 설치


root@8d8eeb87e586:/# apt-get install -y maven Reading package lists... Done Building dependency tree Reading state information... Done ... ... After this operation, 156 MB of additional disk space will be used. Get:1 http://archive.ubuntu.com/ubuntu xenial-updates/main amd64 libjpeg-turbo8 amd64 1.4.2-0ubuntu3.1 [111 kB] Get:2 http://archive.ubuntu.com/ubuntu xenial-updates/main amd64 x11-common all 1:7.7+13ubuntu3.1 [22.9 kB] Get:3 http://archive.ubuntu.com/ubuntu xenial/main amd64 libxtst6 amd64 2:1.2.2-1 [14.1 kB] ... ... done.



4. Git 설치


root@8d8eeb87e586:/# apt-get install -y git-core Reading package lists... Done Building dependency tree Reading state information... Done Get:1 http://archive.ubuntu.com/ubuntu xenial/main amd64 libatm1 amd64 1:2.5.1-1.5 [24.2 kB] Get:2 http://archive.ubuntu.com/ubuntu xenial/main amd64 libmnl0 amd64 1.0.3-5 [12.0 kB] Get:3 http://archive.ubuntu.com/ubuntu xenial/main amd64 libpopt0 amd64 1.16-10 [26.0 kB] ... ... update-alternatives: using /usr/bin/file-rename to provide /usr/bin/rename (rename) in auto mode Processing triggers for libc-bin (2.23-0ubuntu11) ... Processing triggers for systemd (229-4ubuntu21.16) ...



5. OpenJdk 11 설치 (필자는 11.0.1 버전을 사용하였다.)


root@8d8eeb87e586:/# wget -P /usr/local https://download.java.net/java/GA/jdk11/13/GPL/openjdk-11.0.1_linux-x64_bin.tar.gz --2019-03-22 01:15:09-- https://download.java.net/java/GA/jdk11/13/GPL/openjdk-11.0.1_linux-x64_bin.tar.gz Resolving download.java.net (download.java.net)... 23.212.14.196 Connecting to download.java.net (download.java.net)|23.212.14.196|:443... connected. HTTP request sent, awaiting response... 200 OK Length: 187599951 (179M) [application/x-gzip] Saving to: '/usr/local/openjdk-11.0.1_linux-x64_bin.tar.gz' openjdk-11.0.1_linux-x64_bin.tar.gz 100%[===================================================================================================================>] 178.91M 22.2MB/s in 8.7s 2019-03-22 01:15:23 (20.5 MB/s) - '/usr/local/openjdk-11.0.1_linux-x64_bin.tar.gz' saved [187599951/187599951] root@8d8eeb87e586:/# cd /usr/local/ && tar -xzf openjdk-11.0.1_linux-x64_bin.tar.gz && rm openjdk-11.0.1_linux-x64_bin.tar.gz root@8d8eeb87e586:/usr/local# ls bin etc games include jdk-11.0.1 lib man sbin share src root@8d8eeb87e586:/usr/local#



6. root 패스워드 설정


root@8d8eeb87e586:/usr/local# passwd root Enter new UNIX password: Retype new UNIX password: passwd: password updated successfully root@8d8eeb87e586:/usr/local#



7. sshd_config 설정파일 수정 (ssh로 root계정에 접근하기 위한 설정 변경)


root@8d8eeb87e586:/usr/local# vi /etc/ssh/sshd_config bash: vi: command not found



당황하지 말고 vi에디터를 설치 해주도록 한다.


7-1. vi에디터 설치


root@8d8eeb87e586:/usr/local# apt-get install vim Reading package lists... Done Building dependency tree Reading state information... Done ... ... Processing triggers for libc-bin (2.23-0ubuntu11) ...



7-2. sshd_config 파일 수정 (PermitRootLogin의 prohibit-password설정을 yes로 바꾼 후 저장한다.)


root@8d8eeb87e586:~# vi /etc/ssh/sshd_config

... ...

# Authentication: LoginGraceTime 120 PermitRootLogin prohibit-passwordyes ... ... :wq



8. Jenkins 전역 설정에 맞추어 JAVA_HOME과 MAVEN_HOME 경로 설정


필자는 JDK는 /usr/local/jdk-11.0.1에 저장 하였고(다운로드 및 압축해제는 위의 설정과 같으므로 JAVA_HOME은 별 다른 설정이 필요하지 않다.) maven은 /usr/share/maven 폴더에 저장이 되어있는데 /usr/local/apache-maven-3.5.3의 경로로 심볼릭링크를 걸어두었다.


root@8d8eeb87e586:~# ln -s /usr/share/maven /usr/local/apache-maven-3.5.3



이제 모든 준비가 완료 되었다. Jenkins 서버에서 현재 설정해둔 도커 컨테이너로의 접근이 가능한지 확인 해보자. 먼저 Jenkins 서버에서 필자의 컨테이너에 접근하기 위해서는 외부에서 접근이 가능하도록 포트를 열어 주어야 한다. 필자는 윈도우 10을 기준으로 설명하겠다. 먼저 방화벽의 인바운드 설정에서 아까 도커 컨테이너 생성 시 포트포워딩 했었던 8890을 지정해준다.




인바운드설정이 완료되면 8890으로 들어온 포트를 내부의 어떤 IP로 포워딩 할지 지정 해주어야 하는데 윈도우 커멘드를 관리자 권한으로 접근하여 포트 포워딩을 설정 해주도록 한다.


C:\WINDOWS\system32>netsh interface portproxy add v4tov4 listenport=8890 listenaddress=192.168.70.22 connectport=8890 connectaddress=172.17.100.97 C:\WINDOWS\system32>netsh interface portproxy show v4tov4 ipv4 수신 대기: ipv4에 연결: 주소 포트 주소 포트 --------------- ---------- --------------- ---------- 192.168.70.22 8890 172.17.100.97 8890



192.168.70.22:8890으로 들어온 포트를 필자의 컴퓨터 내부 IP인 172.17.100.97:8890으로 보낸다는 의미이다. Jenkins 서버에서 필자의 도커 컨테이너까지의 접근은 아래 이미지를 참고하기 바란다.




이제 젠킨스 Slave Node를 실행하여 도커 컨테이너로 접근해보자. ssh로 접근하기 위해 ssh service를 활성화 시켜준다.


root@8d8eeb87e586:~# service ssh start * Starting OpenBSD Secure Shell server sshd


ssh service 활성화가 완료되면 젠킨스에서 아까 생성한 m7Docker에 진입 후 Launch agent버튼을 눌러 Slave Node를 실행시킨다.



아래와 같은 로그가 올라온다면 ssh접속에 성공한것이다.


[03/22/19 12:21:07] [SSH] Opening SSH connection to 192.168.70.22:8890.

... ...

[03/22/19 12:21:07] [SSH] Starting sftp client. [03/22/19 12:21:07] [SSH] Remote file system root /jenkins does not exist. Will try to create it... [03/22/19 12:21:07] [SSH] Copying latest slave.jar... [03/22/19 12:21:07] [SSH] Copied 771,004 bytes. Expanded the channel window size to 4MB [03/22/19 12:21:07] [SSH] Starting slave process: cd "/jenkins" && java -jar slave.jar <===[JENKINS REMOTING CAPACITY]===>channel started Remoting version: 3.21 This is a Unix agent Evacuated stdout Agent successfully connected and online



이제 젠킨스 빌드 시 이 Slave Node를 활용하겠다는 설정을 해주어야 하는데 프로젝트 진입 → 구성 → Restrict where this project can be run을 체크하고 위에서 생성했던 m7Docker 를 입력하여 저장해준다.



이제 빌드를 진행 해보자.



m7Docker Slave Node에서 빌드가 정상적으로 진행되었다.



지금까지의 과정들은 도커파일을 사용하면 현재 도커 컨테이너와 동일한 설정이 담긴 이미지를 한번에 생성 할 수 있다. 도커파일의 생성방법은 지난 시간에 설명하였으므로 이번 시간에는 도커파일의 스크립트 설명과 직접 도커파일을 빌드하여 이미지를 생성하고 컨테이너에 SSH접근까지만 확인하도록 하겠다.

# Ubuntu 16.04버전을 기반으로 함
FROM ubuntu:16.04

# 다운로드 링크, 파일명, maven 버전 등 변경이 필요한 경우를 대비해 변수로 선언해준다.
ARG jdkFileName=openjdk-11.0.1_linux-x64_bin.tar.gz
ARG jdkDownloadUrl=https://download.java.net/java/GA/jdk11/13/GPL/${jdkFileName}
ARG mavenHome=/usr/local/apache-maven-3.5.3
ARG userName=root
ARG userPassword=xptmxm123

# 패키지 목록 최신화 후 원격 접속을 위한 ssh와 빌드를 위한 maven 및 변경내역을 가져올 git을 설치한다.
RUN apt-get update
RUN apt-get install -y openssh-server
RUN apt-get install -y maven
RUN apt-get install -y git-core

# 현재 jenkins 전역설정에서 maven의 경로는 /usr/local/apache-maven-3.5.3 으로 지정 해두었다.
# ln -s 명령을 사용하여 심볼릭 링크를 걸어준다.
RUN ln -s /usr/share/maven ${mavenHome}

# service 및 api의 빌드는 jdk11이 필요하므로 다운로드 받고 압축을 풀어준다.
RUN wget -P /usr/local ${jdkDownloadUrl}
RUN cd /usr/local/ && tar -xzf ${jdkFileName} && rm ${jdkFileName}

# root 계정을 사용할 것이므로 root계정의 비밀번호 설정 및 root계정으로 ssh접근이 가능하도록 수정한다.
RUN echo "${userName}:${userPassword}" | chpasswd
RUN sed -i 's/PermitRootLogin prohibit-password/PermitRootLogin yes/' /etc/ssh/sshd_config

# 컨테이너 시작 시 ssh서비스를 Active 시키기 위해 ENTRYPOINT를 지정 해둔다
ENTRYPOINT service ssh restart && /bin/bash

# 외부에서 접근이 가능한 포트를 지정해준다. ssh접속이므로 22번 포트 설정
EXPOSE 22


이제 해당 도커파일을 기반으로 이미지를 생성해보자. 도커파일을 빌드하고 이미지가 잘 생성되었는지 확인해보자.


[root@localhost test2]# docker build -t docker_img ./ ... ... ---> aba77358024b Removing intermediate container 490098d7f585 Step 16/17 : ENTRYPOINT service ssh restart && /bin/bash ---> Running in 22cd10713ee8 ---> 98e07e4fc4b5 Removing intermediate container 22cd10713ee8 Step 17/17 : EXPOSE 22 ---> Running in bcf569c735cc ---> 07f9b213b807 Removing intermediate container bcf569c735cc Successfully built 07f9b213b807 [root@localhost test2]# docker images REPOSITORY TAG IMAGE ID CREATED SIZE docker_img latest 07f9b213b807 3 minutes ago 939 MB docker.io/ubuntu 16.04 9361ce633ff1 10 days ago 118 MB apache_img first d307433464d8 2 weeks ago 390 MB layer_test second 127ab2a39585 4 weeks ago 188 MB layer_test first 48f47ef8257d 4 weeks ago 188 MB docker.io/ubuntu 14.04 5dbc3f318ea5 8 weeks ago 188 MB


새로만든 이미지로 docker run 명령을 실행하기 전에 이전에 켜두었던 jenkins_slave 컨테이너는 종료 하도록 한다. (포트가 겹치므로)


[root@localhost test2]# docker ps -a CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES 8d8eeb87e586 docker.io/ubuntu:16.04 "/bin/bash" 19 hours ago Up 4 hours 0.0.0.0:8890->22/tcp jenkins_slave dcc887cf8b0b apache_img:first "apachectl -D FORE..." 2 weeks ago Exited (137) 2 days ago apache_test d33a60d75c45 layer_test:first "/bin/bash" 4 weeks ago Exited (255) 2 weeks ago layer_test2 c73025587f2b ubuntu:14.04 "/bin/bash" 4 weeks ago Exited (0) 4 weeks ago layer_test [root@localhost test2]# docker stop jenkins_slave jenkins_slave [root@localhost test2]#


이제 새로운 이미지로 컨테이너를 생성하도록 한다.
실행이 잘 되었다.


[root@localhost test2]# docker run -i -t -p 8890:22 --name jenkins_slave2 docker_img:latest * Restarting OpenBSD Secure Shell server sshd [ OK ] root@72f83424f1dc:/#


이제 젠킨스에 다시 들어가서 Launch Agent 버튼을 눌러 SSH 접근이 가능한지 확인해보자.

아래 이미지가 출력되면 SSH접속에 성공한것이다.



여기까지 젠킨스 도커 컨테이너를 활용한 Slave Node의 생성과 빌드를 해보고 이 과정을 도커파일로 만들어 이미지를 생성하고 컨테이너 화 하는 실습까지 진행 해보았다. 이미지를 그대로 내려받았을때 세부설정에 대한 변경이 필요한 경우가 있을것이다. 이 때마다 컨테이너를 만들고 설정을 바꿔 다시 commit명령으로 새로운 이미지를 만드는 과정이 생길 수 있는데 도커파일을 활용하면 이러한 낭비를 없앨 수 있다. 또한 도커파일은 설정만 바꾸어 여러대의 서버에 적용하여 사용하는 경우에도 효과적으로 사용할 수 있다.




Docker : Dockerfile 실습 편

끝.



'Docker' 카테고리의 다른 글

Docker : 도커스웜 클러스터 구축 편  (0) 2019.05.17
Docker : 컨테이너 오케스트레이션 개요 편  (0) 2019.04.19
Docker : Dockerfile 실습 편  (0) 2019.03.22
Docker : Dockerfile 편  (0) 2019.03.07
Docker : 이미지 편  (0) 2019.02.22
Docker : 컨테이너 편  (1) 2019.02.15
Posted by DevStream

댓글을 달아 주세요

아마... 아래 그림은 한번쯤은 본 적이 있을 겁니다.
개개인의 서로 다른 재능을 무시한채, 획일적인 기준으로 평가하는 문제를 풍자하는 그림인데요..


"공정한 선발을 위해 같은 시험을 봐야 합니다: 모두 저 나무에 올라 가세요" (말풍선)

"모두가 천재다. 그러나 나무에 오르는 능력으로 물고기를 판단한다면, 그 물고기는 평생 자신을 멍청하다고 생각하며 살아갈 것이다" - 아인슈타인



이 카툰은 획일적인 교육과 평가에만 의존하는 교육분야의 문제 인식으로 자주 인용되지만, 기업의 경영 전략에도 대입해 볼 수 있다는 생각이 듭니다.


기업의 존재 목적은 수익 창출입니다. 모든 기업은 수익을 창출하기 위해 다른 기업과는 차별화된 재화나 서비스를 시장에 공급합니다. 이러한 수익 창출 과정에서 주어진 자원을 최대한 효율적으로 사용해서 수익을 극대화 시키고 리스크(risk)를 경감하는 것이 중요합니다.


따라서 기업은 자신이 제공하는 핵심 서비스 전개를 위한 업무 역량은 자체적으로 내재화 시켜서 경쟁력을 확보하고 다른 기업과의 차별화를 꾀해야 합니다. 


반면, 핵심 서비스에 직접적이지 않은 비 핵심업무는 다른 기업이나 다른 사람에게 위임하여 그들의 전문성을 활용하는 것이 좋습니다. 그렇게 해야 생산성이 향상하고 리스크를 경감할 수 있습니다. 


기업의 외주(아웃소싱) 전략에 좋은 참조 모델이 있어 소개하고자 합니다.


---


'캐즘(Chasm) 이론'으로 유명한 제프리 A.무어의 '코어/컨텍스트(Core/Context) 모델'은 기업이 주어진 자원을 어떻게 운용하는 것이 좋을지에 대한 참조 모델을 제시하며 '클라우드 충격'이라는 책에서 자세히 설명하고 있습니다.


코어(Core)란 의미 그대로 기업의 핵심 업무를 말합니다. 코어 업무는 기업의 경쟁우위를 높여주는 중요한 것이므로 최대한 자원을 집중해야 하고 자체 개발을 해야 한다는 것입니다.


반면 컨텍스트(Context)는 코어 이외의 모든 활동이라고 말합니다. 컨텍스트 업무는 차별성을 추구하는 것이 아니라 가능한 표준적인 방식으로 효율을 최우선으로 수행하는 것'이 좋다고 말합니다.


또한 이러한 컨텍스트 업무는 어느 누군가에게는 코어 업무이기 때문에 그쪽에 아웃소싱을 맡길 수 있다고 시사합니다.

게임을 개발하는 기업에서 인사관리 프로그램이 필요한 사례를 예로 들어 보겠습니다.


게임 개발은 회사의 중요한 코어 업무이기 때문에 자체 개발을 해야 하며, 인사관리 프로그램은 컨텍스트 업무이기에 그 업무를 코어로 하는 외부 업체에게 아웃소싱 하는 식입니다.


그러나 코어/컨텍스트만으로는 기업내 모든 활동을 양분하기에는 한계가 있어 보입니다.


무어는 코어/컨텍스트 구분과 더불어, '미션크리티컬/'비미션크리티' 이라는 또 하나의 축을 또 다른 기준으로 내세웠습니다. 미션 크리티컬은 만일 중지되었을 때 즉시 심각한 리스크로 이어지는 기업 활동을 말하며 그 외의 모든 업무는 비 미션크리티컬이 됩니다. 즉 리스크의 정도를 기준으로 삼고 있습니다

.

이에 대한 도식은 아래와 같습니다.


[출처: 클라우드의 충격]


---


제가 속해 있는 IT 분야에서도 아웃소싱 전략에 큰 영향을 주는 두가조 요소가 있습니다.


바로 클라우드(Cloud)와 오픈소스(Opensource)입니다.


오픈소스의 경우 외주(아웃소싱)라는 카테고리와는 성격이 조금 다를 수 있지만, 특정한 업무를 외부 자원으로 해결한다는 위임이라는 측면에서는 맥을 같이 하기에 연결지어 봤습니다.


클라우드의 경우 종전의 자체 전산실에 직접 렉과 장비를 구비하고 운영했던 온프레미스(On-Premise) 환경과는 달리 클라우드 환경에서는 이러한 물리적인 장비와 운영체제, 소프트웨어까지 빌려 쓸 수 있게 되었습니다. 또한 기업은 자신들이 어디 까지를 직접 운영할지 결정하여 Iaas, Paas, Saas 형태의 클라우드 서비스 모델을 선택할 수 있습니다. 


갈수록 IT 환경의 판이 많이 달라지고 있습니다. 자원을 최적화 시키는 것은 기업의 생산성을 향상시키는 것 뿐만 아니라, 서로 다른 재능을 가진 기업간 위임이 활성화 되어 산업의 발전에도 기여하게 된다고 봅니다. 과거 분업화가 많은 발전을 야기한 것 처럼요.


현재 자신의 회사나 수행하는 프로젝트에서, 코어와 컨텍스트, 미션크리티컬과 비미션크리티컬의 4사분면에 해당하는 업무가 무엇인지 파악하여 전략을 재정비 해 보는 것도 좋을 것입니다.

'프로젝트 관리' 카테고리의 다른 글

아키텍처의 출발점  (0) 2019.04.04
Core/Context 모델  (0) 2019.03.22
Tuckman의 팀 발달 모델  (0) 2019.03.08
Posted by 사용자 박종명

댓글을 달아 주세요

Data Cleaning and Text Preprocessing

기계가 텍스트를 이해할 수 있도록 텍스트를 정제하고 신호와 소음을 구분하여 아웃라이어 데이터로 인한 오버피팅을 방지하기 위해서는 다음과 같은 처리를 해주어야 한다.

  • HTML 태그, 특수문자, 이모티콘 처리
  • 토근화(Tokenization) : 문장의 단어를 분리하는 단계
  • 불용어(Stopword) 제거 : 자주 등장하지만 특별한 의미를 갖지 않는 단어 제거
  • 어간 추출(Stemming) 및 음소표기법(Lemmatization)
  • 정규 표현식

텍스트 데이터 전처리 이해

정규화 normalization (입니닼ㅋㅋ -> 입니다 ㅋㅋ, 샤릉해 -> 사랑해)
한국어를 처리하는 예시입니닼ㅋㅋㅋㅋㅋ -> 한국어를 처리하는 예시입니다 ㅋㅋ

토큰화 tokenization
한국어를 처리하는 예시입니다 ㅋㅋ -> 한국어Noun, 를Josa, 처리Noun, 하는Verb, 예시Noun, 입Adjective, 니다Eomi ㅋㅋKoreanParticle

어근화 stemming (입니다 -> 이다)
한국어를 처리하는 예시입니다 ㅋㅋ -> 한국어Noun, 를Josa, 처리Noun, 하다Verb, 예시Noun, 이다Adjective, ㅋㅋKoreanParticle

어근 추출 phrase extraction
한국어를 처리하는 예시입니다 ㅋㅋ -> 한국어, 처리, 예시, 처리하는 예시

(출처 : 트위터 한국어 형태소 분석기)

BeautifulSoup

많이 쓰이는 파이썬용 파서로 html, xml을 파싱할때 주로 많이 사용한다.

BeautifulSoup에는 기본적으로 파이썬 표준 라이브러리인 html파서를 지원하지만, lxml을 사용하면 성능 향상이 있다.

BeautifulSoup(markup, '[파서명]')

  • html.parser : 빠르지만 유연하지 않기 때문에 단순한 html문서에 사용
  • lxml : 매우 빠르고 유연
  • xml : xml 파일에 사용
  • html5lib : 복접한 구조의 html에 대해서 사용.(속도가 느린편)
from bs4 import BeautifulSoup
import re

data = "<p>초등학교 입학을 축하합니다.~~~^^;<br/></p>"
soup = BeautifulSoup(data, "html5lib")
remove_tag = soup.get_text()
result_text = re.sub('[-=+,#/\?:^$.@*\"※~&%ㆍ!』\\‘|\(\)\[\]\<\>`\'…》;]', ''
              , remove_tag)
print(result_text)

결과 : 초등학교 입학을 축하합니다.

<p>초등학교 입학을 축하합니다.~~~^^;<br/></p> 문장에서 태그와 특수문자를 제거하기 위해 BeautifulSoup 와 정규표현식을 사용하였다.

토큰화 (Tokenization)

코퍼스(corpus)에서 토큰(token)이라 불리는 단위로 나누는 작업을 토큰화(Tokenization)라고 부른다. 토큰의 단위가 상황에 따라 다르지만, 보통 의미있는 단위로 토큰 정의한다.

토큰의 기준을 단어로 하는 경우 가장 간단한 방법은 띄어쓰기를 기준으로 자르는 것이다.

I loved you. machine learning 에서 구두점을 제외시키고 토큰화 한 결과는 다음과 같다.

"I", "loved", "you", "machine", "learning"

하지만 보통 토큰화 작업은 단순히 구두점이나 특수문자를 전부 제거하는 작업을 수행하는 것만으로 해결되지 않는다. 구두점이나 특수문자를 전부 제거하면 토큰이 의미를 잃어 버리는 경우가 발생하기도 하기때문이다. 띄어쓰기 단위로 자르면 사실상 단어 토큰이 구분되는 영어와 달리, 한국어는 띄어쓰기반으로는 단어 토큰을 구분하기 어렵다.

한국어는 단어의 다양한 의미와 높낮이 그리고 띄어쓰기가 어렵다 보니 잘못된 데이터를 많이 받게 되어 자연어처리를 하는데 있어 어려움이 많이 따른다. 하지만 다양한 곳에서 한국어 처리를 위한 형태소 분석기를 연구하고 있다. 얼마전 카카오에서도 카이라는 딥러닝 기술 기반의 형태소 분석기를 오픈소스로 공개 하였고 그외에 트위터, 코엔엘파이등 꽤 쓸만한 것들이 있다.

불용어 (Stopword)

일반적으로 코퍼스에서 자주 나타나는 단어로 학습이나 예측 프로세스에 실제로 기여를 하지 않는다.

(조사, 접미사 - 나,나,은,는,이,가,하다,합니다….등)

NLTK에는 17개의 언어에 대해 불용어가 정의되어 있다. 하지만 아쉽게도 한국어는…없다.

간단하게 10개 정도만 영어의 불용어를 보면,

import nltk
from nltk.corpus import stopwords

nltk.download('stopwords')
stopwords.words('english')[:10]

결과 : [‘i’, ‘me’, ‘my’, ‘myself’, ‘we’, ‘our’, ‘ours’, ‘ourselves’, ‘you’, ‘your’]

간단하다, 불용어 사전에 있는 단어들은 제거하면 된다.

한국어의 경우 불용어를 제거하는 방법으로는 위에서 언급한 형태소 분석 후 조사, 접속사 등을 제거할 수 있다.

어간 추출 (Stemming)

스태밍이라고 하는데, 어간이란 말은 단어의 의미를 담고 있는 단어의 핵심부분이라 생각하면 된다. 쉽게, 단어를 축약형으로 바꿔주는 작업이라고도 할 수 있다.

한국어가, 한국어는, 한국어처럼 -> 한국어

대표적으로 포터 스태머(PorterStemmer)와 랭커스터 스태머(LancasterStemmer)가 있는데 포터는 보수적이고 랭커스터는 좀 더 적극적이다.

PorterStemmer

from nltk.stem.lancaster import LancasterStemmer
stemmer = LancasterStemmer()
print(stemmer.stem('maximum'))
print("running >> {}".format(stemmer.stem("running")))
print("runs >> {}".format(stemmer.stem("runs")))
print("run >> {}".format(stemmer.stem("run")))

maxim
running » run
runs » run
run » run

LancasterStemmer

from nltk.stem.lancaster import LancasterStemmer
lancaster_stemmer = LancasterStemmer()
print(lancaster_stemmer.stem('maximum'))
print("running >> {}".format(lancaster_stemmer.stem("running")))
print("runs >> {}".format(lancaster_stemmer.stem("runs")))
print("run >> {}".format(lancaster_stemmer.stem("run")))

maxim
running » run
runs » run
run » run

음소표기법 (Lemmatization)

언어학에서 음소표기법 (Lemmatization)은 단어의 보조 정리 또는 사전 형식에 의해 식별되는 단일 항목으로 분석 될 수 있도록 굴절 된 형태의 단어를 그룹화하는 과정이다. 어간 추출(Stemming)과는 달리 단어의 형태가 적절히 보존되는 양상을 보이는 특징이 있다. 하지만 그럼에도 의미를 알 수 없는 적절하지 못한 단어로 변환 하기도 하는데 음소표기법(Lemmatizer)은 본래 단어의 품사 정보를 알아야만 정확한 결과를 얻을 수 있기 때문이다.

  • 품사정보가 보존된 형태의 기본형으로 변환.
  • 단어가 명사로 쓰였는지 동사로 쓰였는지에 따라 적합한 의미를 갖도록 추출하는 것.
from nltk.stem import WordNetLemmatizer
n=WordNetLemmatizer()
words=['have', 'going', 'love', 'lives', 'fly', 'dies', 'has', 'starting']
[n.lemmatize(w) for w in words]

결과 : [‘have’, ‘going’, ‘love’, ‘life’, ‘fly’, ‘dy’, ‘ha’, ‘starting’]

결과에서 보면 알수 있듯이 dy나 ha는 그 의미를 알수 없는 적절하지 못한 단어로 변환이 되었다.

하지만 dies나 has가 동사로 쓰였다는 것을 알려준다면 좀더 정확한 결과를 얻을 수 있게 된다.

n.lemmatize('dies', 'v')

‘die’

n.lemmatize('has', 'v')

‘have’

음소표기법은 문맥을 고려하며, 수행했을 때의 결과는 해당 단어의 품사 정보를 보존한다. 하지만 어간 추출은 품사 정보가 보존이 되지 않는다.

마치며..

이런 작업들을 하는 이유는 눈으로 봤을 때는 서로 다른 단어들이지만, 하나의 단어로 일반화시킬 수 있다면 하나의 단어로 일반화시켜서 문서 내의 단어 수를 줄여보자는 것이다.

자연어 처리에서 전처리의 지향점은 언제나 갖고 있는 코퍼스로부터 복잡성을 줄이는 일이다!!!

'Article' 카테고리의 다른 글

Google AMP 개요 편  (0) 2019.05.30
Docker : Docker Compose 편  (0) 2019.04.05
자연어처리 - 데이터 정제  (0) 2019.03.22
REST API 디자인 가이드 적용기  (0) 2019.03.08
자연어처리 - Bag of words, n-gram  (0) 2019.03.08
The Scale Cube (규모 확장성 모델)  (0) 2019.02.22
Posted by @위너스

댓글을 달아 주세요

카프카(Kafka)의 이해

카프카(Kafka) 설치 및 클러스터 구성

[카프카(Kafka) 어플리케이션 제작 ] #1. 프로듀서

[카프카(Kafka) 어플리케이션 제작 ] #2. 컨슈머


이전 글에서는 프로듀서 내부 동작 확인 및 어플리케이션을 제작하였다. 

이번에는 컨슈머 어플리케이션을 제작해본다. 

컨슈머 

카프카 컨슈머 내부 동작 및 컨슈머 어플리케이션에서 메시지 소비하는 과정을 알아보자.

컨슈머 내부 동작

컨슈머의 전체적인 내부 동작을 이해하면 컨슈머 어플리케이션을 디버깅할 때 도움이 많이 되며, 올바른 결정을 하도록 도와준다.

카프카 컨슈머의 역활

 토픽 구독

  • 컨슈머 동작의 시작은 토픽의 구독임 

 오프셋 위치

  • 카프카는 다른 큐와는 다르게 메시지 오프셋을 저장 안함
  • 오프셋은 각자의 컨슈머들이 유지해야함(컨슈머 API를 사용)

 재연/되감기/메시지 스킵

  • 상황에 따라 커스텀하게 오프셋을 변경할 수 있음
  • 재연/되감기/메시지 스킵 가능
 하트비트
  • 컨슈머의 지정된 파티션의 멤버쉽과 소유권을 확인하기 위해 하트비트를 이용하여 주기적으로  체크
  • 하트비트를 수신하지 못하면 컨슈머 그룹의 컨슈머 재할당(리벨런싱)이 일어난다. 

 오프셋 커밋

  •  리벨런싱이 일어날 때 이미 읽은 오프셋을 다시 읽을 수 있으므로 오프셋 커밋을 함

 역직렬화

  •  프로듀서에서는 카프카에 메시지를 보낼 때 어떤 직렬화를 했는지 명시하고 컨슈머에서는 역직렬화를 명시함 (프로듀서 글의  key.serializer 참고)


메시지 수신(Subscribe)을 위한 동작 순서

1. 토픽 구독 ↓
2. 카프카 서버 조사(폴 루프) : 서버 조정, 파티션 리벨런스, 컨슈머 하트피트 체크 등↓
3. 새로운 레코드가 있는 지 체크하고 있으면 가져옴↓
4. 역직렬화 ↓
5. 데이터 유효성 검증 ↓
6. 다른 시스템에 이관 및 저장 

컨슈머 어플리케이션 

프로듀서 API와 마찬가지로 컨슈머도 풍부한 API 세트를 제공한다. 

컨슈머 기본 설정 및 코드

 bootstrap.servers

  • 브로커 주소 목록
  • hostname:port 형식
  • 프로듀서 설정과 동일

 key.deserializer

  • 프로듀서에서 key.serializer와 동일한 클래스로 deserializer 한다.
  • 다른 클래스로 deserializer 하면 예외 발생
  • ByteArraySerializer, StringSerializer, IntegerSerializer 클래스 선택

 value.deserializer

  • key.deserializer와 비슷하며 메시지 값 정보에 대한 역직렬화를 명시

 group.id

  • 컨슈머 그룹 정의
  • 이전 버전에서는 필수가 아니었지만 최근 버전에서는 필수로 변경 사용하지 않으면 아래와 같은 에러 발생(Attempt to join group failed due to fatal error: The configured groupId is invalid)
        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers","192.168.100.173:9092");
        producerProps.put("group.id", "hirang_test02");
        producerProps.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        producerProps.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return producerProps;
        KafkaConsumer consumer = new KafkaConsumer(setConsumerProperty());

구독

컨슈머는 메시지 데이터를 받기 위해 토픽을 구독한다. (KafkaConsumer.subscribe 사용)

구독 메소드는 입력 파라미터에 따라 다양한 구독이 가능하다. 

subscribe(Collection<String> topics)

: 원하는 토픽 목록을 전달 / 기본 리밸런서 사용

subscribe(Pattern pattern, ConsumerRebalanceListener listener)

: 원하는 토픽을 찾기 위해 정규식을 전달 / 정규식에 대응하는 새로운 토픽의 추가가 삭제 발생시 리밸런서 트리거됨

subscribe(Collection<String> topics, ConsumerRebalanceListener listener)

: 토픽의 목록을 전달 / 리밸런서 트리거됨


오프셋 커밋 처리

오프셋에 대한 커밋이 일어나는 방법은 3가지가 존재하며 각각 특성이 존재한다. 

 자동커밋

  •  컨슈머의 기본 설정값 
  • enable.auto.commit=true 
  • auto.commit.interval.ms=1000 이 값을 길게 잡으면 장애 발생시 중복 읽기가 발생할 수 있음 
  • 예) 커밋 주기가 10초인데 7초가 지난 후 컨슈머 장애가 발생하면 이전 10초 전에 커밋한 오프셋을 가져오므로 중복 발생

 현재 오프셋 커밋

  • 상황에 따라 필요할 때 커밋 제어시 사용
  • enable.auto.commit=false
  • commitSync() 메서드를 사용해 커밋할 컨슈머 오프셋을 호출한다. 
  • ConsumerRecord의 인스턴스를 처리 후 사용 권장하며 그렇게 안하면 컨슈머 장애기 발생할 경우 레코드 손실 발생

 비동기 커밋

  •  동기 방식의 커밋은 Ack 수신이 없는 경우, 컨슈머는 대기 상태가 되므로 결과적으로 처리 속도가 좋지 못하다. 
  • 비동기도 메시지 중복은 발생할 수 있다. 
  • 예) 메시지 오프셋 10을 오프셋 5 이전에 커밋 했다면, 카프카는 5부터 10까지 다시 읽으므로 중복 발생


추가 설정

설정 작업 이전에 데이터를 처리하기 위해 컨슈머가 얼마의 시간이 필요한지 확인 및 테스트가 필요하다.

 enable.auto.commit

  • true/false
  • 오프셋 자동 커밋

 fetch.min.bytes

  • 데이터 읽기 요청에 대한 카프카 서버의 회신을 요구하는 데이터 바이트의 최소 크기 

 request.timeout.ms

  • 응답을 기다리는 최대 시간 

 auto.offset.reset

  • 유효한 오프셋이 없을 때  자동처리됨

 lastest : 파티션에서 가장 최근 메시지부터 시작

 earliest : 파티션 맨 처음부터 시작

 none : 예외가 발생됨

 session.timeout.ms

  • "컨슈머 동작 중이다"라고 알리기 위한 하트비트 전송 주기
  • 리밸런서 트리거 하는걸 막기 위함

 max.partition.fetch.bytes

  • 파티션마다 할당할 최대 데이터 크기
  • ConsumerRecord 객체에 대해 컨슈머가 필요로 하는 메모리는 파티션수 * 설정값보다 커야함
  • 예) 파티션 10개 1개의 컨슈머이고, max.partiton.fetch.bytes가 2MB면 10 * 2MB = 20MB가 메모리로 잡혀 있어야함


컨슈머 전체 코드
package com.example.kafkatest;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Future;

@SpringBootApplication
public class KafkaConsumerApplication {
    private static final Logger log = LoggerFactory.getLogger(KafkaTestApplication.class);
    public static void main(String[] args) {
        SpringApplication.run(KafkaConsumerApplication.class, args);
        subscribe();
    }

    private static Properties setConsumerProperty(){
        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers","192.168.100.173:9092");
        producerProps.put("group.id", "hirang_test02");
        producerProps.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        producerProps.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        producerProps.put("enable.auto.commit", "true");
        producerProps.put("auto.commit.interval.ms", "1000");
        producerProps.put("session.timeout.ms", "30000");
        return producerProps;
    }

    private static void subscribe(){
        String topic = "hirang";
        List topicList = new ArrayList<>();
        topicList.add(topic);
        KafkaConsumer consumer = new KafkaConsumer(setConsumerProperty());
        consumer.subscribe(topicList);
        log.info("Subscribed to topic :\t" + KafkaStatic.DEFAULT_TOPIC);
        int i = 0;
        try {
            while (true) {
                ConsumerRecords records = consumer.poll(500);
                for (ConsumerRecord record : records)
                    log.info("offset = " + record.offset() + "\tkey =" + record.key() + "\tvalue =" + record.value());

                //TODO : Do processing for data here
                consumer.commitAsync(new OffsetCommitCallback() {
                    public void onComplete(Map map, Exception e) {

                    }
                });
            }
        } catch (Exception ex) {
            //TODO : Log Exception Here
        } finally {
            try {
                consumer.commitSync();

            } finally {
                consumer.close();
            }
        }

    }
}

컨슈머 콘솔 로그

읽은 메시지 데이터를 로그로 확인하였다. 

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v2.1.3.RELEASE)

2019-03-08 11:30:52.369  INFO 4556 --- [           main] c.e.kafkatest.KafkaConsumerApplication   : Starting KafkaConsumerApplication on ts-hirang with PID 4556 (D:\workspace\github\kafka-test\target\classes started by DESKTOP in D:\workspace\github\kafka-test)
2019-03-08 11:30:52.379  INFO 4556 --- [           main] c.e.kafkatest.KafkaConsumerApplication   : No active profile set, falling back to default profiles: default
2019-03-08 11:30:53.178  INFO 4556 --- [           main] c.e.kafkatest.KafkaConsumerApplication   : Started KafkaConsumerApplication in 1.766 seconds (JVM running for 3.564)
2019-03-08 11:30:53.192  INFO 4556 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
	auto.commit.interval.ms = 1000
	auto.offset.reset = latest
	bootstrap.servers = [192.168.100.173:9092]
	check.crcs = true
	client.id = 
	connections.max.idle.ms = 540000
	default.api.timeout.ms = 60000
	enable.auto.commit = true
	exclude.internal.topics = true
	fetch.max.bytes = 52428800
	fetch.max.wait.ms = 500
	fetch.min.bytes = 1
	group.id = hirang_test02
	heartbeat.interval.ms = 3000
	interceptor.classes = []
	internal.leave.group.on.close = true
	isolation.level = read_uncommitted
	key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
	max.partition.fetch.bytes = 1048576
	max.poll.interval.ms = 300000
	max.poll.records = 500
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
	receive.buffer.bytes = 65536
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retry.backoff.ms = 100
	sasl.client.callback.handler.class = null
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	send.buffer.bytes = 131072
	session.timeout.ms = 30000
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
	ssl.endpoint.identification.algorithm = https
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLS
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer

2019-03-08 11:30:53.238  INFO 4556 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version : 2.0.1
2019-03-08 11:30:53.239  INFO 4556 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId : fa14705e51bd2ce5
2019-03-08 11:30:53.240  INFO 4556 --- [           main] c.e.kafkatest.KafkaTestApplication       : Subscribed to topic :	hirang
2019-03-08 11:30:53.318  INFO 4556 --- [           main] org.apache.kafka.clients.Metadata        : Cluster ID: 6ejl7iMZQASnPqRCdv13wA
2019-03-08 11:30:53.319  INFO 4556 --- [           main] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=hirang_test02] Discovered group coordinator 192.168.100.173:9092 (id: 2147483646 rack: null)
2019-03-08 11:30:53.321  INFO 4556 --- [           main] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-1, groupId=hirang_test02] Revoking previously assigned partitions []
2019-03-08 11:30:53.321  INFO 4556 --- [           main] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=hirang_test02] (Re-)joining group
2019-03-08 11:30:53.347  INFO 4556 --- [           main] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=hirang_test02] Successfully joined group with generation 12
2019-03-08 11:30:53.348  INFO 4556 --- [           main] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-1, groupId=hirang_test02] Setting newly assigned partitions [hirang-0]
2019-03-08 11:31:03.104  INFO 4556 --- [           main] c.e.kafkatest.KafkaTestApplication       : offset = 277494	key =null	value =0-----87d9bcc5-3b75-4032-9427-e02b75a9eed7
2019-03-08 11:31:03.107  INFO 4556 --- [           main] c.e.kafkatest.KafkaTestApplication       : offset = 277495	key =null	value =1-----9c6e8e14-dd8e-4ddf-b810-4cf1f3c80e5c
2019-03-08 11:31:03.108  INFO 4556 --- [           main] c.e.kafkatest.KafkaTestApplication       : offset = 277496	key =null	value =2-----5f8eac28-06ed-4c53-a0c0-a15721a6ede2
2019-03-08 11:31:03.108  INFO 4556 --- [           main] c.e.kafkatest.KafkaTestApplication       : offset = 277497	key =null	value =3-----e6dd3658-5bc2-4bdb-ab7b-b127f8ca176a
2019-03-08 11:31:03.108  INFO 4556 --- [           main] c.e.kafkatest.KafkaTestApplication       : offset = 277498	key =null	value =4-----e077a534-d95e-4aca-adb3-1c2c1a29fce1
2019-03-08 11:31:03.108  INFO 4556 --- [           main] c.e.kafkatest.KafkaTestApplication       : offset = 277499	key =null	value =5-----5b38908b-ec60-43e8-98cc-76fa66451483
2019-03-08 11:31:03.112  INFO 4556 --- [           main] c.e.kafkatest.KafkaTestApplication       : offset = 277500	key =null	value =6-----d049b157-9370-433e-9cee-7101918eb449
2019-03-08 11:31:03.112  INFO 4556 --- [           main] c.e.kafkatest.KafkaTestApplication       : offset = 277501	key =null	value =7-----8786e6a2-c757-4a3f-be59-130d5c393bfc
2019-03-08 11:31:03.116  INFO 4556 --- [           main] c.e.kafkatest.KafkaTestApplication       : offset = 277502	key =null	value =8-----30b82719-abc5-4e1e-81d1-d8e47e53cb7d
2019-03-08 11:31:03.122  INFO 4556 --- [           main] c.e.kafkatest.KafkaTestApplication       : offset = 277503	key =null	value =9-----687f0bdb-3280-415f-9fb8-8d5734380755

컨슈머 모범 사례

카프카 컨슈머의 내부 동작과 토픽의 메시지 소비(Subscribe)하는 방법을 알아보았다. 
컨슈머 구성 요소를 정의할 때 좀 더 바람직한 가이드라인에 대해서 알아보자.

예외처리를 하자!
모든 어플리케이션은 예외처리... 

리벨런스 관리!
새로운 컨슈머가 컨슈머 그룹에 합류할 때마다, 혹은 예전 컨슈머가 종료되면 파티션 리밸런스가 트리거 된다. 
(컨슈머 그룹의 컨슈머 개수와 파티션이 같을 때 새로운 컨슈머 추가는 리밸런스가 안일어남)
컨슈머가 파티션 소유권을 잃을 때마다 가장 최근에 수신한 오프셋을 반드시 커밋해 줘야 한다. 

제때 오프셋 커밋하기!
메시지 오프셋에 대한 커밋은 처리 상황에 따라 잘 제때 수행해야 한다. 
그렇지 않으면 중복 및 소실이 발생할 수 있다. 
오프셋 주기에 따른 성능과 중복에 대한 문제간의 트레이드 오프가 필요한다. 
예) 장애 발생시 데이터 중복 처리에 대한 심각한 문제가 발생할 경우 오프셋을 커밋하는 시간을 가능한 짧게 한다. 

자동 오프셋 커밋!
중복 처리에 대한 문제가 없거나, 컨슈머가 자동으로 오프셋을 커밋하도록 관리하기를 바랄경우 사용한다. 
일반적으로 대부분 사용

마치며

카프카의 프로듀서 및 컨슈머에 대한 내부 동작에 대해서 이해하였고 어플리케이션도 만들어 보았다. 

프로덕션 환경에 적용할 때 컨슈머 그룹안에서 오프셋 커밋 전략,하트비트, 리벨런스 등 여러 성능 테스트가 필요하다. 

Posted by 사용자 피랑이

댓글을 달아 주세요