Kimuksung
Kimuksung 안녕하세요. 분산처리에 관심이 많은 생각하는 주니어 Data Enginner입니다.

Spark RDD

Spark RDD

Spark는 흔히 Map-Reduce를 Memory에 올리는 장점으로 Hadoop → Spark로 변화해왔다.

왜 Memory에 올리며 그 과정에서 생성되는 RDD란 무엇일까?


Hadoop Map-Reduce

  • 데이터 셋을 분산해서 읽고 다시 Diskwrite 한다.
  • Disk에 저장하게 되면, 기본적으로 I/O 연산이 필요하고 이 과정이 큰 코스트이다.
  • 아마 대부분의 시간이 I/O에서 진행되지 않을까.. 추측

hadoop-mapreduce


Spark Map-Reduce

  • Spark는 이런 문제점을 인식하고 메모리에 올려 처리하는 방식을 처리하였다. ( In-memoery )
  • I/O 처리 과정도 필요 없고 빠르게 분산처리가 가능하다.
  • 다만, 메모리의 문제점이 무엇인가? 휘발성이다.
  • 그래서 스파크는 메모리에 올린 값들은 무조건 read-only로 사용한다.
  • 대신 비용은 비싸다.

spark-mapreduce


스파크 데이터 구조 - RDD

  • RDD = Resilient Distributed Dataset
  • Spark의 기본 데이터 구조
  • Resilient : 메모리에서 날라가도 재연산을 통해 다시 시도
  • distributed : 여러 메모리에 분산 처리
  • RDD = Read-only = Immutable 한 특징 ( 메모리에서 처리하기 위함 )
  • Lineage = RDD의 연산 순서로 Airflow Dag와 동일한 형태
  • Fault-tolerant = RDD를 다시 재실행하면 동일한 결과를 얻을 수 있다 → 실패해도 다른 노드에서 실행하여 처리 ( 멱등성과 같은 개념 같다 )

  • 그럼 결국 여러 workflow와 같은 구조의 RDD가 발생합니다.
  • 스파크는 이를 컨트롤 하기 위하여 Transformation/Action 연산을 통해 동작하도록 만듭니다.
  • Lazy Evaluaction : Action 연산자를 만나기 전까지는 실제 실행을 하지 않습니다.

spark-operation

  • Transformation Operation 함수
    • map
    • filter
    • distinct
    • union
    • intersection
  • Action Operation 함수
    • collect
    • count
    • top
    • reduce
    • countbyvalue
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
from pyspark import SparkContext

# Init
sc = SparkContext("local", "RDD_Test")

lines = sc.textFile("name.csv")
header = lines.first()
# RDD
filtered_lines = lines.filter(lambda row: row != header)

print(type(header), type(filtered_lines))
print(header)
print(filtered_lines.take(5))

print('-'*20)
# Transformation
stores = filtered_lines.map(lambda row: row.split(',')[1])
# Action
result = stores.countByValue()
print(result)

sc.stop()

2023-11-17-5-06-18


참조