diff --git a/.gitbook/assets/image (1).png b/.gitbook/assets/image (1).png new file mode 100644 index 0000000..6696ca4 Binary files /dev/null and b/.gitbook/assets/image (1).png differ diff --git a/.gitbook/assets/image (2).png b/.gitbook/assets/image (2).png new file mode 100644 index 0000000..7caea59 Binary files /dev/null and b/.gitbook/assets/image (2).png differ diff --git a/.gitbook/assets/image.png b/.gitbook/assets/image.png new file mode 100644 index 0000000..54ad235 Binary files /dev/null and b/.gitbook/assets/image.png differ diff --git a/02-processing/2.1.md b/02-processing/2.1.md index 738d898..4a66d75 100644 --- a/02-processing/2.1.md +++ b/02-processing/2.1.md @@ -2,19 +2,20 @@ 데이터를 처리하기 위한 프레임워크는 다양합니다. 이 글에서는 데이터를 분산처리하기 위해 자주 쓰이는 두 가지 프레임워크에 대해 이야기 할 예정입니다. -* [Apache Spark ](https://spark.apache.org/) -* [Apache Flink](https://flink.apache.org/) +* [Apache Spark ](https://spark.apache.org) +* [Apache Flink](https://flink.apache.org)\ + Apache Spark 는 배치 처리를 위해 많이 활용됩니다. 경우에 따라 스트림 처리를 위해 사용되는 경우도 있습니다. 메타스토어로부터 데이터를 테이블 형태로 쉽게 읽어오고 가공하는 것이 가능하며, 사용자의 편의에 따라 다양한 API 수준을 지원합니다. * [Spark SQL](https://spark.apache.org/docs/latest/sql-programming-guide.html#sql) -* [Spark Dataframe / DataSet API](https://spark.apache.org/docs/latest/sql-programming-guide.html#datasets-and-dataframes) +* [Spark Dataframe / DataSet API](https://spark.apache.org/docs/latest/sql-programming-guide.html#datasets-and-dataframes)\ + -Spark 를 활용하면, 도메인 로직을 모듈로 만들고 배치 및 스트림에서 애플리케이션 \(이하 Application\) 에서 동일한 로직을 사용할 수 있습니다. 예를 들어 +Spark 를 활용하면, 도메인 로직을 모듈로 만들고 배치 및 스트림에서 애플리케이션 (이하 Application) 에서 동일한 로직을 사용할 수 있습니다. 예를 들어 * Kafka 에서 데이터를 읽어 처리하는 스트림 Application 있을 때 * 장애 등의 이유로 동일한 데이터를 Kafka 가 아닌 S3 등 스토리지에서 읽어 배치처리를 할 경우 * 동일한 모듈을 이용해 가공할 수 있습니다 Flink 또한 배치 및 스트림 처리를 위한 API 를 지원합니다. 다만 이 글에서는 스트림을 위한 내용을 위주로 설명합니다. - diff --git a/02-processing/2.2-batch/2.1.2-spark-architecture.md b/02-processing/2.2-batch/2.1.2-spark-architecture.md index d3b930a..84f9459 100644 --- a/02-processing/2.2-batch/2.1.2-spark-architecture.md +++ b/02-processing/2.2-batch/2.1.2-spark-architecture.md @@ -76,6 +76,215 @@ df.toPandas() # PySpark 에서 사용할 수 있는 함수로, Jupyter 에서 2238 8235 1956 Master Together 69245.0 0 1 24-01-2014 8 428 ... 3 0 0 0 0 0 0 3 11 0 ``` + + +Spark 는 데이터를 테이블 형태로 다룰 수 있도록 API 를 제공합니다. 물리적으로는 여러 머신의 메모리에 분산되어 있더라도, 사용자가 데이터를 마치 하나의 테이블처럼 논리적으로 다룰 수 있습니다. + +우선 데이터를 간단히 조작해보겠습니다. 컬럼 이름을 쉽게 다루기 위해, 몇개만 선택후 이름을 변경하겠습니다. + +``` +from pyspark.sql.functions import * +from pyspark.sql.types import * + +# 컬럼을 선택하고 이름을 변경합니다. +# SQL 의 SELECT 'ID' as id, 'Year_Birth' as 'year_birth'... 과 동일합니다. + +dfSelected = df.select( + col("ID").alias("id"), + col("Year_Birth").alias("year_birth"), + col("Education").alias("education"), + col("Kidhome").alias("count_kid"), + col("Teenhome").alias("count_teen"), + col("Dt_Customer").alias("date_customer"), + col("Recency").alias("days_last_login") +) + +dfSelected.count() +dfSelected.printSchema() +``` + +``` +# df.count() 의 결과 +2240 + +# df.printSchema() +root + |-- id: integer (nullable = true) + |-- year_birth: integer (nullable = true) + |-- education: string (nullable = true) + |-- count_kid: integer (nullable = true) + |-- count_teen: integer (nullable = true) + |-- date_customer: string (nullable = true) + |-- days_last_login: integer (nullable = true) + +``` + +여기서 기억하고 넘어가야 할 부분이 몇 가지 있습니다. + +1. 컬럼 이름을 변경한 결과를 `dfSelected` DataFrame 에 저장했기 때문에, 최초의 `df` DataFrame 은 그대로 존재합니다. 따라서 `df` 와 `dfSelected` 는 다른 DataFrame 입니다. +2. 그러나 DataFrame 은 **논리적인** 테이블이므로 실제 물리적으로 데이터가 복사되어 `df`, `dfSelect `두 벌이 되는 것은 아닙니다. +3. 물리적인 데이터는 여전히 Disk 에 CSV 파일로 존재하고, `count()` `toPandas()` 와 같은 [RDD 액션](https://spark.apache.org/docs/latest/rdd-programming-guide.html#actions)을 수행할 때 데이터를 메모리로 읽어 처리 하게됩니다. RDD 와 액션에 대해서는 추후 다른 챕터에서 더 자세히 설명하겠습니다. + +``` +# df.rdd.id() 실행 결과 + + +# dfSelected.rdd.id() 실행 결과 + +``` + +![](<../../.gitbook/assets/image (2).png>)![](<../../.gitbook/assets/image (1).png>) + +위 스크린샷은 [Spark UI](https://spark.apache.org/docs/latest/web-ui.html) 에서 확인할 수 있는 Stage 정보로, `toPandas()` 를 호출하기 까지 실행되는 Spark 연산입니다. + + + +이제 데이터를 로딩해서, 원하는 컬럼만 추출했으니 데이터를 일부 가공해보겠습니다. + +`` + +1. `count_kid` 는 고객의 유아 자녀, `count_teen` 은 고객의 초등학생 이상 자녀를 나타내는 컬럼인데, 이 값을 더한 `count_children` 컬럼을 만들어 보겠습니다. 다만 이 때, 양쪽 또는 한쪽의 값이 NULL 일 수 있으므로 덧셈에 유의해야 합니다. +2. `education` 컬럼은 졸업 학위를 나타냅니다. 이 때 `2n Cycle` 값을 허용하지 않는다는 정책이 세워져, 대신 문자열 `NONE` 값을 넣도록 하겠습니다. 여기서는 IF ELSE 와 유사한 SQL 구문인 CASE WHEN 을 사용해 보겠습니다. +3. `date_customer` 는 최초 가입일 컬럼인데 팀원의 실수로 데이터가 잘못되었다고 가정하고 7년을 더해보겠습니다. 예를 들어 `2013-01-01` 이면 변경 후에는 `2020-01-01` 이 되어야 합니다. + +Spark DataFrame API 는 SQL 에 대응되는 함수가 대부분 존재하므로 어떻게 다룰지 모르더라도 SQL 함수를 기반으로 생각해보고 구글링을 통해 해결할 수 있습니다. + +우선 시작 전에 통계 정보와 스키마를 다시 살펴 보면, + +``` +dfSelected.printSchema() # 스키마를 확인합니다. +dfSelected.describe().show() # 통계 정보를 확인합니다. PySpark 에서는 `show` 대신 `toPandas` 를 활용할 수 있습니다. + +# printSchema() 의 출력 결과 +root + |-- id: integer (nullable = true) + |-- year_birth: integer (nullable = true) + |-- education: string (nullable = true) + |-- count_kid: integer (nullable = true) + |-- count_kid: integer (nullable = true) + |-- date_customer: string (nullable = true) + |-- days_last_login: integer (nullable = true) + +# describe().show() 의 출력 결과 ++-------+------------------+------------------+---------+-------------------+------------------+-------------+-----------------+ +|summary| id| year_birth|education| count_kid| count_kid|date_customer| days_last_login| ++-------+------------------+------------------+---------+-------------------+------------------+-------------+-----------------+ +| count| 2240| 2240| 2240| 2240| 2240| 2240| 2240| +| mean| 5592.159821428571|1968.8058035714287| null|0.44419642857142855| 0.50625| null| 49.109375| +| stddev|3246.6621975643416|11.984069456885827| null| 0.5383980977345935|0.5445382307698761| null|28.96245280837821| +| min| 0| 1893| 2n Cycle| 0| 0| 01-01-2013| 0| +| max| 11191| 1996| PhD| 2| 2| 31-12-2013| 99| ++-------+------------------+------------------+---------+-------------------+------------------+-------------+-----------------+ +``` + +``` +dfConverted1 = dfSelected\ + withColumn("count_children", coalesce("count_kid", lit(0)) + coalesce("count_teen", lit(0))) + +dfConverted1\ + .select(col("id"), col("count_kid"), col("count_teen"), col("count_children"))\ + .limit(5)\ + .show() + +# `show() 출력 결과 ++----+---------+----------+--------------+ +| id|count_kid|count_teen|count_children| ++----+---------+----------+--------------+ +|5524| 0| 0| 0| +|2174| 1| 1| 2| +|4141| 0| 0| 0| +|6182| 1| 0| 1| +|5324| 1| 0| 1| ++----+---------+----------+--------------+ + +``` + +`count_kid` 또는 `count_teen` 컬럼에 미래에는 NULL 값이 들어올 수도 있으므로 `coalesce("count_kid", lit(0))` 와 같이 기본값을 0 으로 세팅합니다. 그리고 두 컬럼의 값을 더해 `count_children` 컬럼을 만듭니다. + +이후에는 결과 DataFrame 인 `dfConverted1` 에서 보기 쉽게 원하는 컬럼만 선택해서 5개만 추출해 데이터를 확인해봅니다. 아까 언급했던 바와 같이 Spark DataFrame 은 Immutable 하므로 (더 엄밀히는 DataFrame 을 구성하는 RDD) `dfConverted1.select` 구문은 그 위에서 만든 `dfConverted1`에 영향을 미치지 않습니다. + + + +그 다음으로는 `education` 컬럼을 살펴보고 변경해보겠습니다. + +``` +# 이 작업에서는 컬럼 이름을 가공하지 않으므로, `select` 내에서 `col` 함수를 사용하지 않았습니다. +# Spark 는 이와 같이 API 에서 다양한 형태로 사용자의 편의성을 지원합니다. +dfConverted1\ + .select("education")\ + .distinct()\ + .show() + +# `show()` 출력 결과 ++----------+ +| education| ++----------+ +| 2n Cycle| +| PhD| +| Master| +|Graduation| +| Basic| ++----------+ +``` + +앞서 언급한 것과 같이 `2n Cycle` 값은 CASE WHEN 구문을 사용해 `NONE` 으로 변경하도록 하겠습니다. + +``` +educationInvalid = '2n Cycle' +educationDefault = 'NONE' + +# 다음 SQL 구문과 동일합니다. +# +# SELECT CASE WHEN education = '2n Cycle' THEN 'NONE' ELSE education as education +# + +dfConverted2 = dfConverted1.withColumn( + "education", + when(col("education") == lit(educationInvalid), educationDefault).otherwise(col("education")) +) +``` + +{% hint style="warning" %} +오늘 작업한 `education` 컬럼에는 NULL 값이 없었지만, 미래에는 들어올지 모릅니다. 만약 정책적으로 지정한 값 이외에 허용되지 않는 데이터를 전부 `NONE` 으로 세팅하려면 코드를 어떻게 변경해야 할까요? + +PySpark 의 API 문서에서 isin 함수를 살펴보고, 이것을 통해 문제를 해결할 수 있을지 고민해 봅니다. + +* [https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.Column.isin.html](https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.Column.isin.html) +{% endhint %} + + + +Spark DataFrame API 는 `selectExpr` 이란 함수를 제공합니다. SQL 문법을 사용할 수 있습니다. 예를 들어, 위와 동일한 작업을 할 때 다음처럼 코드를 작성할 수 있습니다. + +``` +// Some code +dfConverted3 = dfConverted1\ + .selectExpr("*", + f"CASE WHEN education == '{educationInvalid}' THEN '{educationDefault}' ELSE education END as education" + ) +``` + + + + + +이쯤에서 분산처리는 어디서 하고 실시간 처리는 어떻게 하냐는 질문이 생기실 수 있습니다. + + + +위의 예제에서는 1개의 CSV 파일 내에 2240 개의 Row 밖에 없었지만, 데이터가 매우 크다고 가정하면 하나의 파일로 만들 수 없어, 일반적으로 파일을 분할해 S3 나 HDFS 등에 보관하게 됩니다. Spark 는 데이터 가공시에 단일 또는 여러 파일들을 메모리에읽어 처리하게 됩니다. 추후에 다시 설명하겠지만 Spark 를 로컬 모드 (단일 머신) 모드로 사용한다면 하나의 머신에서 처리가 되고, 여러 + + + + + +INFO + +* 데이터현재의 데이터를 가지고 작업하지만, 미래의 데이터를 가정하고 코드를 방어하기. 물론 시간이 허락하는 한도 내에서 +* selectExpr + + + RDD Partition