2. Spark Architecture
Spark Architecture에 대해 알아보겠습니다.
Spark란?
- Spark - 자동차의 엔진과 같다
- Programming Launguage - 자동차의 핸들
Why Spark?
-
속도가 매우 빠르다. 그러면 과거에는 무엇을 어떻게 사용했을까요?
- Google File System으로 부터 시작합니다.
- Google File System은
Failure Tolerance
라는 개념을 가지고 나왔는데, Master-Slave 구조를 통해Slave가 일을 제대로 수행 못하면 다시 일을 시켜
버립니다.
- Google File System은
- Map-Reduce 개념 발생
- Hadoop이 Map-Reduce라는 데이터 Block으로 나누어 처리하는 개념을 통하여 Google File System을 효율적으로 확장시켰습니다.
- I/O 속도도 느리다
- Hadoop은 map-reduce-shuffle 시 HDFS에 저장해야합니다. 이로 인해 I/O가 발생하죠. 반면에 Spark는
In-Memory
에 두고 처리하는 방식입니다.
- Hadoop은 map-reduce-shuffle 시 HDFS에 저장해야합니다. 이로 인해 I/O가 발생하죠. 반면에 Spark는
좋은 비유가 있어 추가 설명 드리자면
- 병렬처리 = 요리 과정
- Spark = 요리 재료가 요리도구 위에 계속하여 존재
- Hadoop = 요리 도구 위에 요리 재료를 넣어주어야 한다.
Cluster
- 여러 컴퓨터 Resource를 하나의 컴퓨터처럼 사용
- 클러스터를 구성한다고 하여 사용할 수 있는 것이 아니라 프레임워크가 있어야 Control이 가능
- Spark는 컨트롤을 도와주는 역할의 프레임워크
- 스파크는 Local Mode도 지원하는데, Driver와 Executor를 단일 머신에서 Thread처럼 실행
Spark Application
- Spark Application = Driver Process + 다수의 Executor Process 구성
- Driver Process → Cluster Node → Execute Main() - 사용자 프로그램 Input,Scheduling, 분석, 유지관리 등등을 실행
- Driver Process → Executor Process → 할당한 작업을 수행
- Driver Process가 할당한 코드를 실행 및 진행상황을 Driver Node에 보고
Driver Process
- Master Node
- Driver
- 명령을 내리는 여왕벌
- Executor에게 구체적인 명령어를 지시(어떤 방식?- RDD,Dataframe / 몇명이서?-executor 수 .. )
- spark context는 Driver Process의 일종으로 RDD만 처리
- SparkSession는 Driver Process의 일종으로 RDD외에 Dataframe, Dataset 처리
- Spark session에서 여러 언어에서 실행할 수 있는 코드로 변환
- Spark session = Spark Application을 관리
Executor
- Slave Node = Executor
- Executor
- 일하는 일꾼
- Driver Processor가 시킨 일을 수행 후 Driver Processor에게 보고
Cluster Manager
- 사용 가능한 Resource를 파악하기 위함
- Cluster Manager - spark standalone cluster manager,hadoop,yarn,메소스
파티션
- Executor들이 병렬로 처리 가능하도록 데이터를 Chunk 단위로 분할.
- Partition의 수,Cluster node 수가 너무 적게 되면 병렬 처리 성능이 낮아진다.
Transformation
- 데이터 구조 ⇒ 불변성( Spark 핵심 구조로 한번 생성하면 변경이 불가 )
- 데이터 변경하기 위해 원하는 변경 방법을 알려주는 것이다.
논리적인 실행 계획을 세우는 과정
- 실제 연산 X- Spark는 Dataframe Pipelining(여러 필터)은 Memory에서 처리.
- Shuffle = 클러스터에서 파티션을 교환
- Shuffle은 Dataframe 결과를 Disk에 저장
- E.g) where, sum, withColumnRenamed, ..
1
2
3
4
5
# scala code
val divisBy2 = df.where("number%2=0")
# Python code
divisBy2 = df.where("number%2=0")
Narrow Dependency
- 자식 RDD의 각 파티션이 부모 RDD의 파티션들에 대해 단순하고 한정적인 종속성을 가진 경우
- 3개의 그룹이 존재하고 각 그룹에서 키가 170보다 큰 사람을 구한다고 한다면, 그룹 별로 해당 인원들을 차출하여 보여주면 3개의 그룹이 3개의 그룹으로 나누어져 1:1 매칭이 됩니다. ( Filter )
- Pipeline 처리시 자동으로 수행
- Dataframe 여러 필터를 지정한 경우
- E.g) where / filter
Wide Dependency
- 임의의 데이터만 Transform이 불가한 경우
- 위와 달리 3개의 그룹에서 키가 170 이상인 사람 수를 묻는다고 한다면, 전체 그룹을 하나로 묶어 계산하여야 합니다. 3개의 그룹은 하나의 그룹 조건에 따라 여러 그룹이 되어 1:N구조가 됩니다.
- 키 값에 따라 Parition된 데이터를 요구하는 경우
- shuffle = cluster에서 Partition 교환 ( 셔플의 결과를 Disk에 저장 )
- E.g) Sort / Reducebykey / groupbykey / join / repartition
Lazy Evaluation
- Spark가 연산 그래프를 처리하기 직전까지 기다리는 동작 방식
- Driver는 Executor들에게 시킬 일을 제한없이 쌓아둘 수 있습니다. → 시킬 일을 할당 받지 않는한 Executor들은 가만히 기다린다.
- 특정 연산 명령이 내려지면, 즉시 수정하는 것이 아닌
실행 계획
을 생성 Dataframe 혹은 Original Data에 Action이 호출되기 전까지는 데이터를 읽지 않는다!
- Dataframe Transformation을 물리적 실행 계획으로 컴파일
- E.g) Dataframe pushdown
- 복잡한 Spark Job이 Origin Data에서 1Row만 선택하는 필터링이라면 1개만 읽는것이 당연 제일 효율적
- 필터 계산을 DB에 위임 → Spark는 하나의 Record만 받음
Action
실제 연산을 수행
- 시킬 일을 Executor에게 할당해주는 과정
- Transformation 논리적 실행 계획 → Action으로 실제 연산 수행
- Action을 지정하면, Spark Job이 시작
- 기본적으로
200개의 셔플 파티션
을 생성 - Spark Job-Filter(narrow transformation) → Count(wide transformation)
- Driver → Job → Stage → Task
- shuffle이 발생하는 지점을 기준으로 Spark Job을 여러 Stage로 나눈다.
- Partitioin 수에 따라 Task 생성 ( task = partition ) → Executor에 분산
1
2
%spark.pyspark
divisBy2.count()
Spark UI
- Spark job 상태, 환경 설정, 클러스터 등을 확인
- http://localhost:4040
- http://localhost:8080
직접 해보기
-
Csv → Datafrmae → Array
1 2
$ cd /Users/wclub/spark/Spark-The-Definitive-Guide/data/flight-data/csv $ docker cp data.csv <container_id>:/tmp/data.csv
1 2 3 4 5 6 7 8 9
%spark.pyspark flightData2015 = spark.read.option("inferSchema","true").option("header","true").csv("/Users/wclub/spark/Spark-The-Definitive-Guide/data/flight-data/csv/2015-summary.csv") flightData2015.take(3) flightData2015.sort("count").explain() # partition 수 설정 spark.conf.set("spark.sql.shuffle.partitions","5") flightData2015.sort("count").take(2)
-
Dataframe → Sql View
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
%spark.pyspark # create view flightData2015.createOrReplaceTempView("flight_data_2015") # sql sqlWay = spark.sql(""" select DEST_COUNTRY_NAME,count(1) from flight_data_2015 group by dest_country_name """) sqlWay.explain() # Dataframe dataFrameWay = flightData2015.groupby("DEST_COUNTRY_NAME").count() dataFrameWay.explain()
-
Dataframe function
- max
1 2 3 4 5
spark.sql("select max(count) from flight_data_2015").take(1) %spark.pyspark from pyspark.sql.functions import max flightData2015.select(max("count")).take(1)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
# sql %spark.pyspark maxsql = spark.sql(""" select dest_country_name,sum(count) as destination_total from flight_data_2015 group by dest_country_name order by destination_total desc limit 5 """) maxsql.show() # dataframe # 마지막 함수가 다 끝나고 나서야 Action 호출 from pyspark.sql.functions import desc flightData2015.groupby("dest_country_name").sum("count").withColumnRenamed("sum(count)","destination_total").sort(desc("destination_total")).limit(5).show() flightData2015.groupby("dest_country_name").sum("count").withColumnRenamed("sum(count)","destination_total").sort(desc("destination_total")).limit(5).explain() >== Physical Plan == TakeOrderedAndProject(limit=5, orderBy=[destination_total#230L DESC NULLS LAST], output=[dest_country_name#10,destination_total#230L]) +- *(2) HashAggregate(keys=[dest_country_name#10], functions=[sum(cast(count#12 as bigint))]) +- Exchange hashpartitioning(dest_country_name#10, 5) +- *(1) HashAggregate(keys=[dest_country_name#10], functions=[partial_sum(cast(count#12 as bigint))]) +- *(1) FileScan csv [DEST_COUNTRY_NAME#10,count#12] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/tmp/data.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,count:int>
참조
- https://towardsdatascience.com/spark-jargon-for-babies-and-python-programmers-5ccba2c60f68
- https://dev.to/rcmaples/explain-it-to-me-like-im-five-map-reduce–filter-edition-2mef/comments
- https://medium.com/swlh/apache-spark-is-fun-eadcaf141c02