Kimuksung
Hello, I'm a thoughtful junior Data Engineer with a strong interest in distributed processing.

Setting Up Real-Time Processing of Fake Log Data with Kafka, Spark, and Hadoop


With the goal of trying out streaming processing on fake log data, I'd like to walk through how I have put the pipeline together so far.


Goal

Stream fake log data into HDFS so that the log data can be read back for analysis (an illustrative record shape is sketched right after the list below).

  • Generate fake log data
  • Produce the log data to Kafka
  • Process the messages sent to Kafka in real time and store them in HDFS as Parquet
  • Set up PySpark so the Parquet log files can be read and analyzed
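
Every event that travels through this pipeline ends up as a flat JSON record. As a point of reference, here is a minimal sketch of that record: the field names come from the parser and the Spark schema shown later in this post, while the values are purely illustrative placeholders rather than real generated data.

# Shape of one log event as it flows from log_generator.py through Kafka into HDFS.
# Field names match log_data.py and the Spark schema below; the values are
# illustrative placeholders only.
example_event = {
    "userid": "1b9d6bcd-bbfd-4b2d-9b5d-ab8dfbbd4bed",  # uuid.uuid4()
    "ip": "203.0.113.10",                              # faker.ipv4()
    "log_time": "2023-10-13 09:12:45+0900",            # local timestamp + timezone offset
    "method": "GET",
    "status": "200",
    "url": "https://example.com/index.html",           # faker.uri()
    "agent": "Mozilla/5.0 (...)",                      # faker user agent
}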


Step-by-Step Plan and Links to the Setup Process


Generating and Producing Log Data with Python

  • Log data generation - log_generator.py

    
      # Log Generator
      # python log_generator.py -n 10 -o LOG
      import time
      import datetime
      import numpy
      import random
      import argparse
      import uuid
      from faker import Faker
      from tzlocal import get_localzone
        
      parser = argparse.ArgumentParser(__file__, description="Fake Apache Log Generator")
      parser.add_argument("--output", "-o", dest='output_type', help="Write to a Log file, a gzip file or to STDOUT",
                          choices=['LOG', 'GZ', 'CONSOLE'])
      parser.add_argument("--num", "-n", dest='num_lines', help="Number of lines to generate", type=int,
                          default=1)
      parser.add_argument("--prefix", "-p", dest='file_prefix', help="Prefix the output file name", type=str)
      parser.add_argument("--sleep", "-s", help="Sleep this long between lines (in seconds)", default=0.0, type=float)
        
      args = parser.parse_args()
        
      # file_info (note: output_type and file_prefix are parsed but unused here; this simplified script always appends to a local .log file)
      log_lines = args.num_lines
      file_prefix = args.file_prefix
      output_type = args.output_type
        
      faker = Faker()
        
      # time
      local = get_localzone()
      timestr = time.strftime("%Y%m%d")
      otime = datetime.datetime.now()
        
      outFileName = 'access_log_' + timestr + '.log'
        
      f = open(outFileName, 'a')
        
      # log infos
      response = ["200", "404", "500", "301"]
      http_methods = ["GET", "POST", "DELETE", "PUT"]
      useragentlist = [faker.firefox, faker.chrome, faker.safari, faker.internet_explorer, faker.opera]
        
      while log_lines:
          if args.sleep:
              increment = datetime.timedelta(seconds=args.sleep)
          else:
              increment = datetime.timedelta(seconds=random.randint(1, 100))
          otime += increment
        
          userid = uuid.uuid4()
          ip = faker.ipv4()
          dt = otime.strftime('%Y-%m-%d %H:%M:%S')
          timezone = datetime.datetime.now(local).strftime('%z')
          http_method = numpy.random.choice(http_methods, p=[0.6, 0.1, 0.1, 0.2])
        
          resp = numpy.random.choice(response, p=[0.9, 0.04, 0.02, 0.04])
          uri_data = faker.uri()
          useragent = numpy.random.choice(useragentlist, p=[0.5, 0.3, 0.1, 0.05, 0.05])()
          logs = [userid, ip, dt+timezone, http_method, resp, uri_data, useragent]
          logs = list(map(str, logs))
          f.write("\t".join(logs)+"\n")
        
          log_lines = log_lines - 1
          if args.sleep:
              time.sleep(args.sleep)

      # Close the log file once all lines have been written.
      f.close()
    
  • Kafka Produce & Consume - kafka_module.py (a usage sketch follows the code)

    
      # kafka_module.py
      # Kafka Produce, Consume
      from kafka import KafkaProducer
      from kafka import KafkaConsumer
      from json import dumps
      from datetime import datetime
      import time
        
      class KafkaInfos:
          def __init__(self, topic: str, nodes: list):
              self.topic = topic
              self.bootstrap_servers = nodes
        
          def set_produce(self, values):
              try:
                  producer = KafkaProducer(
                      bootstrap_servers=self.bootstrap_servers,
                      api_version=(2, 5, 0),
                      acks=0,
                      compression_type='gzip',
                      value_serializer=lambda x: dumps(x).encode('utf-8')
                  )
        
                  start = time.time()
                  print('[Produce] Starting to send messages.')
        
                  for i, val in enumerate(values, 0):
                      print(f'Sending message {i}: {val}')
                      producer.send(self.topic, value=val)
        
                  producer.flush()
                  producer.close()
        
                  print(f'[Produce] Elapsed time: {time.time() - start}')
                  return {"status": 200}
        
              except Exception as exc:
                  raise exc
        
          def get_consume(self, option="earliest"):
              consumer = None
              try:
                  consumer = KafkaConsumer(
                      self.topic,
                      bootstrap_servers=self.bootstrap_servers,
                      value_deserializer=lambda x: x.decode("utf-8"),
                      auto_offset_reset=option  # default is "latest"; "earliest" also consumes past messages
                  )
                  print('[Consume] Starting')

                  for message in consumer:
                      print(f'Partition : {message.partition}, Offset : {message.offset}, Value : {message.value}')

              except Exception as exc:
                  raise exc

              finally:
                  # Only close the consumer if it was actually created.
                  if consumer is not None:
                      consumer.close()
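
    As a quick sanity check, the module can be exercised on its own. Below is a minimal usage sketch, assuming the "log" topic and the broker addresses used elsewhere in this post are reachable (adjust them to your cluster):

      # Usage sketch for kafka_module.py (topic name and broker addresses are placeholders).
      from kafka_module import KafkaInfos

      kafka = KafkaInfos(topic="log", nodes=["master:9092", "slave:9092"])

      # set_produce json-dumps each value before sending, so any JSON-serializable object works.
      kafka.set_produce([{"msg": "hello"}, {"msg": "world"}])

      # Read everything back from the beginning of the topic.
      # Note: this loops until the process is interrupted.
      kafka.get_consume(option="earliest")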
    
  • Log data preprocessing & Produce - log_data.py

    
      # log_data.py
      # Log preprocessing and Kafka Produce
      from kafka_module import KafkaInfos
        
      class LogData(KafkaInfos):
          def __init__(self, logfile_dir, logfile_name):
              self.logfile = logfile_dir+logfile_name
              self.__bootstrap_servers = ['master:9092', 'slave:9092']  # ... add the remaining brokers here
              self.__topic = "log"
              super().__init__(self.__topic, self.__bootstrap_servers)
        
          def parsing_read(self):
              with open(self.logfile) as f:
                  for line in f:
                      userid, ip, log_time, method, status, url, agent, *rest = line.split('\t')
                      json_log = {
                          'userid': userid,
                          'ip': ip,
                          'log_time': log_time,
                          'method': method,
                          'status': status,
                          'url': url,
                          'agent': agent,
                      }
                      yield json_log
        
          def produce(self):
              self.set_produce(self.parsing_read())
        
      if __name__ == "__main__":
          logfile_dir = "/Users/wclub/kafka"
          logfile_name = "/access_log_20231013.log"
        
          logobject = LogData(logfile_dir, logfile_name)
        
          # parsing_read() returns a generator object
          print(logobject.parsing_read())
          # read the log file and print every parsed record
          for record in logobject.parsing_read():
              print(record)
        
          # kafka produce
          logobject.produce()
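
    After producing, the messages can be verified with the consumer inherited from KafkaInfos, for example:

      # Optional check: consume the produced log messages from the beginning of
      # the topic (this runs until the process is interrupted).
      logobject.get_consume(option="earliest")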
    
Setting Up an Environment Where PySpark Can Connect to Kafka and Hadoop for Processing
(screenshot: streaming_test)

- Connecting to Kafka with a SparkSession (a minimal sketch of the streaming job is included below)

(screenshot of the running streaming query)
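
The streaming job itself only appears as a screenshot above, so here is a minimal sketch of what it does, assuming (as placeholders) the broker list and "log" topic from earlier, an HDFS output directory under /test/log_dir, and that the spark-sql-kafka-0-10 package is available to Spark. This is not the exact code behind the screenshot.

# streaming_job.py - minimal Structured Streaming sketch: Kafka "log" topic -> HDFS Parquet.
# Broker addresses, the HDFS namenode URI, and paths are placeholders; adjust to your cluster.
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("KafkaToHDFS")
         .getOrCreate())

# Subscribe to the "log" topic; Kafka delivers key/value as binary columns.
raw_df = (spark.readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", "master:9092,slave:9092")
          .option("subscribe", "log")
          .option("startingOffsets", "earliest")
          .load())

# Keep only the JSON payload as a string column named "value";
# the batch reader below parses it with from_json.
value_df = raw_df.selectExpr("CAST(value AS STRING) AS value")

# Append the raw JSON strings to HDFS as Parquet. Structured Streaming requires
# a checkpoint directory for fault tolerance.
query = (value_df.writeStream
         .format("parquet")
         .option("path", "hdfs://master:9000/test/log_dir/2023-10-14")
         .option("checkpointLocation", "hdfs://master:9000/test/checkpoint/log")
         .outputMode("append")
         .start())

query.awaitTermination()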

- Reading the HDFS log data into a DataFrame

from pyspark.sql import SparkSession
from datetime import datetime
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, StringType

class ConnectHadoop:
    def __init__(self, ip="ip", port="port", file_path="", file_name="", user_name="kim"):
        self.hdfs_ip = ip
        self.hdfs_port = port
        self.hdfs_server = f"hdfs://{self.hdfs_ip}:{self.hdfs_port}"
        self.hdfs_file_path = file_path
        self.hdfs_file_name = file_name
        self.hdfs_user_name = user_name
        self.spark = SparkSession.builder \
            .appName("WriteHDFS") \
            .config("spark.hadoop.fs.defaultFS", self.hdfs_server) \
            .config("spark.hadoop.yarn.resourcemanager.hostname", self.hdfs_ip) \
            .config("spark.hadoop.user.name", self.hdfs_user_name) \
            .getOrCreate()

    def __del__(self):
        print('end spark')

    def read_file(self, read_type):
        if read_type == "txt":
            return self.spark.read.text(self.hdfs_file_path + self.hdfs_file_name)
        elif read_type == "parquet":
            return self.spark.read.parquet(self.hdfs_file_path)
        elif read_type == "csv":
            return self.spark.read.csv(self.hdfs_file_path)

    def write_file(self, df, write_type):
        try:
            if write_type == "parquet":
                df.write.format("parquet").mode("overwrite").save(self.hdfs_file_path)
            elif write_type == "csv":
                df.coalesce(2).write.mode('overwrite').option('header', 'true').csv(self.hdfs_file_path)
            elif write_type == "json":
                df.write.format("json").mode("overwrite").save(self.hdfs_file_path)
        except Exception as e:
            print(f'Exception occurred: {e}, time: {datetime.now()}')

if __name__ == "__main__":
    current_date = datetime.now().strftime("%Y-%m-%d")

    json_schema = StructType(
                    [StructField("userid", StringType(), True),
                     StructField("ip", StringType(), True),
                     StructField("log_time", StringType(), True),
                     StructField("method", StringType(), True),
                     StructField("status", StringType(), True),
                     StructField("url", StringType(), True),
                     StructField("agent", StringType(), True)
                 ])

    hadoop = ConnectHadoop(file_path=f"/test/log_dir/{current_date}")
    df = hadoop.read_file(read_type="parquet")

    parsed_list = ["parsed_value.userid", "parsed_value.ip", "parsed_value.log_time", "parsed_value.method",
                   "parsed_value.status", "parsed_value.url", "parsed_value.agent"]
    parsed_df = df.select(from_json(col("value"), json_schema).alias("parsed_value"))
    final_df = parsed_df.select(*parsed_list)

    final_df.show()
    # Print all rows
    # final_df.show(final_df.count(), truncate=False)
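
With the Parquet data loaded into final_df, simple analyses follow directly from the DataFrame API; for example, counting requests per HTTP status code or method (column names come from the schema above):

# Example analyses, continuing from final_df above.
# Requests per HTTP status code, most frequent first.
final_df.groupBy("status").count().orderBy("count", ascending=False).show()

# Requests per HTTP method.
final_df.groupBy("method").count().show()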
