[스파크(Spark)] #1. 개요

[스파크(Spark)] #2. 용어 및 개념

[스파크(Spark)] #3. 구조적 API 개요 및 기본 연산

 

이번에는 구조적 API의 개요 및 기본 연산에 대해서 알아본다. 

DataFrame와 Dataset은 둘 다 Row와 Column을 가지는 불변성을 가지는 분산 테이블 형태의 컬렉션이다. 

Dataset은 JVM 기반이므로 java와 scala를 지원하지만 Python은 지원하지 않는다. 

python 코드로 검증을 할 예정이므로 DataFrame 기준으로 설명한다. 

참고

Spark API 관련 자세한 부분은 Spark Docs를 참고하자.

DataFrame을 가공 관련 부분은 pyspark.sql 모듈을 사용한다. 

SparkSession 

Spark의 모든 기능에 대한 진입점은 SparkSession클래스를 사용해야 한다. 

from pyspark.sql import SparkSession 
spark = SparkSession \
	.builder \ .master("local") \
	.appName("Python Spark SQL basic example") \
	.config("spark.some.config.option", "some-value") \
	.getOrCreate()

- builder : 객체 생성

- master : 실행 환경을 설정

local 로컬 실행
local[4] 4코어로 로컬 실행
spark : // master : 7077 Spark 독립 실행형 클러스터

- config : 실행 옵션 설정, SparkConf 및 SparkSession 자체 구성에 자동으로 전파 

(SparkConf는 Spark의 런타임 구성 인터페이스며 이 인터페이스를 통해 사용자는 Spark SQL과 관련된 모든 Spark 및 Hadoop 구성을 가져오고 설정할 수 있음)

- getOrCreate : 기존 SparkSession을 가져 오거나 없는 경우 실더에 설정된 옵션을 기반으로 새로운 SparkSession을 생성

DataFrame 생성

SparkSession응용 프로그램이 기존RDD , 하이브 테이블 또는 Spark 데이터 소스 에서 DataFrames을 만들 수 있다.

df = spark.read.format("json")\
    .load("D:/2015-summary.json")
print(df)
# 결과
# DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: bigint]

2015-summary.json
0.02MB

스키마 

DataFrame의 칼럼명과 데이터 타입을 정의한다. 

스키마는 데이터 소스에서 얻거나(schema-on-read) 직접 정의할 수 있다. 

# 1. 데이터 소스에서 얻는 방법
print(df.schema)
# 결과
# StructType(List(StructField(DEST_COUNTRY_NAME,StringType,true),StructField(ORIGIN_COUNTRY_NAME,StringType,true),StructField(count,LongType,true)))


# 2. 직접 정의 방법
from pyspark.sql.types import StructField, StructType, StringType, LongType
myManualSchema = StructType([
  StructField("DEST_COUNTRY_NAME", StringType(), True),
  StructField("ORIGIN_COUNTRY_NAME", StringType(), True),
  StructField("count", LongType(), True)
])
print(myManualSchema)
# 결과
# StructType(List(StructField(DEST_COUNTRY_NAME,StringType,true),StructField(ORIGIN_COUNTRY_NAME,StringType,true),StructField(count,LongType,false)))

StructType의 자세한 정보는 링크 참조

컬럼과 표현식

컬럼

컬럼은 정수형이나 문자열 같은 단순 데이터 타입, 배열이나 맵 같은 복합 데이터 타입 그리고 null 로 구분된다. 

개발 언어의 기본적인 데이터 타입은 다 들어있다. 

자세한 정보는 링크 참조

 

컬럼 생성 및 참조는 여러가지 방법이 있지만 col함수나, column 함수를 사용한다. (둘 다 동일한 기능)

from pyspark.sql.functions import col, column
col("someColumnName")
column("someColumnName")

표현식

여러 컬럼명을 입력받아 식별하고 단일 값을 만들기 위해 다양한 표현식을 각 레코드에 적용하는 함수이다. 

expr 함수를 사용하며, 예를 들면 expr("someCol")은 col("someCol") 구문과 동일하게 동작한다. 

# 모두 같은 트랜스포메이션을 가진다. 
expr("someCol - 5")
col("someCol") - 5
expr("someCol") - 5

from pyspark.sql.functions import expr
expr("(((someCol + 5) * 200) - 6) = otherCol")

Record와 Row

스파크에서 DataFrame의 각 로우는 하나의 레코드며, 레코드를 Row 객체로 표현한다. 

DataFrame만 유일하게 스키마 정보를 가지고 있고, Row 객체는 스키마 정보를 가지고 있지 않다.

그러므로 Row 객체를 직접 생성하려면 DataFrame의 스키마와 같은 순서로 값을 명시해야 한다. 

from pyspark.sql import Row
myRow = Row("Hello", None, 1, False)
print(myRow[0])
print(myRow[2])
# 결과 
# Hello
# 1

DataFrame 다루기

DataFrame의 기본적인 기능은 알았으니 실제로 DataFrame의 데이터를 다뤄보자.

초반에 2015-summary.json 파일을 로드하여 생성된 DataFrame을 가지고 진행한다. 

데이터 조회

select와 selectExpr 메소드를 사용하면 마치 테이블에 SQL 질의를 실행한 것처럼 데이터 조회를 할 수 있다. 

select 메소드를 살펴보자.

# df DataFrame 전체 Row 개수 조회
print(df.count)
# 256

# DataFrame 5개의 로우 조회
df.show(5)
# 결과
# +-----------------+-------------------+-----+
# |DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
# +-----------------+-------------------+-----+
# |    United States|            Romania|   15|
# |    United States|            Croatia|    1|
# |    United States|            Ireland|  344|
# |            Egypt|      United States|   15|
# |    United States|              India|   62|
# +-----------------+-------------------+-----+

# DataFrame의 DEST_COUNTRY_NAME 컬럼으로 2개 로우 조회
df.select("DEST_COUNTRY_NAME").show(2)
# 결과
# +-----------------+
# |DEST_COUNTRY_NAME|
# +-----------------+
# |    United States|
# |    United States|
# +-----------------+

# DataFrame의 DEST_COUNTRY_NAME, ORIGIN_COUNTRY_NAME 컬럼으로 2개의 로우 조회
df.select("DEST_COUNTRY_NAME", "ORIGIN_COUNTRY_NAME").show(2)
# 결과
# +-----------------+-------------------+
# |DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|
# +-----------------+-------------------+
# |    United States|            Romania|
# |    United States|            Croatia|
# +-----------------+-------------------+

# DEST_COUNTRY_NAME 컬럼을 3가지 방법으로 조회
from pyspark.sql.functions import expr, col, column
df.select(
    expr("DEST_COUNTRY_NAME"),
    col("DEST_COUNTRY_NAME"),
    column("DEST_COUNTRY_NAME"))\
  .show(2)
# 결과
# +-----------------+-----------------+-----------------+
# |DEST_COUNTRY_NAME|DEST_COUNTRY_NAME|DEST_COUNTRY_NAME|
# +-----------------+-----------------+-----------------+
# |    United States|    United States|    United States|
# |    United States|    United States|    United States|
# +-----------------+-----------------+-----------------+

select 메소드 안에 컬럼 지정을 expr 함수로 같이 사용할 수 있다. 

함수와 메소드의 차이는 객체(Object)에 속해있으면 메소드, 속해있지 않으면 함수로 보면 된다. 

예) print( ), type( ), str( ), bool( ) 등과 같이 자료형을 조회하거나 변경 시  사용하는 것들은 모두 함수

예) 리스트를 기준으로 index( ), count( ), append( ), remove( ), reverse( )는 객체와 관련이 있으므로 메소드

# DEST_COUNTRY_NAME 컬럼을 destination명으로 변경하여 조회
from pyspark.sql.functions import expr, col, column
df.select(expr("DEST_COUNTRY_NAME AS destination")).show(2)
# 결과
# +-------------+
# |  destination|
# +-------------+
# |United States|
# |United States|
+-------------+

# DEST_COUNTRY_NAME 컬럼을 destination명으로 변경하고 또 DEST_COUNTRY_NAME 이름으로 변경
df.select(expr("DEST_COUNTRY_NAME as destination").alias("DEST_COUNTRY_NAME"))\
  .show(2)
# 결과
# +-----------------+
# |DEST_COUNTRY_NAME|
# +-----------------+
# |    United States|
# |    United States|
# +-----------------+

select 메서드에 expr 함수를 사용하는 패턴이 많아져서 스파크는 이런 작업을 효율적으로 할수 있는 selectExpr 메서드를 제공한다. 

df.selectExpr("DEST_COUNTRY_NAME as newColumnName", "DEST_COUNTRY_NAME").show(2)
# 결과
# +-------------+-----------------+
# |newColumnName|DEST_COUNTRY_NAME|
# +-------------+-----------------+
# |United States|    United States|
# |United States|    United States|
# +-------------+-----------------+

df.selectExpr(
  "*", # all original columns
  "(DEST_COUNTRY_NAME = ORIGIN_COUNTRY_NAME) as withinCountry")\
  .show(2)
# 결과
# +-----------------+-------------------+-----+-------------+
# |DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|withinCountry|
# +-----------------+-------------------+-----+-------------+
# |    United States|            Romania|   15|        false|
# |    United States|            Croatia|    1|        false|
# +-----------------+-------------------+-----+-------------+

df.selectExpr("avg(count)", "count(distinct(DEST_COUNTRY_NAME))").show(2)
# 결과
# [Stage 8:====================================================>  (191 + 1) / 200]+-----------+---------------------------------+
# | avg(count)|count(DISTINCT DEST_COUNTRY_NAME)|
# +-----------+---------------------------------+
# |1770.765625|                              132|
# +-----------+---------------------------------+

distinct 메서드를 이용하여 중복을 제거하고 고유한 로우를 얻을 수도 있다. 

 # ORIGIN_COUNTRY_NAME 컬럼의 로우에 대한 중복을 제거 하고 데이터 및 개수 조회
df5 = df.select("ORIGIN_COUNTRY_NAME").distinct()
df5.show(5)
print(df.count())
# 결과
# +-------------------+
# |ORIGIN_COUNTRY_NAME|
# +-------------------+
# |           Paraguay|
# |             Russia|
# |           Anguilla|
# |            Senegal|
# |             Sweden|
# +-------------------+
# only showing top 5 rows
# 256

sort 와 orderBy 메서드를 사용하여 정렬할 수 있다 .

기본 동작은 asc(오름차순)이다. 

# sort 메소드를 사용하여 count 컬럼 정렬 5 로우 조회
df.sort("count").show(5)
# 결과
# +--------------------+-------------------+-----+
# |   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
# +--------------------+-------------------+-----+
# |               Malta|      United States|    1|
# |Saint Vincent and...|      United States|    1|
# |       United States|            Croatia|    1|
# |       United States|          Gibraltar|    1|
# |       United States|          Singapore|    1|
# +--------------------+-------------------+-----+

# orderBy 메소드를 사용하여 count, DEST_COUNTRY_NAME 정렬 5 로우 조회
df.orderBy("count", "DEST_COUNTRY_NAME").show(5)
# 결과
# +-----------------+-------------------+-----+
# |DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
# +-----------------+-------------------+-----+
# |     Burkina Faso|      United States|    1|
# |    Cote d'Ivoire|      United States|    1|
# |           Cyprus|      United States|    1|
# |         Djibouti|      United States|    1|
# |        Indonesia|      United States|    1|
# +-----------------+-------------------+-----+
 
df.orderBy(col("count"), col("DEST_COUNTRY_NAME")).show(5)
# 결과
# +-----------------+-------------------+-----+
# |DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
# +-----------------+-------------------+-----+
# |     Burkina Faso|      United States|    1|
# |    Cote d'Ivoire|      United States|    1|
# |           Cyprus|      United States|    1|
# |         Djibouti|      United States|    1|
# |        Indonesia|      United States|    1|
# +-----------------+-------------------+-----+

from pyspark.sql.functions import desc, asc

# orderBy 메소드를 사용하여 count 내림차순 2 로우 조회
df.orderBy(expr("count desc")).show(2)
# 결과
# +-----------------+-------------------+-----+
# |DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
# +-----------------+-------------------+-----+
# |          Moldova|      United States|    1|
# |    United States|            Croatia|    1|
# +-----------------+-------------------+-----+

# orderBy 메소드를 사용하여 count 내림차순, DEST_COUNTRY_NAME 오름차순 2 로우 조회
df.orderBy(col("count").desc(), col("DEST_COUNTRY_NAME").asc()).show(2)
# 결과
# +-----------------+-------------------+------+
# |DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME| count|
# +-----------------+-------------------+------+
# |    United States|      United States|370002|
# |    United States|             Canada|  8483|
# +-----------------+-------------------+------+

컬럼 추가와 컬럼명 변경

컬럼 추가는 DataFrame의 withColumn 메서드를 이용한다.

# numberOne이라는 컬럼을 추가
from pyspark.sql.functions import lit
df.withColumn("numberOne", lit(1)).show(2)
# 결과
# +-----------------+-------------------+-----+---------+
# |DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|numberOne|
# +-----------------+-------------------+-----+---------+
# |    United States|            Romania|   15|        1|
# |    United States|            Croatia|    1|        1|
# +-----------------+-------------------+-----+---------+

# expr 조건을 통하여 boolean 타입으로 withinCountry 컬럼을 추가
df.withColumn("withinCountry", expr("ORIGIN_COUNTRY_NAME == DEST_COUNTRY_NAME"))\
  .show(2)
# 결과 
# +-----------------+-------------------+-----+-------------+
# |DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|withinCountry|
# +-----------------+-------------------+-----+-------------+
# |    United States|            Romania|   15|        false|
# |    United States|            Croatia|    1|        false|
# +-----------------+-------------------+-----+-------------+

컬럼명 변경은 withColumnRenamed 메서드를 이용한다. 

# DEST_COUNTRY_NAME 컬럼을 dest로 변경
df.show(2)
df1 = df.withColumnRenamed("DEST_COUNTRY_NAME", "dest")
df1.show(2)
# 결과
# +-----------------+-------------------+-----+
# |DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
# +-----------------+-------------------+-----+
# |    United States|            Romania|   15|
# |    United States|            Croatia|    1|
# +-----------------+-------------------+-----+
# +-------------+-------------------+-----+
# |         dest|ORIGIN_COUNTRY_NAME|count|
# +-------------+-------------------+-----+
# |United States|            Romania|   15|
# |United States|            Croatia|    1|
# +-------------+-------------------+-----+

대소문자 구분

스파크는 기본적으로 대소문자를 구분하지 않는다. 

대소문자를 구분하게 만드려면 아래의 3가지 방법 중 하나의 방법에 대한 코드을 추가한다. 

# 1. SparkSession을 생성할 때 config 옵션에 넣는다. 
spark = SparkSession.builder \
    .master("local") \
    .appName("testapp") \
    .config("spark.some.config.option", "some-value") \
    .config("spark.sql.caseSensitive", "true") \
    .getOrCreate()

# 2. spark.conf.set 함수를 이용
spark.conf.set("spark.sql.caseSensitive", "true")

# 3. spark.sql 함수 이용
spark.sql("SET spark.sql.caseSensitive=true")

컬럼 제거

select 메서드로 컬럼을 제거할 수 있지만 drop 메서드를 사용할 수도 있다. 

# ORIGIN_COUNTRY_NAME 컬럼 제거
df2 = df.drop("ORIGIN_COUNTRY_NAME")
df2.show(2)
# 결과 
# +-----------------+-----+
# |DEST_COUNTRY_NAME|count|
# +-----------------+-----+
# |    United States|   15|
# |    United States|    1|
# +-----------------+-----+

# ORIGIN_COUNTRY_NAME, DEST_COUNTRY_NAME 2 개의 컬럼 제거
df3 = df.drop("ORIGIN_COUNTRY_NAME", ", DEST_COUNTRY_NAME")
df3.show(2)
# 결과
# +-----+
# |count|
# +-----+
# |   15|
# |    1|
# +-----+

컬럼 타입 변경

특정 컬럼의 데이터 타입을 다른 데이터 타입으로 형변환할 경우가 있다. 

cast 메서드를 이용하여 데이터 타입을 변환할 수 있다. 

# LongType의 count 컬럼을 StringType의 count2 컬럼으로 변경
print(df.schema) # 기존 스키마 조회
df4 = df.withColumn("count2", col("count").cast("string"))
df4.show(2)
print(df4.schema) # 변경된 스키마 조회
# 결과
# StructType(List(StructField(DEST_COUNTRY_NAME,StringType,true),StructField(ORIGIN_COUNTRY_NAME,StringType,true),StructField(count,LongType,true)))
# +-----------------+-------------------+-----+------+
# |DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|count2|
# +-----------------+-------------------+-----+------+
# |    United States|            Romania|   15|    15|
# |    United States|            Croatia|    1|     1|
# +-----------------+-------------------+-----+------+
# only showing top 2 rows
# StructType(List(StructField(DEST_COUNTRY_NAME,StringType,true),StructField(ORIGIN_COUNTRY_NAME,StringType,true),StructField(count,LongType,true),StructField(count2,StringType,true)))

로우 필터링

로우를 필터링 하려면 참과 거짓을 판별하는 표현식을 만들어야 한다. 그래서 표현식의 결과가 false인 로우를 걸러내면 된다.

where 메서드와 filter 메서드를 이용한다.  

 # count가 2 이상인 로우 2개 조회
df.filter(col("count") < 2 ).show(2)
# 결과
# +-----------------+-------------------+-----+
# |DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
# +-----------------+-------------------+-----+
# |    United States|            Croatia|    1|
# |    United States|          Singapore|    1|
# +-----------------+-------------------+-----+

df.where("count < 2").show(2)
# 결과
# +-----------------+-------------------+-----+
# |DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
# +-----------------+-------------------+-----+
# |    United States|            Croatia|    1|
# |    United States|          Singapore|    1|
# +-----------------+-------------------+-----+

마치며

구조적 API Dataframd의 기본적인 연산을 확인해 보았다. 

다름에는 다양한 데이터 타입에 대해서 알아본다. 

Posted by 사용자 피랑이

댓글을 달아 주세요

[스파크(Spark)] #1. 개요

[스파크(Spark)] #2. 용어 및 개념

[스파크(Spark)] #3. 구조적 API 개요 및 기본 연산

 

스파크가 무엇인지에 대한 개요에 대해서 알아보았다. 

이번에는 핵심 용어 및 개념에 대해서 알아본다. 

스파크 애플리케이션 아키텍처

사용자는 클러스터 매니저에게 스파크 애플리케이션을 제출한다. 

클러스터 매니저는 제출받은 애플리케이션 실행에 필요한 자원을 할당하고, 스파크 애플리케이션은 할당받은 자원으로 작업을 처리한다.  

 

스파크 애플리케이션은 드라이버 프로세스와 다수의 익스큐터 프로세스로 구성된다. 

드라이버 프로세스는 클러스터 노드 중 하나에에서만 실행한다. 즉 main() 함수를 실행한다. 

익스큐더는 다수의 도드에서 실행하며, 드라이버가 할당한 작업을 수행한다. 

사용자는 각 노드에 할당할 익스큐터 수를 지정할 수 있다. 

클러스터 매니저는 스파크가 연산에 사용할 4개의 클러스터 종류를 지원한다. 

드라이버(driver)
  • 하나의 스파크 애플리케이션 처리 
  • 프로그램이나 입력에 대한 응답
  • 익스큐터 작업과 관련된 분석, 배포 및 스케쥴링 역활 수행
  • SparkSession라고 하며 스파크 애플리케이션의 엔트리 역활을 맡은 object임
익스큐터(executor)
  • 드라이버가 할당한 코드를 실행
  • 실행한 진행상황을 드라이버에 보고

스파크는 사용 가능한 자원을 파악하기 위해 클러스터 매니저를 사용한다. 

드라이버 프로세스는 주어진 작업을 완료하기 위해 익스큐터에게 명령을 내린다. 

트랜스포메이션과 액션

스파크 핵심 데이터 구조는 불변성이다. 즉 데이터를 한번 생성하면 변경할 수 없다. 

데이터를 변경하려면 스파크에게 알려줘야 한다. 이 때 사용하는 명령이 트랜스포메이션이다. 

사용자는 트랜스포메이션을 사용해 논리적 실행 계획을 만든다.  하지만 액션을 호출하지 않으면 스파크는 실제 트랜스포메이션을 실행하지 않는다. 

사용자는 트랜스포메이션을 사용해 논리적 실행 계획을 세우고 액션을 통하여 실제 연산을 수행한다. 

아래 python 예제를 참조하자.

#트랜스포메이션 divisBy2 = myRange.where("number % 2 = 0") #액션 divisBy2.count()

트랜트포메이션은 두가지 유형의 의존성 존재한다. 

좁은 의존성
  • 하나의 입력 파티션이 하나의 출력 파티션에만 영향을 미침
  • 파이프라이닝
넓은 의존성
  • 하나의 입력 파티션이 여러 출력 파티션에 영향을 미침
  • 셔플

파티션 관련 내용은 DataFrame을 먼저 참조하길 바란다.

스파크 기본 요소

스파크는 저수준의 API, 구조적 API, 그리고 추가로 제공하는 일련의 표준 라이브러리로 구성되어 있다.

저수준의 API와 구조적 API 차이는 데이터 스키마 여부와 추상화에 있다. 

API 특징

RDD - Spark 1.0
  • Resilient Distributed Dataset : 탄력적이면서 분산된 데이터 셋
  • 스키마가 없음
  • 한번 정의하면 변경 불가능
  • 장애 발생 시 복구 가능
  • 현재는 RDD 비중이 높으나 Dataset이 비중이 늘어나고 있음
DataFrame - Spark 1.3
  • 스키마를 가진 RDD
  • RDB 테이블과 비슷하게 명명된 열로 구성됨
  • RDD 처럼 한번 정의하면 변경 불가능
  • 질의나 API를 통해 데이터를 쉽게 처리 가능
DataSet - Spark 1.6, 2.0
  • Spark 2.0 부터는 DataFrame과 통합되어 강력한 형식의 API와 형식지 지정되지 않은 API를 사용
  • lambda 사용 가능
  • 정적 타이핑 및 런타임 유형이 안전 
  • 저수준의 API를 굳이 사용할 필요가 없으면 DataSet을 권장함

API 차이점

마치며

스파크의 기본 용어 및 개념에 대해서 알아보았다. 

다음에는 구조적 API와 기본연산에 대해서 알아본다. 

 

참고

 

Difference between DataFrame, Dataset, and RDD in Spark

I'm just wondering what is the difference between an RDD and DataFrame (Spark 2.0.0 DataFrame is a mere type alias for Dataset[Row]) in Apache Spark? Can you convert one to the other?

stackoverflow.com

 

A Tale of Three Apache Spark APIs: RDDs vs DataFrames and Datasets

In summation, the choice of when to use RDD or DataFrame and/or Dataset seems obvious. While the former offers you low-level functionality and control, the latter allows custom view and structure, offers high-level and domain specific operations, saves spa

databricks.com

 

A comparison between RDD, DataFrame and Dataset in Spark from a developer’s point of view

APIs in Spark are great and contribute to the awesomeness of Spark. This so helpful framework is used to process big data.

medium.zenika.com

 

Posted by 사용자 피랑이
TAG Spark

댓글을 달아 주세요

컨테이너 오케스트레이션 개요

 

도커 컨테이너의 갯수가 꾸준히 늘어나면 필요한 자원도 지속적으로 늘어나기 마련이다 때문에 서버 또한 여러대로 늘어날 수 있는데 한대 두대의 수준이 아니라 몇 십 몇 백대의 서버로 늘어났다고 가정 해보자 이 많은 서버들을 일일이 접근하여 명령어 날려주고 컨테이너 올리고 "어? 이건 또 왜 내려갔어?" 하다가 시간은 시간대로 흘러버리고 정신을 차려보면 라꾸라꾸 침대가 본인 의자옆에 있는것을 발견 할 수 있을것이다. 결론적으로 이 많은 서버들과 컨테이너를 소수의 인원으로 관리하기에는 상당히 어렵고 이 문제를 효율적으로 관리하기 위해 컨테이너 오케스트레이션 툴들이 나오게 되었다.

 

 

컨테이너 오케스트레이션 툴 소개

Docker Swarm / Kubernetes / Apache Mesos

 

컨테이너 오케스트레이션 툴의 기능에는 단순 컨테이너의 배포 뿐만이 아닌 하나의 서비스를 관리하고 유지보수 하기 위한 많은 기능들을 포함하고 있고 툴마다 기능에 대한 편차는 있으나 주요 기능은 아래와 같다.

 

 

오케스트레이션 툴의 기능

  • 노드 클러스터링
  • 컨테이너 로드 밸런싱
  • 컨테이너의 배포와 복제 자동화
  • 컨테이너 장애 복구기능
  • 컨테이너 자동 확장 및 축소
  • 컨테이너 스케쥴링
  • 로깅 및 모니터링

 

컨테이너 오케스트레이션 툴의 종류는 여러가지가 있지만 그 중 가장 인지도가 높고 자주 소개되는 Docker Swarm, Kubernetes, Apache Mesos에 대해서 각 툴의 장점과 단점을 간략히 설명 하도록 하겠다.

 

 

Docker Swarm

 

도커 스웜은 도커 컨테이너 플랫폼에 통합 된 컨테이너 오케스트레이션 툴이다. 최초에 도커 스웜은 도커와 별개로 개발 되었으나 도커 1.12 버전부터 도커 스웜 모드라는 이름으로 합쳐졌다.

 

장점

  • 도커 명령어와 도커 컴포즈를 포함한 도커의 모든 기능이 내장되어 있다.
  • 도커 이외의 별도의 툴 설치가 필요하지 않다.
  • 타 오케스트레이션 툴에 비해 복잡하지 않고 다루기 쉽다.

 

단점

  • 타 오케스트레이션 툴에 비해 기능이 단순하여 세부적인 설정이 어려움
  • 초대형 노드 클러스터링에는 무리가 있다.

 

 

Kubernetes

 

쿠버네티스는 구글에서 개발한 2014년 오픈소스화 된 프로젝트이다. 무려 15년에 걸친 구글의 대규모 운영 워크로드 운영 경험과 노하우가 축적된 프로젝트로 컨테이너 중심의 관리환경을 제공한다.

 

장점

  • 현재 가장 인지도가 높고 기능이 많은 오케스트레이션 툴
  • 내장된 기능이 많아 타사 애드온이 불필요함

 

단점

  • 쿠버네티스의 구성과 개념에 대한 이해가 필요하다.
  • 학습해야할 부분이 많고 소규모 프로젝트에서 구축하기 쉽지 않다.

 

 

Apache Mesos

 

아파치 메소스는 Twitter, Apple, Uber, Netflix 등 대형 서비스를 운영하고 있는 기업에서 다수 채택 되었으며 마이크로서비스와 빅데이터, 실시간 분석, 엘라스틱 스케일링 기능 등을 제공하고 있다.

 

장점

  • 대형 서비스를 운영중인 회사에서 많이 채택 되었고 안정성이 검증되었다.
  • 수만대의 물리적 시스템으로 확장 가능하게 설계 되어있다.
  • Zookeeper, Hadoop, Spark와 같은 응용프로그램을 연동하여 노드 클러스터링과 자원 최적화 가능

 

단점

  • 너무 다양한 응용프로그램의 연동으로 인하여 복잡해질 수 있다.
  • 설치 및 관리가 어렵고 컨테이너를 활용하기 위해 Marathon 프레임워크를 추가로 설치해야 한다.

 

 

결론 : 이럴 때 적합!

 

Docker Swarm

  • 중소형 프로젝트일 경우
  • 관리 할 노드가 적고 많은 기능이 필요하지 않을 경우

 

Kubernetes

  • 대형 프로젝트일 경우
  • 세밀하고 다양한 설정 기능이 필요한 경우

 

Apache Mesos

  • 대형 프로젝트일 경우
  • 검증된 오케스트레이션 툴을 찾고있는 경우

 

 

Docker : 컨테이너 오케스트레이션 개요 편

끝.

'Docker' 카테고리의 다른 글

Docker : 도커스웜 클러스터 구축 편  (0) 2019.05.17
Docker : 컨테이너 오케스트레이션 개요 편  (0) 2019.04.19
Docker : Dockerfile 실습 편  (0) 2019.03.22
Docker : Dockerfile 편  (0) 2019.03.07
Docker : 이미지 편  (0) 2019.02.22
Docker : 컨테이너 편  (1) 2019.02.15
Posted by DevStream

댓글을 달아 주세요

외부에서 제작한 회의실 예약을 관리하는 서비스를 사용중이다.

이 서비스에서 예약한 회의 30분 전에 회의 참가자들에게 알림을 주고 싶다는 요구사항이 생겼다.

외부에서 제작한 서비스이기 때문에 직접 서비스에 기능을 추가할 수 없는 상황이었다.

 

다만 서비스에서 회의실 예약 정보를 저장하고 있는 DB의 테이블과 자료의 형식은 알고있었다.

그래서 해당 테이블을 감시하면서 다가온 회의실 예약에 대해 알림을 제공하는 프로그램을 만들기로 하였다.

 

예약 정보는 DB에서 가져왔지만, 이걸 알림을 주려면 뭘로 해야할지 고민이 되었다.

쉽게는 텔레그램 같은 메신저로 보내도 되고, 사내에서 사용하는 로켓챗을 이용해도 된다.

여러가지를 고민하다가 구글캘린더에 일정으로 등록해주면 이 예약정보를 다양하게 활용이 가능하겠다는 생각을 하게 되었다.

 

구글캘린더에 일정으로 등록을 하게되면..

  • 일정이 등록된 순간 사용자의 구글계정 일정에 등록이 되고 앱에서 등록 알림을 준다.
  • 사용자가 자신의 일정앱에서 일정 시작 몇분전에 알림을 줄지 정할 수 있다(기본값은 30분, 10분)
  • 휴대폰의 일정앱 뿐 아니라 컴퓨터의 브라우저에서도 일정 기능을 키면 확인이 가능하다.

구글캘린더를 도입 하려면 알림을 받을 사람들의 구글계정이 필요하다.

요즘 대부분 구글계정 한개씩 가지고 있기때문에 큰 제약은 아니었다.

이 점만 합의가 된다면 원래 요건을 충족하고 추가로 편의성을 더 보장할 수 있어서 구글캘린더에 회의실 예약 정보를 등록하기로 하였다.

 

구글캘린더 API 와 연동하려면 구글개발자 계정과 API 등록과정이 필요하다.

또한 JAVA에서 OAuth2 인증을 위해 인증정보를 담은 json 파일이 필요하다.

설정과정은 정리가 잘 된 링크를 첨부한다. 아래 링크에서 json 파일을 다운받는 곳 까지 진행하면 된다.

https://kingbbode.tistory.com/8

 

 

개발은 SpringBoot 2, Maven 환경에서 진행하였다.

연동부분에 대한 코드는 딱히 스프링의 라이브러리를 사용하지 않았으니 스프링이 꼭 필요하지는 않다.

 

구글 oauth 인증을 하기 위해서는 아래 라이브러리 디펜던시를 추가 해야한다.

 

<dependency>
   <groupId>com.google.api-client</groupId>
   <artifactId>google-api-client</artifactId>
   <version>1.23.0</version>
</dependency>
<dependency>
   <groupId>com.google.oauth-client</groupId>
   <artifactId>google-oauth-client-jetty</artifactId>
   <version>1.23.0</version>
</dependency>
<dependency>
   <groupId>com.google.apis</groupId>
   <artifactId>google-api-services-calendar</artifactId>
   <version>v3-rev305-1.23.0</version>
</dependency>

 

아래는 구글캘린더 API 를 이용해 일정을 등록하고 삭제하는 내용을 담은 코드이다.

public class GoogleCalendar {



   private static final String APPLICATION_NAME = "Google Calendar API Java Quickstart";

   private static final JsonFactory JSON_FACTORY = JacksonFactory.getDefaultInstance();

   private static final String CREDENTIALS_FOLDER = "credentials"; // Directory to store user credentials.

   private static final String CALENDAR_ID = "[캘린더연동아이디]";



   /**

    * Global instance of the scopes required by this quickstart.

    * If modifying these scopes, delete your previously saved credentials/ folder.

    */

   private static final List<String> SCOPES = Collections.singletonList(CalendarScopes.CALENDAR);

   private static final String CLIENT_SECRET_DIR = "/client_secret.json";





   public static Event addEvent(Event event) throws IOException, GeneralSecurityException, GoogleJsonResponseException {

       final NetHttpTransport HTTP_TRANSPORT = GoogleNetHttpTransport.newTrustedTransport();

       Calendar service = new Calendar.Builder(HTTP_TRANSPORT, JSON_FACTORY, getCredentials(HTTP_TRANSPORT))

               .setApplicationName(APPLICATION_NAME)

               .build();

       return service.events().insert(CALENDAR_ID, event).execute();

   }



   public static void delEvent(String eventKey) throws IOException, GeneralSecurityException, GoogleJsonResponseException {

       final NetHttpTransport HTTP_TRANSPORT = GoogleNetHttpTransport.newTrustedTransport();

       Calendar service = new Calendar.Builder(HTTP_TRANSPORT, JSON_FACTORY, getCredentials(HTTP_TRANSPORT))

               .setApplicationName(APPLICATION_NAME)

               .build();

       service.events().delete(CALENDAR_ID, eventKey).execute();

   }



   /**

    * Creates an authorized Credential object.

    * @param HTTP_TRANSPORT The network HTTP Transport.

    * @return An authorized Credential object.

    * @throws IOException If there is no client_secret.

    */

   private static Credential getCredentials(final NetHttpTransport HTTP_TRANSPORT) throws IOException {

       // Load client secrets.

       InputStream in = GoogleCalendar.class.getResourceAsStream(CLIENT_SECRET_DIR);

       GoogleClientSecrets clientSecrets = GoogleClientSecrets.load(JSON_FACTORY, new InputStreamReader(in));



       // Build flow and trigger user authorization request.

       GoogleAuthorizationCodeFlow flow = new GoogleAuthorizationCodeFlow.Builder(

               HTTP_TRANSPORT, JSON_FACTORY, clientSecrets, SCOPES)

               .setDataStoreFactory(new FileDataStoreFactory(new java.io.File(CREDENTIALS_FOLDER)))

               .setAccessType("offline")

               .build();

       return new AuthorizationCodeInstalledApp(flow, new LocalServerReceiver()).authorize("user");

   }

}

getCredentials

OAuth 인증을 이용해 구글에서 인증을 가져오는 메소드이다.

앞에서 만든 인증 json을 불러와야 한다. 나의 경우는 resource 폴더에 넣고 불러왔다.

이 인증메소드는 api 요청 때 마다 실행해서 인증정보를 반환한다.

 

addEvent

구글캘린더에 일정을 추가하는 메소드이다.

일정정보를 담기 위해 Event 라는 객체를 사용한다.

Event 객체를 사용하는 방법은 아래에 따로 다루었으니 참고하자.

 

delEvent

구글캘린더에 일정을 삭제하는 메소드이다.

addEvent 했을때 구글에서 응답받은 eventKey 가 필요하다.

 

Event 생성

이벤트 생성은 크게 제목, 시작시간, 종료시간, 참여자 정보가 필요하다. 아래는 일정을 추가하는 메소드이다.

참여자정보(ResourceSubscriber), 일정정보(RecourceInfo), 사원정보(EmpInfo) 는 나의 시스템에서 사용하는 DB에 맞게 직접 만든 객체이다. 참여자정보, 일정정보, 사원정보 부분은 본인의 시스템에 맞게 각자 제작하면 된다.

 

public Event makeEvent(ResourceInfo resourceInfo, List<ResourceSubscriber> subList) throws ParseException, IOException, GeneralSecurityException {

   Event event = new Event()

           .setSummary(resourceInfo.getReqText())

           .setLocation(getMeetingRoom(resourceInfo.getResSeq()).getResName()) //장소이름은 따로 불러오길..

           .setDescription(resourceInfo.getDescText());



   DateTime startDateTime = new DateTime(dateTimeTzFormat.format(dateTimeFormat.parse(resourceInfo.getStartDate())));

   EventDateTime start = new EventDateTime()

           .setDateTime(startDateTime)

           .setTimeZone("Asia/Seoul");

   event.setStart(start);



   DateTime endDateTime = new DateTime(dateTimeTzFormat.format(dateTimeFormat.parse(resourceInfo.getEndDate())));

   EventDateTime end = new EventDateTime()

           .setDateTime(endDateTime)

           .setTimeZone("Asia/Seoul");

   event.setEnd(end);



   EventReminder[] reminderOverrides = new EventReminder[]{

           new EventReminder().setMethod("popup").setMinutes(10),

   };

   Event.Reminders reminders = new Event.Reminders()

           .setUseDefault(false)

           .setOverrides(Arrays.asList(reminderOverrides));

   event.setReminders(reminders);



   List<EventAttendee> attendList = new ArrayList();



   for (ResourceSubscriber subObj : subList) {

       logger.info("SUB ADD OBJ : " + subObj.toString());

       EmpInfo empInfo = getEmpInfo(subObj.getEmpSeq());



       if (empInfo.getOutMail() != null && empInfo.getOutMail().length() > 0 && empInfo.getOutDomain() != null && empInfo.getOutDomain().length() > 0) {

           if (empInfo.getOutDomain().equals("gmail.com")) {

               attendList.add(new EventAttendee().setEmail(empInfo.getOutMail() + "@" + empInfo.getOutDomain()));

           } else {

               //이메일주소가 gmail.com 이 아닌 케이스. 일단은 아무것도 안함.

           }

       } else {

           //메일주소가 없는 케이스.

       }

   }

   event.setAttendees(attendList);

   try {

       event = GoogleCalendar.addEvent(event);

   } catch (Exception ex) {

       logger.info("Calendar Exception in insert");

   }

   logger.info("EVENT : " + event.toPrettyString());



   return event;

}

 

구글 캘린더 API 에는 더 다양한 기능이 있다. 아래 링크에서 참고하자.

https://developers.google.com/calendar/v3/reference/

불러오는 중입니다...

 

Posted by panpid

댓글을 달아 주세요