Airflow를 활용하여 S3 Parquet 데이터 Redshift 적재
안녕하세요
오늘은 S3 Parquet 데이터를 Redshift에 적재하는 방법에 대해 공유드리려고 합니다.
S3 to Redshift
MWAA IP는 고정
( 2 Public Subnet )- Public Subnet이 2개임으로 Redshift Inbound에 Ip 2개다 전부 추가해주어야 한다.
- S3에는 Value값만 들어가 있다.
- Parquet type으로 데이터 전달.
Step1) create table redshift
- Table 구성이 먼저 되어있어야 값이 추가 가능합니다.
Step2) S3 → Redshift
- Parquet File 업로드
Copy
명령어 지원 ( 참고 )
1
2
3
4
5
6
7
# Result Copy Command
# 문자로 된 파일을 ,로 구분하여 데이터 입력
copy table
from 's3://{bucketname}/{key}'
credentials 'aws_access_key_id=id;aws_secret_access_key=key'
copy option
;
- redshift_host
- aws_access_key,id
- s3_bucket(required) = Bucket 이름
- s3_key(required) = File Directory
- schema (required)
- collection(required)
- copy_options
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
def copy_redshift(
aws_access_key_id,
aws_secret_access_key,
schema,
collection,
file_name,
s3_prefix,
redshift_host,
**s3_bucket**
):
import redshift_connector
aws_info = {
"region_name": "ap-northeast-2",
"aws_access_key_id": aws_access_key_id,
"aws_secret_access_key": aws_secret_access_key
}
bucket_name=**s3_bucket**
s3_prefix=s3_prefix
copy_sql = f"""
COPY {schema}.{collection}
FROM 's3://{bucket_name}/{s3_prefix}{file_name}'
credentials
'aws_access_key_id={aws_info['aws_access_key_id']};aws_secret_access_key={aws_info['aws_secret_access_key']}'
format as parquet;
""".format()
conn = redshift_connector.connect(
host= redshift_host,
database='',
user= '',
password= '!',
auto_create= True
)
cursor: redshift_connector.Cursor = conn.cursor()
print(copy_sql)
cursor.execute( copy_sql )
conn.commit()
참고 자료