Kimuksung
Kimuksung 안녕하세요. 분산처리에 관심이 많은 생각하는 주니어 Data Enginner입니다.

2. Spark Architecture

2. Spark Architecture

Spark Architecture에 대해 알아보겠습니다.

Spark란?

  • Spark - 자동차의 엔진과 같다
  • Programming Launguage - 자동차의 핸들

spark-engine

Why Spark?

  • 속도가 매우 빠르다. 그러면 과거에는 무엇을 어떻게 사용했을까요?

  • Google File System으로 부터 시작합니다.
    • Google File System은 Failure Tolerance라는 개념을 가지고 나왔는데, Master-Slave 구조를 통해 Slave가 일을 제대로 수행 못하면 다시 일을 시켜버립니다.
  • Map-Reduce 개념 발생
    • Hadoop이 Map-Reduce라는 데이터 Block으로 나누어 처리하는 개념을 통하여 Google File System을 효율적으로 확장시켰습니다. map-reduce
  • I/O 속도도 느리다
    • Hadoop은 map-reduce-shuffle 시 HDFS에 저장해야합니다. 이로 인해 I/O가 발생하죠. 반면에 Spark는 In-Memory에 두고 처리하는 방식입니다.

좋은 비유가 있어 추가 설명 드리자면

  • 병렬처리 = 요리 과정
  • Spark = 요리 재료가 요리도구 위에 계속하여 존재
  • Hadoop = 요리 도구 위에 요리 재료를 넣어주어야 한다.


Cluster

  • 여러 컴퓨터 Resource를 하나의 컴퓨터처럼 사용
  • 클러스터를 구성한다고 하여 사용할 수 있는 것이 아니라 프레임워크가 있어야 Control이 가능
  • Spark는 컨트롤을 도와주는 역할의 프레임워크
  • 스파크는 Local Mode도 지원하는데, Driver와 Executor를 단일 머신에서 Thread처럼 실행
Spark Application

  • Spark Application = Driver Process + 다수의 Executor Process 구성
  • Driver Process → Cluster Node → Execute Main() - 사용자 프로그램 Input,Scheduling, 분석, 유지관리 등등을 실행
  • Driver Process → Executor Process → 할당한 작업을 수행
  • Driver Process가 할당한 코드를 실행 및 진행상황을 Driver Node에 보고
Driver Process

  • Master Node
    • Driver
    • 명령을 내리는 여왕벌
  • Executor에게 구체적인 명령어를 지시(어떤 방식?- RDD,Dataframe / 몇명이서?-executor 수 .. )
  • spark context는 Driver Process의 일종으로 RDD만 처리
  • SparkSession는 Driver Process의 일종으로 RDD외에 Dataframe, Dataset 처리
  • Spark session에서 여러 언어에서 실행할 수 있는 코드로 변환
  • Spark session = Spark Application을 관리
Executor
  • Slave Node = Executor
    • Executor
    • 일하는 일꾼
  • Driver Processor가 시킨 일을 수행 후 Driver Processor에게 보고 Driver-Executor


Cluster Manager

  • 사용 가능한 Resource를 파악하기 위함
  • Cluster Manager - spark standalone cluster manager,hadoop,yarn,메소스

파티션

  • Executor들이 병렬로 처리 가능하도록 데이터를 Chunk 단위로 분할.
  • Partition의 수,Cluster node 수가 너무 적게 되면 병렬 처리 성능이 낮아진다.
Transformation

  • 데이터 구조 ⇒ 불변성( Spark 핵심 구조로 한번 생성하면 변경이 불가 )
  • 데이터 변경하기 위해 원하는 변경 방법을 알려주는 것이다.
  • 논리적인 실행 계획을 세우는 과정 - 실제 연산 X
  • Spark는 Dataframe Pipelining(여러 필터)은 Memory에서 처리.
  • Shuffle = 클러스터에서 파티션을 교환
  • Shuffle은 Dataframe 결과를 Disk에 저장
  • E.g) where, sum, withColumnRenamed, ..
1
2
3
4
5
# scala code
val divisBy2 = df.where("number%2=0")

# Python code
divisBy2 = df.where("number%2=0")

Narrow Dependency

  • 자식 RDD의 각 파티션이 부모 RDD의 파티션들에 대해 단순하고 한정적인 종속성을 가진 경우
  • 3개의 그룹이 존재하고 각 그룹에서 키가 170보다 큰 사람을 구한다고 한다면, 그룹 별로 해당 인원들을 차출하여 보여주면 3개의 그룹이 3개의 그룹으로 나누어져 1:1 매칭이 됩니다. ( Filter )
  • Pipeline 처리시 자동으로 수행
  • Dataframe 여러 필터를 지정한 경우
  • E.g) where / filter

Wide Dependency

  • 임의의 데이터만 Transform이 불가한 경우
  • 위와 달리 3개의 그룹에서 키가 170 이상인 사람 수를 묻는다고 한다면, 전체 그룹을 하나로 묶어 계산하여야 합니다. 3개의 그룹은 하나의 그룹 조건에 따라 여러 그룹이 되어 1:N구조가 됩니다.
  • 키 값에 따라 Parition된 데이터를 요구하는 경우
  • shuffle = cluster에서 Partition 교환 ( 셔플의 결과를 Disk에 저장 )
  • E.g) Sort / Reducebykey / groupbykey / join / repartition

https://ifh.cc/g/21tnzv.png

Lazy Evaluation

  • Spark가 연산 그래프를 처리하기 직전까지 기다리는 동작 방식
  • Driver는 Executor들에게 시킬 일을 제한없이 쌓아둘 수 있습니다. → 시킬 일을 할당 받지 않는한 Executor들은 가만히 기다린다.
  • 특정 연산 명령이 내려지면, 즉시 수정하는 것이 아닌 실행 계획을 생성
  • Dataframe 혹은 Original Data에 Action이 호출되기 전까지는 데이터를 읽지 않는다!
  • Dataframe Transformation을 물리적 실행 계획으로 컴파일
  • E.g) Dataframe pushdown
    • 복잡한 Spark Job이 Origin Data에서 1Row만 선택하는 필터링이라면 1개만 읽는것이 당연 제일 효율적
    • 필터 계산을 DB에 위임 → Spark는 하나의 Record만 받음
Action

  • 실제 연산을 수행
  • 시킬 일을 Executor에게 할당해주는 과정
  • Transformation 논리적 실행 계획 → Action으로 실제 연산 수행
  • Action을 지정하면, Spark Job이 시작
  • 기본적으로 200개의 셔플 파티션을 생성
  • Spark Job-Filter(narrow transformation) → Count(wide transformation)
  • Driver → Job → Stage → Task
    • shuffle이 발생하는 지점을 기준으로 Spark Job을 여러 Stage로 나눈다.
    • Partitioin 수에 따라 Task 생성 ( task = partition ) → Executor에 분산
1
2
%spark.pyspark
divisBy2.count()
Spark UI

  • Spark job 상태, 환경 설정, 클러스터 등을 확인
  • http://localhost:4040
  • http://localhost:8080
직접 해보기

  • Csv → Datafrmae → Array

    1
    2
    
      $ cd /Users/wclub/spark/Spark-The-Definitive-Guide/data/flight-data/csv
      $ docker cp data.csv <container_id>:/tmp/data.csv
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    
      %spark.pyspark
      flightData2015 = spark.read.option("inferSchema","true").option("header","true").csv("/Users/wclub/spark/Spark-The-Definitive-Guide/data/flight-data/csv/2015-summary.csv")
        
      flightData2015.take(3)
      flightData2015.sort("count").explain()
        
      # partition 수 설정
      spark.conf.set("spark.sql.shuffle.partitions","5")
      flightData2015.sort("count").take(2)
    

    test

  • Dataframe → Sql View

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    
      %spark.pyspark
      # create view
      flightData2015.createOrReplaceTempView("flight_data_2015")
        
      # sql
      sqlWay = spark.sql("""
      select DEST_COUNTRY_NAME,count(1)
      from flight_data_2015
      group by dest_country_name
      """)
        
      sqlWay.explain()
        
      # Dataframe
      dataFrameWay = flightData2015.groupby("DEST_COUNTRY_NAME").count()
      dataFrameWay.explain()
    
  • Dataframe function

    • max
    1
    2
    3
    4
    5
    
      spark.sql("select max(count) from flight_data_2015").take(1)
        
      %spark.pyspark
      from pyspark.sql.functions import max
      flightData2015.select(max("count")).take(1)
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    
      # sql
      %spark.pyspark
      maxsql = spark.sql("""
      select dest_country_name,sum(count) as destination_total
      from flight_data_2015
      group by dest_country_name
      order by destination_total desc
      limit 5
      """)
        
      maxsql.show()
        
      # dataframe
      # 마지막 함수가 다 끝나고 나서야 Action 호출
      from pyspark.sql.functions import desc
      flightData2015.groupby("dest_country_name").sum("count").withColumnRenamed("sum(count)","destination_total").sort(desc("destination_total")).limit(5).show()
      flightData2015.groupby("dest_country_name").sum("count").withColumnRenamed("sum(count)","destination_total").sort(desc("destination_total")).limit(5).explain()
        
      >== Physical Plan ==
      TakeOrderedAndProject(limit=5, orderBy=[destination_total#230L DESC NULLS LAST], output=[dest_country_name#10,destination_total#230L])
      +- *(2) HashAggregate(keys=[dest_country_name#10], functions=[sum(cast(count#12 as bigint))])
         +- Exchange hashpartitioning(dest_country_name#10, 5)
            +- *(1) HashAggregate(keys=[dest_country_name#10], functions=[partial_sum(cast(count#12 as bigint))])
               +- *(1) FileScan csv [DEST_COUNTRY_NAME#10,count#12] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/tmp/data.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,count:int>
    
참조

  • https://towardsdatascience.com/spark-jargon-for-babies-and-python-programmers-5ccba2c60f68
  • https://dev.to/rcmaples/explain-it-to-me-like-im-five-map-reduce–filter-edition-2mef/comments
  • https://medium.com/swlh/apache-spark-is-fun-eadcaf141c02