Spark Csv -> DataFrame
RDD에 이어 더 간편하게 사용가능한 Sparksession의 Dataframe 기능을 사용해보려고 합니다.
1. Read CSV
- sparksession에서 지원하여 주는 dataframe을 사용
- 데이터를 바라보고 타입을 유추
Schema Option
= 원하는 데이터 Schema가 있다면 세팅이 가능하다.-
각 타입별로 import가 필요하며, Schema 구성 시 Struct, StructField가 필수적이다.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
from pyspark.sql import SparkSession spark = SparkSession.\ builder.\ appName("baro-mysql-test").\ master("spark://spark-master:7077").\ config("spark.jars", "jars/mysql-connector-java-8.0.27.jar").\ getOrCreate() file_dir = "datas/" file_name = "baro_dev_payment.csv" df = spark.read.csv(file_dir+file_name, header=True) df.show(5) df.printSchema()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
from pyspark.sql.types import StructType, StructField from pyspark.sql.types import DoubleType, IntegerType, StringType, TimestampType schema = StructType([ StructField("id", IntegerType()),\ StructField("created_at", TimestampType()),\ StructField("updated_at", TimestampType()),\ StructField("deleted_at", TimestampType()),\ StructField("money_paid", IntegerType()),\ StructField("pay_method", StringType()),\ StructField("result", StringType()),\ StructField("card_id", StringType()),\ StructField("usage_id", StringType()) ]) file_dir = "datas/" file_name = "payment-test.csv" df = spark.read.csv(file_dir+file_name, header=True, schema=schema) df.show(5) df.printSchema()
2. mapping
- Dataframe은 별도의 Map 기능이 존재하지 않는다.
- 대신, 기존 데이터를 변경가능한
withColumn
과col
을 통해 데이터를 mapping 가능하다. - 특정 칼럼만 추출하고 싶은 경우
select
를 사용한다. -
아래는 KTC로 변경, 결제금액을 map한 코드이다.
1 2 3 4 5 6 7 8 9 10
# 결제 금액+1 / UTC->KTC from pyspark.sql.functions import col, from_utc_timestamp, date_format # dataframe은 map 함수 X map_df = df.withColumn("change_money_paid", col("money_paid") + 1)\ .withColumn("change_ktc_created_at", from_utc_timestamp(col("created_at"),"Asia/Seoul"))\ .withColumn("change_yyyymm_created_at", date_format(col("created_at"),"yyyy-MM"))\ .select("id", "created_at","money_paid", "change_money_paid", "change_ktc_created_at", "change_yyyymm_created_at") map_df.show(5)
3. Reduce
- 날짜 별 결제 데이터와 금액이 있다고 하였을 때 같은 년,월 데이터인 경우를 뽑아보려고 합니다.
date_format
을 활용하여 년,월 데이터로 변경group by
,sort
함수를 활용하여 집계,정렬
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType, IntegerType, StringType, TimestampType
from pyspark.sql.functions import date_format, col
df = spark.read.csv(file_dir+file_name, header=True, schema=schema)
# change column type
df = df.withColumn("change_created_at", date_format(col("created_at"), "yyyy-MM"))
# select specific dataframe
df = df.select(["change_created_at", "money_paid"])
df.show(5)
# reduce
df.groupBy("change_created_at").sum("money_paid").show(5)
# sort asc
df.sort("change_created_at",col("money_paid").asc()).show(5)
# sort desc
df.sort("change_created_at",col("money_paid").desc()).show(5)