Skip to content

Commit

Permalink
GitBook: [#17] WIP: Spark Concept
Browse files Browse the repository at this point in the history
  • Loading branch information
1ambda authored and gitbook-bot committed Oct 13, 2021
1 parent f154b5c commit 3589618
Show file tree
Hide file tree
Showing 5 changed files with 215 additions and 5 deletions.
Binary file added .gitbook/assets/image (1).png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added .gitbook/assets/image (2).png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added .gitbook/assets/image.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
11 changes: 6 additions & 5 deletions 02-processing/2.1.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,20 @@

데이터를 처리하기 위한 프레임워크는 다양합니다. 이 글에서는 데이터를 분산처리하기 위해 자주 쓰이는 두 가지 프레임워크에 대해 이야기 할 예정입니다.

* [Apache Spark ](https://spark.apache.org/)
* [Apache Flink](https://flink.apache.org/)
* [Apache Spark ](https://spark.apache.org)
* [Apache Flink](https://flink.apache.org)\


Apache Spark 는 배치 처리를 위해 많이 활용됩니다. 경우에 따라 스트림 처리를 위해 사용되는 경우도 있습니다. 메타스토어로부터 데이터를 테이블 형태로 쉽게 읽어오고 가공하는 것이 가능하며, 사용자의 편의에 따라 다양한 API 수준을 지원합니다.

* [Spark SQL](https://spark.apache.org/docs/latest/sql-programming-guide.html#sql)
* [Spark Dataframe / DataSet API](https://spark.apache.org/docs/latest/sql-programming-guide.html#datasets-and-dataframes)
* [Spark Dataframe / DataSet API](https://spark.apache.org/docs/latest/sql-programming-guide.html#datasets-and-dataframes)\


Spark 를 활용하면, 도메인 로직을 모듈로 만들고 배치 및 스트림에서 애플리케이션 \(이하 Application\) 에서 동일한 로직을 사용할 수 있습니다. 예를 들어
Spark 를 활용하면, 도메인 로직을 모듈로 만들고 배치 및 스트림에서 애플리케이션 (이하 Application) 에서 동일한 로직을 사용할 수 있습니다. 예를 들어

* Kafka 에서 데이터를 읽어 처리하는 스트림 Application 있을 때
* 장애 등의 이유로 동일한 데이터를 Kafka 가 아닌 S3 등 스토리지에서 읽어 배치처리를 할 경우
* 동일한 모듈을 이용해 가공할 수 있습니다

Flink 또한 배치 및 스트림 처리를 위한 API 를 지원합니다. 다만 이 글에서는 스트림을 위한 내용을 위주로 설명합니다.

209 changes: 209 additions & 0 deletions 02-processing/2.2-batch/2.1.2-spark-architecture.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,215 @@ df.toPandas() # PySpark 에서 사용할 수 있는 함수로, Jupyter 에서
2238 8235 1956 Master Together 69245.0 0 1 24-01-2014 8 428 ... 3 0 0 0 0 0 0 3 11 0
```



Spark 는 데이터를 테이블 형태로 다룰 수 있도록 API 를 제공합니다. 물리적으로는 여러 머신의 메모리에 분산되어 있더라도, 사용자가 데이터를 마치 하나의 테이블처럼 논리적으로 다룰 수 있습니다.

우선 데이터를 간단히 조작해보겠습니다. 컬럼 이름을 쉽게 다루기 위해, 몇개만 선택후 이름을 변경하겠습니다.

```
from pyspark.sql.functions import *
from pyspark.sql.types import *
# 컬럼을 선택하고 이름을 변경합니다.
# SQL 의 SELECT 'ID' as id, 'Year_Birth' as 'year_birth'... 과 동일합니다.
dfSelected = df.select(
col("ID").alias("id"),
col("Year_Birth").alias("year_birth"),
col("Education").alias("education"),
col("Kidhome").alias("count_kid"),
col("Teenhome").alias("count_teen"),
col("Dt_Customer").alias("date_customer"),
col("Recency").alias("days_last_login")
)
dfSelected.count()
dfSelected.printSchema()
```

```
# df.count() 의 결과
2240
# df.printSchema()
root
|-- id: integer (nullable = true)
|-- year_birth: integer (nullable = true)
|-- education: string (nullable = true)
|-- count_kid: integer (nullable = true)
|-- count_teen: integer (nullable = true)
|-- date_customer: string (nullable = true)
|-- days_last_login: integer (nullable = true)
```

여기서 기억하고 넘어가야 할 부분이 몇 가지 있습니다.

1. 컬럼 이름을 변경한 결과를 `dfSelected` DataFrame 에 저장했기 때문에, 최초의 `df` DataFrame 은 그대로 존재합니다. 따라서 `df``dfSelected` 는 다른 DataFrame 입니다.
2. 그러나 DataFrame 은 **논리적인** 테이블이므로 실제 물리적으로 데이터가 복사되어 `df`, `dfSelect `두 벌이 되는 것은 아닙니다.
3. 물리적인 데이터는 여전히 Disk 에 CSV 파일로 존재하고, `count()` `toPandas()` 와 같은 [RDD 액션](https://spark.apache.org/docs/latest/rdd-programming-guide.html#actions)을 수행할 때 데이터를 메모리로 읽어 처리 하게됩니다. RDD 와 액션에 대해서는 추후 다른 챕터에서 더 자세히 설명하겠습니다.

```
# df.rdd.id() 실행 결과
<bound method RDD.id of MapPartitionsRDD[25] at javaToPython at NativeMethodAccessorImpl.java:0>
# dfSelected.rdd.id() 실행 결과
<bound method RDD.id of MapPartitionsRDD[31] at javaToPython at NativeMethodAccessorImpl.java:0>
```

![](<../../.gitbook/assets/image (2).png>)![](<../../.gitbook/assets/image (1).png>)

위 스크린샷은 [Spark UI](https://spark.apache.org/docs/latest/web-ui.html) 에서 확인할 수 있는 Stage 정보로, `toPandas()` 를 호출하기 까지 실행되는 Spark 연산입니다.



이제 데이터를 로딩해서, 원하는 컬럼만 추출했으니 데이터를 일부 가공해보겠습니다.

``

1. `count_kid` 는 고객의 유아 자녀, `count_teen` 은 고객의 초등학생 이상 자녀를 나타내는 컬럼인데, 이 값을 더한 `count_children` 컬럼을 만들어 보겠습니다. 다만 이 때, 양쪽 또는 한쪽의 값이 NULL 일 수 있으므로 덧셈에 유의해야 합니다.
2. `education` 컬럼은 졸업 학위를 나타냅니다. 이 때 `2n Cycle` 값을 허용하지 않는다는 정책이 세워져, 대신 문자열 `NONE` 값을 넣도록 하겠습니다. 여기서는 IF ELSE 와 유사한 SQL 구문인 CASE WHEN 을 사용해 보겠습니다.
3. `date_customer` 는 최초 가입일 컬럼인데 팀원의 실수로 데이터가 잘못되었다고 가정하고 7년을 더해보겠습니다. 예를 들어 `2013-01-01` 이면 변경 후에는 `2020-01-01` 이 되어야 합니다.

Spark DataFrame API 는 SQL 에 대응되는 함수가 대부분 존재하므로 어떻게 다룰지 모르더라도 SQL 함수를 기반으로 생각해보고 구글링을 통해 해결할 수 있습니다.

우선 시작 전에 통계 정보와 스키마를 다시 살펴 보면,

```
dfSelected.printSchema() # 스키마를 확인합니다.
dfSelected.describe().show() # 통계 정보를 확인합니다. PySpark 에서는 `show` 대신 `toPandas` 를 활용할 수 있습니다.
# printSchema() 의 출력 결과
root
|-- id: integer (nullable = true)
|-- year_birth: integer (nullable = true)
|-- education: string (nullable = true)
|-- count_kid: integer (nullable = true)
|-- count_kid: integer (nullable = true)
|-- date_customer: string (nullable = true)
|-- days_last_login: integer (nullable = true)
# describe().show() 의 출력 결과
+-------+------------------+------------------+---------+-------------------+------------------+-------------+-----------------+
|summary| id| year_birth|education| count_kid| count_kid|date_customer| days_last_login|
+-------+------------------+------------------+---------+-------------------+------------------+-------------+-----------------+
| count| 2240| 2240| 2240| 2240| 2240| 2240| 2240|
| mean| 5592.159821428571|1968.8058035714287| null|0.44419642857142855| 0.50625| null| 49.109375|
| stddev|3246.6621975643416|11.984069456885827| null| 0.5383980977345935|0.5445382307698761| null|28.96245280837821|
| min| 0| 1893| 2n Cycle| 0| 0| 01-01-2013| 0|
| max| 11191| 1996| PhD| 2| 2| 31-12-2013| 99|
+-------+------------------+------------------+---------+-------------------+------------------+-------------+-----------------+
```

```
dfConverted1 = dfSelected\
withColumn("count_children", coalesce("count_kid", lit(0)) + coalesce("count_teen", lit(0)))
dfConverted1\
.select(col("id"), col("count_kid"), col("count_teen"), col("count_children"))\
.limit(5)\
.show()
# `show() 출력 결과
+----+---------+----------+--------------+
| id|count_kid|count_teen|count_children|
+----+---------+----------+--------------+
|5524| 0| 0| 0|
|2174| 1| 1| 2|
|4141| 0| 0| 0|
|6182| 1| 0| 1|
|5324| 1| 0| 1|
+----+---------+----------+--------------+
```

`count_kid` 또는 `count_teen` 컬럼에 미래에는 NULL 값이 들어올 수도 있으므로 `coalesce("count_kid", lit(0))` 와 같이 기본값을 0 으로 세팅합니다. 그리고 두 컬럼의 값을 더해 `count_children` 컬럼을 만듭니다.

이후에는 결과 DataFrame 인 `dfConverted1` 에서 보기 쉽게 원하는 컬럼만 선택해서 5개만 추출해 데이터를 확인해봅니다. 아까 언급했던 바와 같이 Spark DataFrame 은 Immutable 하므로 (더 엄밀히는 DataFrame 을 구성하는 RDD) `dfConverted1.select` 구문은 그 위에서 만든 `dfConverted1`에 영향을 미치지 않습니다.



그 다음으로는 `education` 컬럼을 살펴보고 변경해보겠습니다.

```
# 이 작업에서는 컬럼 이름을 가공하지 않으므로, `select` 내에서 `col` 함수를 사용하지 않았습니다.
# Spark 는 이와 같이 API 에서 다양한 형태로 사용자의 편의성을 지원합니다.
dfConverted1\
.select("education")\
.distinct()\
.show()
# `show()` 출력 결과
+----------+
| education|
+----------+
| 2n Cycle|
| PhD|
| Master|
|Graduation|
| Basic|
+----------+
```

앞서 언급한 것과 같이 `2n Cycle` 값은 CASE WHEN 구문을 사용해 `NONE` 으로 변경하도록 하겠습니다.

```
educationInvalid = '2n Cycle'
educationDefault = 'NONE'
# 다음 SQL 구문과 동일합니다.
#
# SELECT CASE WHEN education = '2n Cycle' THEN 'NONE' ELSE education as education
#
dfConverted2 = dfConverted1.withColumn(
"education",
when(col("education") == lit(educationInvalid), educationDefault).otherwise(col("education"))
)
```

{% hint style="warning" %}
오늘 작업한 `education` 컬럼에는 NULL 값이 없었지만, 미래에는 들어올지 모릅니다. 만약 정책적으로 지정한 값 이외에 허용되지 않는 데이터를 전부 `NONE` 으로 세팅하려면 코드를 어떻게 변경해야 할까요?

PySpark 의 API 문서에서 isin 함수를 살펴보고, 이것을 통해 문제를 해결할 수 있을지 고민해 봅니다.

* [https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.Column.isin.html](https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.Column.isin.html)
{% endhint %}



Spark DataFrame API 는 `selectExpr` 이란 함수를 제공합니다. SQL 문법을 사용할 수 있습니다. 예를 들어, 위와 동일한 작업을 할 때 다음처럼 코드를 작성할 수 있습니다.

```
// Some code
dfConverted3 = dfConverted1\
.selectExpr("*",
f"CASE WHEN education == '{educationInvalid}' THEN '{educationDefault}' ELSE education END as education"
)
```





이쯤에서 분산처리는 어디서 하고 실시간 처리는 어떻게 하냐는 질문이 생기실 수 있습니다.



위의 예제에서는 1개의 CSV 파일 내에 2240 개의 Row 밖에 없었지만, 데이터가 매우 크다고 가정하면 하나의 파일로 만들 수 없어, 일반적으로 파일을 분할해 S3 나 HDFS 등에 보관하게 됩니다. Spark 는 데이터 가공시에 단일 또는 여러 파일들을 메모리에읽어 처리하게 됩니다. 추후에 다시 설명하겠지만 Spark 를 로컬 모드 (단일 머신) 모드로 사용한다면 하나의 머신에서 처리가 되고, 여러





INFO

* 데이터현재의 데이터를 가지고 작업하지만, 미래의 데이터를 가정하고 코드를 방어하기. 물론 시간이 허락하는 한도 내에서
* selectExpr



RDD

Partition
Expand Down

0 comments on commit 3589618

Please sign in to comment.