[스파크(Spark)] #3. 구조적 API 개요 및 기본 연산
[스파크(Spark)] #3. 구조적 API 개요 및 기본 연산
이번에는 구조적 API의 개요 및 기본 연산에 대해서 알아본다.
DataFrame와 Dataset은 둘 다 Row와 Column을 가지는 불변성을 가지는 분산 테이블 형태의 컬렉션이다.
Dataset은 JVM 기반이므로 java와 scala를 지원하지만 Python은 지원하지 않는다.
python 코드로 검증을 할 예정이므로 DataFrame 기준으로 설명한다.
참고
Spark API 관련 자세한 부분은 Spark Docs를 참고하자.
DataFrame을 가공 관련 부분은 pyspark.sql 모듈을 사용한다.
SparkSession
Spark의 모든 기능에 대한 진입점은 SparkSession클래스를 사용해야 한다.
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \ .master("local") \
.appName("Python Spark SQL basic example") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
- builder : 객체 생성
- master : 실행 환경을 설정
local | 로컬 실행 |
local[4] | 4코어로 로컬 실행 |
spark : // master : 7077 | Spark 독립 실행형 클러스터 |
- config : 실행 옵션 설정, SparkConf 및 SparkSession 자체 구성에 자동으로 전파
(SparkConf는 Spark의 런타임 구성 인터페이스며 이 인터페이스를 통해 사용자는 Spark SQL과 관련된 모든 Spark 및 Hadoop 구성을 가져오고 설정할 수 있음)
- getOrCreate : 기존 SparkSession을 가져 오거나 없는 경우 실더에 설정된 옵션을 기반으로 새로운 SparkSession을 생성
DataFrame 생성
SparkSession응용 프로그램이 기존RDD , 하이브 테이블 또는 Spark 데이터 소스 에서 DataFrames을 만들 수 있다.
df = spark.read.format("json")\
.load("D:/2015-summary.json")
print(df)
# 결과
# DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: bigint]
스키마
DataFrame의 칼럼명과 데이터 타입을 정의한다.
스키마는 데이터 소스에서 얻거나(schema-on-read) 직접 정의할 수 있다.
# 1. 데이터 소스에서 얻는 방법
print(df.schema)
# 결과
# StructType(List(StructField(DEST_COUNTRY_NAME,StringType,true),StructField(ORIGIN_COUNTRY_NAME,StringType,true),StructField(count,LongType,true)))
# 2. 직접 정의 방법
from pyspark.sql.types import StructField, StructType, StringType, LongType
myManualSchema = StructType([
StructField("DEST_COUNTRY_NAME", StringType(), True),
StructField("ORIGIN_COUNTRY_NAME", StringType(), True),
StructField("count", LongType(), True)
])
print(myManualSchema)
# 결과
# StructType(List(StructField(DEST_COUNTRY_NAME,StringType,true),StructField(ORIGIN_COUNTRY_NAME,StringType,true),StructField(count,LongType,false)))
StructType의 자세한 정보는 링크 참조
컬럼과 표현식
컬럼
컬럼은 정수형이나 문자열 같은 단순 데이터 타입, 배열이나 맵 같은 복합 데이터 타입 그리고 null 로 구분된다.
개발 언어의 기본적인 데이터 타입은 다 들어있다.
자세한 정보는 링크 참조
컬럼 생성 및 참조는 여러가지 방법이 있지만 col함수나, column 함수를 사용한다. (둘 다 동일한 기능)
from pyspark.sql.functions import col, column
col("someColumnName")
column("someColumnName")
표현식
여러 컬럼명을 입력받아 식별하고 단일 값을 만들기 위해 다양한 표현식을 각 레코드에 적용하는 함수이다.
expr 함수를 사용하며, 예를 들면 expr("someCol")은 col("someCol") 구문과 동일하게 동작한다.
# 모두 같은 트랜스포메이션을 가진다.
expr("someCol - 5")
col("someCol") - 5
expr("someCol") - 5
from pyspark.sql.functions import expr
expr("(((someCol + 5) * 200) - 6) = otherCol")
Record와 Row
스파크에서 DataFrame의 각 로우는 하나의 레코드며, 레코드를 Row 객체로 표현한다.
DataFrame만 유일하게 스키마 정보를 가지고 있고, Row 객체는 스키마 정보를 가지고 있지 않다.
그러므로 Row 객체를 직접 생성하려면 DataFrame의 스키마와 같은 순서로 값을 명시해야 한다.
from pyspark.sql import Row
myRow = Row("Hello", None, 1, False)
print(myRow[0])
print(myRow[2])
# 결과
# Hello
# 1
DataFrame 다루기
DataFrame의 기본적인 기능은 알았으니 실제로 DataFrame의 데이터를 다뤄보자.
초반에 2015-summary.json 파일을 로드하여 생성된 DataFrame을 가지고 진행한다.
데이터 조회
select와 selectExpr 메소드를 사용하면 마치 테이블에 SQL 질의를 실행한 것처럼 데이터 조회를 할 수 있다.
select 메소드를 살펴보자.
# df DataFrame 전체 Row 개수 조회
print(df.count)
# 256
# DataFrame 5개의 로우 조회
df.show(5)
# 결과
# +-----------------+-------------------+-----+
# |DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
# +-----------------+-------------------+-----+
# | United States| Romania| 15|
# | United States| Croatia| 1|
# | United States| Ireland| 344|
# | Egypt| United States| 15|
# | United States| India| 62|
# +-----------------+-------------------+-----+
# DataFrame의 DEST_COUNTRY_NAME 컬럼으로 2개 로우 조회
df.select("DEST_COUNTRY_NAME").show(2)
# 결과
# +-----------------+
# |DEST_COUNTRY_NAME|
# +-----------------+
# | United States|
# | United States|
# +-----------------+
# DataFrame의 DEST_COUNTRY_NAME, ORIGIN_COUNTRY_NAME 컬럼으로 2개의 로우 조회
df.select("DEST_COUNTRY_NAME", "ORIGIN_COUNTRY_NAME").show(2)
# 결과
# +-----------------+-------------------+
# |DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|
# +-----------------+-------------------+
# | United States| Romania|
# | United States| Croatia|
# +-----------------+-------------------+
# DEST_COUNTRY_NAME 컬럼을 3가지 방법으로 조회
from pyspark.sql.functions import expr, col, column
df.select(
expr("DEST_COUNTRY_NAME"),
col("DEST_COUNTRY_NAME"),
column("DEST_COUNTRY_NAME"))\
.show(2)
# 결과
# +-----------------+-----------------+-----------------+
# |DEST_COUNTRY_NAME|DEST_COUNTRY_NAME|DEST_COUNTRY_NAME|
# +-----------------+-----------------+-----------------+
# | United States| United States| United States|
# | United States| United States| United States|
# +-----------------+-----------------+-----------------+
select 메소드 안에 컬럼 지정을 expr 함수로 같이 사용할 수 있다.
함수와 메소드의 차이는 객체(Object)에 속해있으면 메소드, 속해있지 않으면 함수로 보면 된다.
예) print( ), type( ), str( ), bool( ) 등과 같이 자료형을 조회하거나 변경 시 사용하는 것들은 모두 함수
예) 리스트를 기준으로 index( ), count( ), append( ), remove( ), reverse( )는 객체와 관련이 있으므로 메소드
# DEST_COUNTRY_NAME 컬럼을 destination명으로 변경하여 조회
from pyspark.sql.functions import expr, col, column
df.select(expr("DEST_COUNTRY_NAME AS destination")).show(2)
# 결과
# +-------------+
# | destination|
# +-------------+
# |United States|
# |United States|
+-------------+
# DEST_COUNTRY_NAME 컬럼을 destination명으로 변경하고 또 DEST_COUNTRY_NAME 이름으로 변경
df.select(expr("DEST_COUNTRY_NAME as destination").alias("DEST_COUNTRY_NAME"))\
.show(2)
# 결과
# +-----------------+
# |DEST_COUNTRY_NAME|
# +-----------------+
# | United States|
# | United States|
# +-----------------+
select 메서드에 expr 함수를 사용하는 패턴이 많아져서 스파크는 이런 작업을 효율적으로 할수 있는 selectExpr 메서드를 제공한다.
df.selectExpr("DEST_COUNTRY_NAME as newColumnName", "DEST_COUNTRY_NAME").show(2)
# 결과
# +-------------+-----------------+
# |newColumnName|DEST_COUNTRY_NAME|
# +-------------+-----------------+
# |United States| United States|
# |United States| United States|
# +-------------+-----------------+
df.selectExpr(
"*", # all original columns
"(DEST_COUNTRY_NAME = ORIGIN_COUNTRY_NAME) as withinCountry")\
.show(2)
# 결과
# +-----------------+-------------------+-----+-------------+
# |DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|withinCountry|
# +-----------------+-------------------+-----+-------------+
# | United States| Romania| 15| false|
# | United States| Croatia| 1| false|
# +-----------------+-------------------+-----+-------------+
df.selectExpr("avg(count)", "count(distinct(DEST_COUNTRY_NAME))").show(2)
# 결과
# [Stage 8:====================================================> (191 + 1) / 200]+-----------+---------------------------------+
# | avg(count)|count(DISTINCT DEST_COUNTRY_NAME)|
# +-----------+---------------------------------+
# |1770.765625| 132|
# +-----------+---------------------------------+
distinct 메서드를 이용하여 중복을 제거하고 고유한 로우를 얻을 수도 있다.
# ORIGIN_COUNTRY_NAME 컬럼의 로우에 대한 중복을 제거 하고 데이터 및 개수 조회
df5 = df.select("ORIGIN_COUNTRY_NAME").distinct()
df5.show(5)
print(df.count())
# 결과
# +-------------------+
# |ORIGIN_COUNTRY_NAME|
# +-------------------+
# | Paraguay|
# | Russia|
# | Anguilla|
# | Senegal|
# | Sweden|
# +-------------------+
# only showing top 5 rows
# 256
sort 와 orderBy 메서드를 사용하여 정렬할 수 있다 .
기본 동작은 asc(오름차순)이다.
# sort 메소드를 사용하여 count 컬럼 정렬 5 로우 조회
df.sort("count").show(5)
# 결과
# +--------------------+-------------------+-----+
# | DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
# +--------------------+-------------------+-----+
# | Malta| United States| 1|
# |Saint Vincent and...| United States| 1|
# | United States| Croatia| 1|
# | United States| Gibraltar| 1|
# | United States| Singapore| 1|
# +--------------------+-------------------+-----+
# orderBy 메소드를 사용하여 count, DEST_COUNTRY_NAME 정렬 5 로우 조회
df.orderBy("count", "DEST_COUNTRY_NAME").show(5)
# 결과
# +-----------------+-------------------+-----+
# |DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
# +-----------------+-------------------+-----+
# | Burkina Faso| United States| 1|
# | Cote d'Ivoire| United States| 1|
# | Cyprus| United States| 1|
# | Djibouti| United States| 1|
# | Indonesia| United States| 1|
# +-----------------+-------------------+-----+
df.orderBy(col("count"), col("DEST_COUNTRY_NAME")).show(5)
# 결과
# +-----------------+-------------------+-----+
# |DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
# +-----------------+-------------------+-----+
# | Burkina Faso| United States| 1|
# | Cote d'Ivoire| United States| 1|
# | Cyprus| United States| 1|
# | Djibouti| United States| 1|
# | Indonesia| United States| 1|
# +-----------------+-------------------+-----+
from pyspark.sql.functions import desc, asc
# orderBy 메소드를 사용하여 count 내림차순 2 로우 조회
df.orderBy(expr("count desc")).show(2)
# 결과
# +-----------------+-------------------+-----+
# |DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
# +-----------------+-------------------+-----+
# | Moldova| United States| 1|
# | United States| Croatia| 1|
# +-----------------+-------------------+-----+
# orderBy 메소드를 사용하여 count 내림차순, DEST_COUNTRY_NAME 오름차순 2 로우 조회
df.orderBy(col("count").desc(), col("DEST_COUNTRY_NAME").asc()).show(2)
# 결과
# +-----------------+-------------------+------+
# |DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME| count|
# +-----------------+-------------------+------+
# | United States| United States|370002|
# | United States| Canada| 8483|
# +-----------------+-------------------+------+
컬럼 추가와 컬럼명 변경
컬럼 추가는 DataFrame의 withColumn 메서드를 이용한다.
# numberOne이라는 컬럼을 추가
from pyspark.sql.functions import lit
df.withColumn("numberOne", lit(1)).show(2)
# 결과
# +-----------------+-------------------+-----+---------+
# |DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|numberOne|
# +-----------------+-------------------+-----+---------+
# | United States| Romania| 15| 1|
# | United States| Croatia| 1| 1|
# +-----------------+-------------------+-----+---------+
# expr 조건을 통하여 boolean 타입으로 withinCountry 컬럼을 추가
df.withColumn("withinCountry", expr("ORIGIN_COUNTRY_NAME == DEST_COUNTRY_NAME"))\
.show(2)
# 결과
# +-----------------+-------------------+-----+-------------+
# |DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|withinCountry|
# +-----------------+-------------------+-----+-------------+
# | United States| Romania| 15| false|
# | United States| Croatia| 1| false|
# +-----------------+-------------------+-----+-------------+
컬럼명 변경은 withColumnRenamed 메서드를 이용한다.
# DEST_COUNTRY_NAME 컬럼을 dest로 변경
df.show(2)
df1 = df.withColumnRenamed("DEST_COUNTRY_NAME", "dest")
df1.show(2)
# 결과
# +-----------------+-------------------+-----+
# |DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
# +-----------------+-------------------+-----+
# | United States| Romania| 15|
# | United States| Croatia| 1|
# +-----------------+-------------------+-----+
# +-------------+-------------------+-----+
# | dest|ORIGIN_COUNTRY_NAME|count|
# +-------------+-------------------+-----+
# |United States| Romania| 15|
# |United States| Croatia| 1|
# +-------------+-------------------+-----+
대소문자 구분
스파크는 기본적으로 대소문자를 구분하지 않는다.
대소문자를 구분하게 만드려면 아래의 3가지 방법 중 하나의 방법에 대한 코드을 추가한다.
# 1. SparkSession을 생성할 때 config 옵션에 넣는다.
spark = SparkSession.builder \
.master("local") \
.appName("testapp") \
.config("spark.some.config.option", "some-value") \
.config("spark.sql.caseSensitive", "true") \
.getOrCreate()
# 2. spark.conf.set 함수를 이용
spark.conf.set("spark.sql.caseSensitive", "true")
# 3. spark.sql 함수 이용
spark.sql("SET spark.sql.caseSensitive=true")
컬럼 제거
select 메서드로 컬럼을 제거할 수 있지만 drop 메서드를 사용할 수도 있다.
# ORIGIN_COUNTRY_NAME 컬럼 제거
df2 = df.drop("ORIGIN_COUNTRY_NAME")
df2.show(2)
# 결과
# +-----------------+-----+
# |DEST_COUNTRY_NAME|count|
# +-----------------+-----+
# | United States| 15|
# | United States| 1|
# +-----------------+-----+
# ORIGIN_COUNTRY_NAME, DEST_COUNTRY_NAME 2 개의 컬럼 제거
df3 = df.drop("ORIGIN_COUNTRY_NAME", ", DEST_COUNTRY_NAME")
df3.show(2)
# 결과
# +-----+
# |count|
# +-----+
# | 15|
# | 1|
# +-----+
컬럼 타입 변경
특정 컬럼의 데이터 타입을 다른 데이터 타입으로 형변환할 경우가 있다.
cast 메서드를 이용하여 데이터 타입을 변환할 수 있다.
# LongType의 count 컬럼을 StringType의 count2 컬럼으로 변경
print(df.schema) # 기존 스키마 조회
df4 = df.withColumn("count2", col("count").cast("string"))
df4.show(2)
print(df4.schema) # 변경된 스키마 조회
# 결과
# StructType(List(StructField(DEST_COUNTRY_NAME,StringType,true),StructField(ORIGIN_COUNTRY_NAME,StringType,true),StructField(count,LongType,true)))
# +-----------------+-------------------+-----+------+
# |DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|count2|
# +-----------------+-------------------+-----+------+
# | United States| Romania| 15| 15|
# | United States| Croatia| 1| 1|
# +-----------------+-------------------+-----+------+
# only showing top 2 rows
# StructType(List(StructField(DEST_COUNTRY_NAME,StringType,true),StructField(ORIGIN_COUNTRY_NAME,StringType,true),StructField(count,LongType,true),StructField(count2,StringType,true)))
로우 필터링
로우를 필터링 하려면 참과 거짓을 판별하는 표현식을 만들어야 한다. 그래서 표현식의 결과가 false인 로우를 걸러내면 된다.
where 메서드와 filter 메서드를 이용한다.
# count가 2 이상인 로우 2개 조회
df.filter(col("count") < 2 ).show(2)
# 결과
# +-----------------+-------------------+-----+
# |DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
# +-----------------+-------------------+-----+
# | United States| Croatia| 1|
# | United States| Singapore| 1|
# +-----------------+-------------------+-----+
df.where("count < 2").show(2)
# 결과
# +-----------------+-------------------+-----+
# |DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
# +-----------------+-------------------+-----+
# | United States| Croatia| 1|
# | United States| Singapore| 1|
# +-----------------+-------------------+-----+
마치며
구조적 API Dataframd의 기본적인 연산을 확인해 보았다.
다름에는 다양한 데이터 타입에 대해서 알아본다.