Kimuksung
Kimuksung 안녕하세요. 분산처리에 관심이 많은 생각하는 주니어 Data Enginner입니다.

Spark로 Private AWS RDS 연동

Spark로 Private AWS RDS 연동

Python의 도움을 받아 RDS를 연결하여 Spark를 활용한 전처리를 시도 중.
Node -> EC2 -> AWS RDS Mysql에 있는 데이터를 가져오는 것이 목표.
ssh를 통한 연결까지는 성공하였으나, 이후에 jdbc로 부르는 부분이 먹히지 않는다.
몇일 째 시도중이지만 개선되지 않아 지금까지 햇던 것들을 정리하려고 한다.

1) 직접적인 다이렉트 연결 -> private subnet이 아닌 public subnet에 있다면 이상 없이 연결이 된다.

2) SSH shell 활용한 접근 방식

  • EC2에서 Mysql에 접근할 때 아래와 같이 접근한다.
    1
    
    ssh -i {pem 경로} -N -L 13306:rds-instance:rds-port ubuntu@instance
    
  • 터널링에서 13306 -> 3306 으로 포트포워딩된 상태에서 로컬에서 mysql로 접속
    1
    
    mysql -h 127.0.0.1 -P 13306 -u userid -p
    
  • 하지만 Code Level에서 연결하는 것과는 다른 이야기 위와 같이 하면 command shell에서는 동작한다.

3) SSHTunneling

  • Python을 활용하여 터널링을 열어 EC2에 접근하는 방식
  • 임의로 특정 포트를 열어 연결
  • N옵션을 주어 Local에서도 실행되게끔
  • 아래 코드는 SSHTunneling을 활용하여 RDS에서 Command를 날린 것과 같다.
  • RDS 자체에서 동작하도록 한 것이다.

다만 해주고 싶은 부분은 pyspark를 통해 바로 건드리고 싶은것이다..

SSHTunnelForwarder과 jdbc 연결을 통해 접근하여 보았으나, ec2를 거치지 않고 운영되는 rds만 접속이 될 뿐 ec2를 거치는 rds는 아직 부족하다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
from sshtunnel import SSHTunnelForwarder
from pyspark.sql import SparkSession
import os

pem_file_dir = os.path.dirname('pem/')
    
# SSH 설정
pem_file_path = ""  # pem 파일 경로
ssh_username = ""  # SSH 사용자명
ssh_hostname = ""  # SSH 서버 주소


# RDS 설정
rds_hostname = ""  # RDS의 호스트명
rds_username = ""  # RDS 사용자명
rds_password = ""  # RDS 패스워드
rds_database = ""  # RDS 데이터베이스명
rds_table = ""

try:
    with SSHTunnelForwarder(
        (ssh_hostname, 22),
        ssh_username=ssh_username,
        ssh_pkey=pem_file_path,
        remote_bind_address=(rds_hostname, 3306)
    ) as tunnel:
        print('--start ssh--')
        tunnel.start()
        local_binding_port = tunnel.local_bind_port
        print('bind port : ' , local_binding_port)
        db = pymysql.connect( host='127.0.0.1', user=rds_username, password=rds_password, port=local_binding_port, database= rds_database)
        try:
            with db.cursor() as cur:
                cur.execute(f'select * from {rds_table}')
                df = cur.fetchall()
                print(df)
                #for r in cur:
                    #print(r)
        finally:
            db.close()
except BaseException as e:
    print('Problem is --> ', e)
finally:
    if tunnel:
        tunnel.close()

df_spark = spark.createDataFrame(df)
df_spark.show()

4) Pyspark read jdbc

  • 목표점으로 둔 것이 이 부분인데.. connection refuse 에러부터 로그를 보며 찾아보려고 하였으나, 아직 까지 해결 방안을 찾지 못했다ㅠㅠ
  • 아래와 같이 구현하면 될 거 같은데 완성되진 않았으니 참고용
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
try:
    with SSHTunnelForwarder(
        (ssh_hostname, 22),
        ssh_username=ssh_username,
        ssh_pkey=pem_file_path,
        remote_bind_address=(rds_hostname, 3306)
    ) as tunnel:
        print('--start ssh--')
        tunnel.start()
        print('bind port : ' , tunnel.local_bind_port)
        bing_port = tunnel.local_bind_port

        # MySQL에 연결
        jdbc_url = f"jdbc:mysql://localhost:3306/{rds_database}?enabledTLSProtocols=TLSv1.2"
        print(jdbc_url)
        df = spark.read \
            .format("jdbc") \
            .option("url", jdbc_url) \
            .option("driver", "com.mysql.cj.jdbc.Driver") \
            .option("dbtable", rds_table) \
            .option("user", rds_username) \
            .option("password", rds_password) \
            .load()

        df.show(5)
        df.printSchema()
except BaseException as e:
    print('Problem is --> ', e)
finally:
    if tunnel:
        tunnel.close()