Spark RDD
Spark는 흔히 Map-Reduce를 Memory에 올리는 장점으로 Hadoop → Spark로 변화해왔다.
왜 Memory에 올리며 그 과정에서 생성되는 RDD란 무엇일까?
Hadoop Map-Reduce
- 데이터 셋을 분산해서 읽고 다시
Disk
에write
한다. - Disk에 저장하게 되면, 기본적으로
I/O 연산
이 필요하고 이 과정이 큰 코스트이다. - 아마 대부분의 시간이 I/O에서 진행되지 않을까.. 추측
Spark Map-Reduce
- Spark는 이런 문제점을 인식하고 메모리에 올려 처리하는 방식을 처리하였다. ( In-memoery )
- I/O 처리 과정도 필요 없고 빠르게 분산처리가 가능하다.
- 다만, 메모리의 문제점이 무엇인가? 휘발성이다.
- 그래서 스파크는 메모리에 올린 값들은 무조건
read-only
로 사용한다. - 대신 비용은 비싸다.
스파크 데이터 구조 - RDD
- RDD = Resilient Distributed Dataset
- Spark의 기본 데이터 구조
- Resilient : 메모리에서 날라가도 재연산을 통해 다시 시도
- distributed : 여러 메모리에 분산 처리
- RDD = Read-only = Immutable 한 특징 ( 메모리에서 처리하기 위함 )
- Lineage = RDD의 연산 순서로 Airflow Dag와 동일한 형태
-
Fault-tolerant = RDD를 다시 재실행하면 동일한 결과를 얻을 수 있다 → 실패해도 다른 노드에서 실행하여 처리 ( 멱등성과 같은 개념 같다 )
- 그럼 결국 여러 workflow와 같은 구조의 RDD가 발생합니다.
- 스파크는 이를 컨트롤 하기 위하여
Transformation
/Action
연산을 통해 동작하도록 만듭니다. - Lazy Evaluaction : Action 연산자를 만나기 전까지는 실제 실행을 하지 않습니다.
- Transformation Operation 함수
- map
- filter
- distinct
- union
- intersection
- Action Operation 함수
- collect
- count
- top
- reduce
- countbyvalue
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
from pyspark import SparkContext
# Init
sc = SparkContext("local", "RDD_Test")
lines = sc.textFile("name.csv")
header = lines.first()
# RDD
filtered_lines = lines.filter(lambda row: row != header)
print(type(header), type(filtered_lines))
print(header)
print(filtered_lines.take(5))
print('-'*20)
# Transformation
stores = filtered_lines.map(lambda row: row.split(',')[1])
# Action
result = stores.countByValue()
print(result)
sc.stop()
참조
- https://www.linkedin.com/pulse/both-iterative-interactive-applications-require-faster-vikas-kumar
- https://medium.com/@RRamya02/apache-spark-resilient-distributed-dataset-rdd-8718ff096005
- https://6mini.github.io/data engineering/2021/12/12/rdd/