Kimuksung
Kimuksung 안녕하세요. 분산처리에 관심이 많은 생각하는 주니어 Data Enginner입니다.

Spark - Hadoop 연결하기

Spark - Hadoop 연결하기

Pyspark로 연결 시도 시 datanode에 값이 없다고 한다.

단계 별로 문제가 무엇인지 어떻게 해결할지를 구성해보았습니다.

결론 : ec2 인스턴스의 vpc 설정 값을 NameNode는 Private으로 DataNode는 Public으로 hosts를 구성하여 해결하였다.

2023-10-10, 2023-10-11 추가 업데이트


1. 파일 생성 권한 사라지는 문제

  • hadoop cluster 재시작 후 파일 생성 시 권한이 없다.
  • hdfs 명령어로 모든 유저가 변경 가능하도록 권한 부여(특정 파일)

Untitled-43

1
$ hdfs dfs -chmod 777 /
2. Spark에서 Hadoop으로 연결

  • a) datanode 연결 설정 추가
1
2
3
4
5
6
7
$ sudo vi $HADOOP_HOME/etc/hadoop/core-site.xml

# 추가
<property>
    <name>dfs.client.use.datanode.hostname</name>
    <value>true</value>
</property>
  • b) Connection TimeOut
  • 일반적으로 대부분의 사람이 같은 Node에서 구성하였는지 연결 시 hdfs://localhost:port으로 시도하였으나, 현재 상황에서는 알맞는 답이 아니다.
  • 하지만, 나의 상황에서는 별도의 Node에서 구성
  • url : hdfs://IP:Port
  • path : /test/txt_dir/

Untitled-44

1
2
3
4
5
6
7
8
9
10
11
12
from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName("ReadHDFS")
sc = SparkContext(conf=conf)

hdfs_server = "hdfs://3.34.179.116:9000"
hdfs_file_path = "/test/123/"  # 사용자 이름을 본인의 HDFS 사용자 이름으로 대체하세요.

data = sc.textFile(hdfs_server+hdfs_file_path+"sample.txt")
first_lines = data.take(5)
for line in first_lines:
    print(line)
3. 연결이 된 이후에 데이터를 가져올 때 Block 정보를 아는 DataNode가 없다고 한다.

  • Connection Timeout
  • 데이터 요청 시 Spark → NameNode → DataNode → 직접 연결할 것으로 예상
  • 하지만, Hadoop Cluster 구성 시 private-ip로 서로를 바라본다. (동일 VPC에 존재하는 EC2 인스턴스이기 때문에)
  • 그렇기에 NameNode에서 DataNode 값을 전달해줄 때, private-ip를 전달해주어 연결이 불가하여 connection 문제가 발생하는거 같은데.. 이 부분에 대해서는 조금 더 찾아봐야 겠다.
  • DataNode를 Public으로 구성하는 경우 인스턴스 시작 시 binding 에러로 구성이 불가하다.

2023-10-10-11-00-49

1
$ nc -v ip port
3-1) ssh 연결 문제

  • Hadoop start-all.sh 구성 시 각 노드에 제대로 연결하지 못하는 이슈 발생
  • ssh keygen 다시 설정하여 준다.

2023-10-11-11-53-36

1
$ ssh-keygen -R worker01
3-2) Node 별 Public&Private 구성 문제

  • http://Masterip:9870 Master Node의 Hadoop UI를 들어가보면, Worker Node들이 Private IP로 구성된 것을 볼 수 있다. 이 부분이 문제인 것일까?
  • 찾아보니 NameNode, Secondary NameNode의 노드에서는 자신을 가리키는 /etc/hosts에서는 private-ip로 설정해야 한다. ( 그렇지 않으면 NameNode가 실행되지 않는다. )
  • 이외의 DataNode에서는 무조건 Public으로 해야 외부에서 접근이 가능하며, 그렇지 않은 경우에는 접속이 불가능하다.
  • 그렇다 이 방법이 해결책이였다. 결국, EC2 Instance의 Public Private을 각 노드 별 설정환경에 맞추어 값을 넣어주고 사용하면 된다.

2023-10-11-11-03-22


4) Hadoop Read&Write 처리하기

  • Pyspark로 Hadoop의 있는 데이터를 Read하거나 Write하는 방법을 임시 Class로 구성하여 보았다.
  • ConnectHadoop 클래스 안에 read_file과 write_file로 구성하였다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
from pyspark.sql import SparkSession
from datetime import datetime

class ConnectHadoop:
    def __init__(self, ip="masterip", port="9000", file_path="", file_name="", user_name="kim"):
        self.hdfs_ip = ip
        self.hdfs_port = port
        self.hdfs_server = f"hdfs://{self.hdfs_ip}:{self.hdfs_port}"
        self.hdfs_file_path = file_path
        self.hdfs_file_name = file_name
        self.hdfs_user_name = user_name
        self.spark = SparkSession.builder \
            .appName("WriteHDFS") \
            .config("spark.hadoop.fs.defaultFS", self.hdfs_server) \
            .config("spark.hadoop.yarn.resourcemanager.hostname", self.hdfs_ip) \
            .config("spark.hadoop.user.name", self.hdfs_user_name) \
            .getOrCreate()

    def __del__(self):
        print('end spark')

    def read_file(self, read_type):
        if read_type == "txt":
            return self.spark.read.text(self.hdfs_file_path + self.hdfs_file_name)
        elif read_type == "parquet":
            return self.spark.read.parquet(self.hdfs_file_path)

    def write_file(self, df, write_type):
        if write_type == "parquet":
            df.write.format("parquet").mode("overwrite").save(self.hdfs_file_path)

if __name__ == "__main__":
    current_date = datetime.now().strftime("%Y-%m-%d")

    # read text file
    hadoop = ConnectHadoop(file_path="/test/", file_name="hadooptest.txt")
    data = hadoop.read_file(read_type="txt")
    print(data.take(5))

    # write parquet
    hadoop = ConnectHadoop(file_path=f"/test/log_dir/{current_date}")
    raw_data = [("Alice", 34), ("Bob", 45), ("Charlie", 29)]
    columns = ["Name", "Age"]
    df = hadoop.spark.createDataFrame(raw_data, columns)
    hadoop.write_file(df, write_type="parquet")

    # read parquet
    data = hadoop.read_file(read_type="parquet")
    print(data.take(5))

2023-10-11-11-08-54 2023-10-11-11-09-44


참고

  • https://engineering.linecorp.com/ko/blog/data-engineering-software-troubleshooting