Kimuksung
Kimuksung 안녕하세요. 분산처리에 관심이 많은 생각하는 주니어 Data Enginner입니다.

Airflow를 활용하여 S3 Parquet 데이터 Redshift 적재

Airflow를 활용하여 S3 Parquet 데이터 Redshift 적재

안녕하세요
오늘은 S3 Parquet 데이터를 Redshift에 적재하는 방법에 대해 공유드리려고 합니다.


S3 to Redshift

  • MWAA IP는 고정 ( 2 Public Subnet )
  • Public Subnet이 2개임으로 Redshift Inbound에 Ip 2개다 전부 추가해주어야 한다.
  • S3에는 Value값만 들어가 있다.
  • Parquet type으로 데이터 전달.
Step1) create table redshift
  • Table 구성이 먼저 되어있어야 값이 추가 가능합니다.
Step2) S3 → Redshift
  • Parquet File 업로드
  • Copy 명령어 지원 ( 참고 )
1
2
3
4
5
6
7
# Result Copy Command
# 문자로 된 파일을 ,로 구분하여 데이터 입력
copy table 
from 's3://{bucketname}/{key}'
credentials 'aws_access_key_id=id;aws_secret_access_key=key'
copy option
; 
  • redshift_host
  • aws_access_key,id
  • s3_bucket(required) = Bucket 이름
  • s3_key(required) = File Directory
  • schema (required)
  • collection(required)
  • copy_options
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
def copy_redshift(
    aws_access_key_id,
    aws_secret_access_key,
    schema,
    collection,
    file_name,
    s3_prefix,
		redshift_host,
		**s3_bucket**
):
    import redshift_connector
    aws_info = {
            "region_name": "ap-northeast-2",
            "aws_access_key_id": aws_access_key_id,
            "aws_secret_access_key": aws_secret_access_key
        }
    bucket_name=**s3_bucket**
    s3_prefix=s3_prefix
    copy_sql = f"""
	    COPY {schema}.{collection}
	        FROM 's3://{bucket_name}/{s3_prefix}{file_name}'
	        credentials
	        'aws_access_key_id={aws_info['aws_access_key_id']};aws_secret_access_key={aws_info['aws_secret_access_key']}'
	        format as parquet;
    """.format()

    conn = redshift_connector.connect(
            host= redshift_host,
            database='',
            user= '',
            password= '!',
            auto_create= True
        )
    cursor: redshift_connector.Cursor = conn.cursor()

    print(copy_sql)
    cursor.execute( copy_sql )
    conn.commit()
참고 자료