Kimuksung
Kimuksung 안녕하세요. 분산처리에 관심이 많은 생각하는 주니어 Data Enginner입니다.

Spark Csv -> DataFrame

Spark Csv -> DataFrame

RDD에 이어 더 간편하게 사용가능한 Sparksession의 Dataframe 기능을 사용해보려고 합니다.

1. Read CSV

  • sparksession에서 지원하여 주는 dataframe을 사용
  • 데이터를 바라보고 타입을 유추
  • Schema Option = 원하는 데이터 Schema가 있다면 세팅이 가능하다.
  • 각 타입별로 import가 필요하며, Schema 구성 시 Struct, StructField가 필수적이다.

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    
      from pyspark.sql import SparkSession
        
      spark = SparkSession.\
              builder.\
              appName("baro-mysql-test").\
              master("spark://spark-master:7077").\
              config("spark.jars", "jars/mysql-connector-java-8.0.27.jar").\
              getOrCreate()
        
      file_dir = "datas/"
      file_name = "baro_dev_payment.csv"
        
      df = spark.read.csv(file_dir+file_name, header=True)
      df.show(5)
      df.printSchema()
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    
      from pyspark.sql.types import StructType, StructField
      from pyspark.sql.types import DoubleType, IntegerType, StringType, TimestampType
        
      schema = StructType([
          StructField("id", IntegerType()),\
          StructField("created_at", TimestampType()),\
          StructField("updated_at", TimestampType()),\
          StructField("deleted_at", TimestampType()),\
          StructField("money_paid", IntegerType()),\
          StructField("pay_method", StringType()),\
          StructField("result", StringType()),\
          StructField("card_id", StringType()),\
          StructField("usage_id", StringType())
      ])
        
      file_dir = "datas/"
      file_name = "payment-test.csv"
        
      df = spark.read.csv(file_dir+file_name, header=True, schema=schema)
      df.show(5)
      df.printSchema()
    
2. mapping

  • Dataframe은 별도의 Map 기능이 존재하지 않는다.
  • 대신, 기존 데이터를 변경가능한 withColumncol을 통해 데이터를 mapping 가능하다.
  • 특정 칼럼만 추출하고 싶은 경우 select를 사용한다.
  • 아래는 KTC로 변경, 결제금액을 map한 코드이다.

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    
      # 결제 금액+1 / UTC->KTC
      from pyspark.sql.functions import col, from_utc_timestamp, date_format
        
      # dataframe은 map 함수 X
      map_df = df.withColumn("change_money_paid", col("money_paid") + 1)\
          .withColumn("change_ktc_created_at", from_utc_timestamp(col("created_at"),"Asia/Seoul"))\
          .withColumn("change_yyyymm_created_at", date_format(col("created_at"),"yyyy-MM"))\
          .select("id", "created_at","money_paid", "change_money_paid", "change_ktc_created_at", "change_yyyymm_created_at")
        
      map_df.show(5)
    
3. Reduce

  • 날짜 별 결제 데이터와 금액이 있다고 하였을 때 같은 년,월 데이터인 경우를 뽑아보려고 합니다.
  • date_format을 활용하여 년,월 데이터로 변경
  • group by, sort 함수를 활용하여 집계,정렬
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType, IntegerType, StringType, TimestampType
from pyspark.sql.functions import date_format, col

df = spark.read.csv(file_dir+file_name, header=True, schema=schema)
# change column type
df = df.withColumn("change_created_at",  date_format(col("created_at"), "yyyy-MM"))
# select specific dataframe
df = df.select(["change_created_at", "money_paid"])
df.show(5)

# reduce
df.groupBy("change_created_at").sum("money_paid").show(5)
# sort asc
df.sort("change_created_at",col("money_paid").asc()).show(5)
# sort desc
df.sort("change_created_at",col("money_paid").desc()).show(5)

result