가상 로그 데이터 실시간 처리 구성하기 With. Kafka, Spark, Hadoop
가상 로그 데이터를 Streaming 처리해보기라는 목표를 가지고 지금까지 어떻게 구성했는지를 소개드리려고 합니다.
목표
가상의 로그 데이터를 HDFS에 스트리밍 처리하여 로그 데이터를 읽을 수 있도록 구성한다.
- 가상의 Log 데이터 생성
- Log 데이터를 Kafka에 Produce
- Kafka에 전송된 Messages들을 실시간으로 처리하여 HDFS에 Parquet로 저장
- Pyspark로 Parquet에 저장된 로그 파일을 읽고 분석할 수 있도록 구성
단계 별 진행 계획 및 구성 과정 링크
- EC2 Instance Kafka, Hadoop Cluster 구성 - Kafka Cluster 구성 링크, Hadoop Cluster 구성 링크
- Spark와 Jupyter는 Docker에 구성 - Spark 구성 링크
- Spark로 Kafka, Hadoop 연결 및 처리 가능한 환경 구성 - Kafka 연결하기 , Hadoop 연결하기, stream 연결하기
- Python으로 가상의 로그 데이터 생성 파일 구성 - 로그 데이터 생성 링크
- 로그 데이터 전처리 및 Produce 처리 - 전처리 및 Produce 링크
Python 로그 데이터 생성 및 Produce
-
로그 데이터 생성 -
log_generator.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65
# Log Generator
# Usage: python log_generator.py -n 10 -o LOG
# Emits fake Apache-style access-log records (tab-separated) into
# access_log_YYYYMMDD.log, one line per record.
import time
import datetime
import numpy
import random
import argparse
import uuid
from faker import Faker
from tzlocal import get_localzone

parser = argparse.ArgumentParser(__file__, description="Fake Apache Log Generator")
parser.add_argument("--output", "-o", dest='output_type',
                    help="Write to a Log file, a gzip file or to STDOUT",
                    choices=['LOG', 'GZ', 'CONSOLE'])
parser.add_argument("--num", "-n", dest='num_lines',
                    help="Number of lines to generate (0 for infinite)",
                    type=int, default=1)
parser.add_argument("--prefix", "-p", dest='file_prefix',
                    help="Prefix the output file name", type=str)
parser.add_argument("--sleep", "-s",
                    help="Sleep this long between lines (in seconds)",
                    default=0.0, type=float)
args = parser.parse_args()

# file_info
log_lines = args.num_lines
file_prefix = args.file_prefix        # NOTE(review): parsed but never used below
output_type = args.output_type        # NOTE(review): parsed but never used below

faker = Faker()

# time
local = get_localzone()
timestr = time.strftime("%Y%m%d")
otime = datetime.datetime.now()

outFileName = 'access_log_' + timestr + '.log'

# log infos: weighted pools the generator samples from
response = ["200", "404", "500", "301"]
http_methods = ["GET", "POST", "DELETE", "PUT"]
useragentlist = [faker.firefox, faker.chrome, faker.safari,
                 faker.internet_explorer, faker.opera]

# BUG FIX: the original opened the file and never closed it; the context
# manager guarantees the handle is flushed and released on any exit path.
with open(outFileName, 'a') as f:
    while log_lines:
        # Advance the synthetic clock: a fixed step when --sleep is given,
        # otherwise a random 1-100 second gap between events.
        if args.sleep:
            increment = datetime.timedelta(seconds=args.sleep)
        else:
            increment = datetime.timedelta(seconds=random.randint(1, 100))
        otime += increment

        userid = uuid.uuid4()
        ip = faker.ipv4()
        dt = otime.strftime('%Y-%m-%d %H:%M:%S')
        timezone = datetime.datetime.now(local).strftime('%z')
        http_method = numpy.random.choice(http_methods, p=[0.6, 0.1, 0.1, 0.2])
        resp = numpy.random.choice(response, p=[0.9, 0.04, 0.02, 0.04])
        uri_data = faker.uri()
        # Pick a faker user-agent factory by weight, then call it.
        useragent = numpy.random.choice(useragentlist,
                                        p=[0.5, 0.3, 0.1, 0.05, 0.05])()

        logs = [userid, ip, dt + timezone, http_method, resp, uri_data, useragent]
        f.write("\t".join(map(str, logs)) + "\n")

        # -n 0 loops forever: 0 is falsy, and 0 - 1 = -1 stays truthy.
        log_lines = log_lines - 1
        if args.sleep:
            time.sleep(args.sleep)
-
Kafka Produce&Consume -
kafka_module.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
# kafka_module.py # Kafka Produce, Consume from kafka import KafkaProducer from kafka import KafkaConsumer from json import dumps from datetime import datetime import time class KafkaInfos: def __init__(self, topic: str, nodes: list): self.topic = topic self.bootstrap_servers = nodes def set_produce(self, values): try: producer = KafkaProducer( bootstrap_servers=self.bootstrap_servers, api_version=(2, 5, 0), acks=0, compression_type='gzip', value_serializer=lambda x: dumps(x).encode('utf-8') ) start = time.time() print('[Produce] - 메시지 전송을 시작합니다.') for i, val in enumerate(values, 0): print(f'{i}번 전송중 {val}') producer.send(self.topic, value=val) producer.flush() producer.close() print(f'[Produce] 걸린시간: {time.time() - start}') return {"status": 200} except Exception as exc: raise exc def get_consume(self, option="earliest"): try: consumer = KafkaConsumer( self.topic, bootstrap_servers=self.bootstrap_servers, value_deserializer=lambda x: x.decode( "utf-8" ), auto_offset_reset=option #default = latest / earliest = 과거 데이터도 consume ) print(f'[Consume] - 시작') for message in consumer: print(f'Partition : {message.partition}, Offset : {message.offset}, Value : {message.value}') except Exception as exc: raise exc finally: consumer.close()
-
로그 데이터 전처리&Produce -
log_data.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
# log_data.py # log 전처리 및 Produce from kafka_module import KafkaInfos class LogData(KafkaInfos): def __init__(self, logfile_dir, logfile_name): self.logfile = logfile_dir+logfile_name self.__bootstrap_servers = ['master:9092', 'slave:9092', ..] self.__topic = "log" super().__init__(self.__topic, self.__bootstrap_servers) def parsing_read(self): with open(self.logfile) as f: f = f.readlines() for line in f: userid, ip, log_time, method, status, url, agent, *rest = line.split('\t') json_log = { 'userid': userid, 'ip': ip, 'log_time': log_time, 'method': method, 'status': status, 'url': url, 'agent': agent, } yield json_log def produce(self): self.set_produce(self.parsing_read()) if __name__ == "__main__": logfile_dir = "/Users/wclub/kafka" logfile_name = "/access_log_20231013.log" logobject = LogData(logfile_dir, logfile_name) # generator print(logobject.parsing_read()) # log file read [print(i) for i in logobject.parsing_read()] # kafka produce logobject.produce()
PySpark로 Kafka, Hadoop 연결 및 처리 가능한 환경 구성
- SparkSession으로 Kafka 연결하기
In [1]:
from pyspark.sql import SparkSession
from datetime import datetime
from pyspark.sql.functions import col, concat, lit

# Date string used as the partition directory for today's HDFS output.
current_date = datetime.now().strftime("%Y-%m-%d")

# Maven coordinates of the Kafka connector matching this Spark/Scala build,
# downloaded by Ivy at session start via spark.jars.packages.
scala_version = '2.12'
spark_version = '3.1.1'
packages = ",".join([
    f'org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{spark_version}',
    'org.apache.kafka:kafka-clients:3.2.0',
])

spark = (
    SparkSession.builder
    .master("local")
    .appName("kafka-example")
    .config("spark.jars.packages", packages)
    .getOrCreate()
)
spark
:: loading settings :: url = jar:file:/usr/local/lib/python3.9/dist-packages/pyspark/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /root/.ivy2/cache The jars for the packages stored in: /root/.ivy2/jars org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency org.apache.kafka#kafka-clients added as a dependency :: resolving dependencies :: org.apache.spark#spark-submit-parent-69972b37-dec6-4a2b-90c0-d1849abeaf02;1.0 confs: [default] found org.apache.spark#spark-sql-kafka-0-10_2.12;3.1.1 in central found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.1.1 in central found org.spark-project.spark#unused;1.0.0 in central found org.apache.commons#commons-pool2;2.6.2 in central found org.apache.kafka#kafka-clients;3.2.0 in central found com.github.luben#zstd-jni;1.5.2-1 in central found org.lz4#lz4-java;1.8.0 in central found org.xerial.snappy#snappy-java;1.1.8.4 in central found org.slf4j#slf4j-api;1.7.36 in central :: resolution report :: resolve 189ms :: artifacts dl 5ms :: modules in use: com.github.luben#zstd-jni;1.5.2-1 from central in [default] org.apache.commons#commons-pool2;2.6.2 from central in [default] org.apache.kafka#kafka-clients;3.2.0 from central in [default] org.apache.spark#spark-sql-kafka-0-10_2.12;3.1.1 from central in [default] org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.1.1 from central in [default] org.lz4#lz4-java;1.8.0 from central in [default] org.slf4j#slf4j-api;1.7.36 from central in [default] org.spark-project.spark#unused;1.0.0 from central in [default] org.xerial.snappy#snappy-java;1.1.8.4 from central in [default] :: evicted modules: org.apache.kafka#kafka-clients;2.6.0 by [org.apache.kafka#kafka-clients;3.2.0] in [default] --------------------------------------------------------------------- | | modules || artifacts | | conf | number| search|dwnlded|evicted|| number|dwnlded| --------------------------------------------------------------------- | default | 10 | 0 | 0 | 1 || 9 | 0 | --------------------------------------------------------------------- :: retrieving :: 
org.apache.spark#spark-submit-parent-69972b37-dec6-4a2b-90c0-d1849abeaf02 confs: [default] 0 artifacts copied, 9 already retrieved (0kB/5ms) 23/10/14 05:06:39 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). 23/10/14 05:06:39 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
Out[1]:
SparkSession - in-memory
- Kafka 데이터 stream Consume 하기
In [2]:
# Kafka Consume Stream: subscribe to the "log" topic from the earliest offset.
kafkaDf = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "master:9092, worker:9092")
    .option("subscribe", 'log')
    .option("startingOffsets", "earliest")
    .load()
)

# Decode the binary Kafka value column to text and echo each micro-batch
# to the console sink.
decoded = kafkaDf.selectExpr("CAST(value AS STRING)")
query = (
    decoded.writeStream
    .format("console")
    .option("truncate", "false")
    .start()
)
23/10/14 05:10:02 WARN StreamingQueryManager: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-11440d35-73c2-4501-9d97-d1e0da7584fb. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
------------------------------------------- Batch: 0 ------------------------------------------- +-----+ |value| +-----+ +-----+
- HDFS에 stream 데이터 저장하기
In [3]:
# Write stream - HDFS
# Persist the decoded Kafka values as Parquet files under today's
# date-partitioned directory, with a checkpoint for exactly-once recovery.
decoded_values = kafkaDf.selectExpr("CAST(value AS STRING)")
query2 = (
    decoded_values.writeStream
    .format("parquet")
    .outputMode("append")
    .option("checkpointLocation", "/check/log")
    .option("path", f"hdfs://hdfs-public-ip:9000/test/log_dir/{current_date}")
    .start()
)
------------------------------------------- Batch: 1 ------------------------------------------- +---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ |value | +---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ |{"userid": "c71729ab-1e5a-4960-80d1-22b64603faec", "ip": "178.235.138.39", "log_time": "2023-10-13 17:48:35+0900", "method": "GET", "status": "200", "url": "http://donaldson.com/mainlogin.html", "agent": "Mozilla/5.0 (Android 4.3.1; Mobile; rv:54.0) Gecko/54.0 Firefox/54.0\n"} | |{"userid": "5cbca5d7-c906-4ced-8d76-202508588ca1", "ip": "16.20.232.102", "log_time": "2023-10-13 17:49:50+0900", "method": "GET", "status": "200", "url": "http://house.biz/app/tag/listfaq.asp", "agent": "Mozilla/5.0 (Macintosh; PPC Mac OS X 10_9_6; rv:1.9.3.20) Gecko/5065-12-03 03:08:15 Firefox/3.8\n"} | |{"userid": "4b250aa5-ad07-4c17-9449-4a2cb41d49e6", "ip": "40.152.183.248", "log_time": "2023-10-13 17:50:11+0900", "method": "GET", "status": "200", "url": "http://www.dunn.org/category/blog/categorypost.htm", "agent": "Mozilla/5.0 (X11; Linux i686; rv:1.9.6.20) Gecko/5611-10-02 01:42:57 Firefox/3.6.11\n"} | |{"userid": "391707ad-b6a1-4a41-b153-76912c1c922d", "ip": "106.52.155.215", "log_time": "2023-10-13 17:50:43+0900", "method": "PUT", "status": "200", "url": "https://www.mitchell.com/tags/list/tagscategory.html", "agent": "Opera/9.98.(X11; Linux 
x86_64; se-NO) Presto/2.9.163 Version/11.00\n"} | |{"userid": "8134e3f7-c976-4dcd-ae08-baa961ff5794", "ip": "24.136.195.53", "log_time": "2023-10-13 17:51:00+0900", "method": "PUT", "status": "200", "url": "https://carter-moore.com/blog/search/postsmain.php", "agent": "Mozilla/5.0 (Android 3.2.6; Mobile; rv:43.0) Gecko/43.0 Firefox/43.0\n"} | |{"userid": "b3a54693-cb44-4021-bf73-e1c48a291ea2", "ip": "109.173.103.184", "log_time": "2023-10-13 17:51:15+0900", "method": "DELETE", "status": "200", "url": "http://www.bridges.com/wp-content/tagsearch.html", "agent": "Opera/8.26.(Windows CE; dz-BT) Presto/2.9.187 Version/10.00\n"} | |{"userid": "ef733be6-f205-4933-9425-dbfa29e495e1", "ip": "94.90.183.137", "log_time": "2023-10-13 17:52:52+0900", "method": "DELETE", "status": "200", "url": "https://www.howard.net/posts/postsprivacy.html", "agent": "Mozilla/5.0 (Windows NT 5.01) AppleWebKit/533.1 (KHTML, like Gecko) Chrome/39.0.807.0 Safari/533.1\n"} | |{"userid": "31c6168a-459a-4cc8-a858-3c5c53382a19", "ip": "212.22.233.216", "log_time": "2023-10-13 17:53:48+0900", "method": "DELETE", "status": "200", "url": "http://www.jackson.com/explore/app/blogpost.html", "agent": "Mozilla/5.0 (iPad; CPU iPad OS 10_3_4 like Mac OS X) AppleWebKit/532.0 (KHTML, like Gecko) FxiOS/15.1l9388.0 Mobile/12T011 Safari/532.0\n"} | |{"userid": "a4282762-266c-4e84-ba2e-8d84b8bb7074", "ip": "189.101.132.173", "log_time": "2023-10-13 17:54:44+0900", "method": "GET", "status": "200", "url": "https://anderson.com/tags/app/categoryfaq.asp", "agent": "Opera/8.66.(X11; Linux x86_64; br-FR) Presto/2.9.167 Version/10.00\n"} | |{"userid": "7ac5cc49-ab82-4f4d-9007-8512045c348b", "ip": "202.68.242.42", "log_time": "2023-10-13 17:55:47+0900", "method": "GET", "status": "200", "url": "https://rose-fowler.com/tag/listhome.php", "agent": "Mozilla/5.0 (Macintosh; PPC Mac OS X 10_5_0; rv:1.9.5.20) Gecko/5468-02-23 03:03:26 Firefox/3.6.7\n"} | |{"userid": "9122c85b-d1a6-425b-8651-3df7e43f39cd", "ip": 
"163.217.56.242", "log_time": "2023-10-13 17:57:02+0900", "method": "POST", "status": "301", "url": "https://stewart.biz/category/posts/categorypost.html", "agent": "Mozilla/5.0 (Macintosh; U; Intel Mac OS X 10_5_3; rv:1.9.4.20) Gecko/7415-11-13 08:30:24 Firefox/3.6.18\n"} | |{"userid": "4382229f-863a-47f6-b119-d81510b09c2f", "ip": "90.41.207.236", "log_time": "2023-10-13 17:57:48+0900", "method": "GET", "status": "200", "url": "http://www.harris-peterson.org/search/search/searchterms.php", "agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_8_4 rv:4.0; csb-PL) AppleWebKit/534.28.7 (KHTML, like Gecko) Version/5.0 Safari/534.28.7\n"} | |{"userid": "320eeae8-66c4-4897-9840-948aacd848ad", "ip": "159.154.60.15", "log_time": "2023-10-13 17:58:24+0900", "method": "DELETE", "status": "200", "url": "https://www.sanchez.com/category/wp-content/tagscategory.htm", "agent": "Mozilla/5.0 (iPad; CPU iPad OS 5_1_1 like Mac OS X) AppleWebKit/535.1 (KHTML, like Gecko) CriOS/40.0.894.0 Mobile/29K791 Safari/535.1\n"}| |{"userid": "eb6dd1c7-e75a-4ebe-bb66-3176b5360da5", "ip": "155.38.99.164", "log_time": "2023-10-13 17:59:53+0900", "method": "GET", "status": "200", "url": "https://owens.net/wp-content/blogterms.htm", "agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_6_2; rv:1.9.4.20) Gecko/8365-08-25 13:05:31 Firefox/3.6.16\n"} | |{"userid": "8ff7211e-b184-4d8d-bd10-2d189a09e72c", "ip": "1.231.51.254", "log_time": "2023-10-13 18:00:39+0900", "method": "GET", "status": "200", "url": "https://taylor-jones.com/categorieshome.php", "agent": "Mozilla/5.0 (Linux; Android 5.0.2) AppleWebKit/532.1 (KHTML, like Gecko) Chrome/32.0.806.0 Safari/532.1\n"} | |{"userid": "7daffa65-3902-42b9-a307-bea98ae62453", "ip": "119.26.224.15", "log_time": "2023-10-13 18:00:46+0900", "method": "DELETE", "status": "200", "url": "http://www.miles.com/tagprivacy.asp", "agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_3; rv:1.9.5.20) Gecko/9965-06-11 00:59:40 Firefox/3.8\n"} | |{"userid": 
"2fb75121-c435-49b0-bd86-457f97163c29", "ip": "146.194.244.219", "log_time": "2023-10-13 18:01:35+0900", "method": "GET", "status": "200", "url": "https://garcia-knight.biz/listauthor.html", "agent": "Mozilla/5.0 (Windows NT 5.01) AppleWebKit/533.2 (KHTML, like Gecko) Chrome/61.0.818.0 Safari/533.2\n"} | |{"userid": "713481da-c3e6-4261-86b2-d2bec1723efb", "ip": "91.122.251.30", "log_time": "2023-10-13 18:03:11+0900", "method": "GET", "status": "200", "url": "https://ramsey.com/categories/mainsearch.php", "agent": "Mozilla/5.0 (Linux; Android 4.0) AppleWebKit/536.1 (KHTML, like Gecko) Chrome/27.0.840.0 Safari/536.1\n"} | |{"userid": "b09ef45e-0791-4c89-ac59-5500b2f9c20f", "ip": "35.47.127.24", "log_time": "2023-10-13 18:04:39+0900", "method": "GET", "status": "200", "url": "https://hopkins.info/explorehomepage.html", "agent": "Opera/8.12.(X11; Linux i686; nn-NO) Presto/2.9.165 Version/11.00\n"} | |{"userid": "726840ba-4485-4e59-b25a-19614ac22559", "ip": "13.240.121.20", "log_time": "2023-10-13 18:04:43+0900", "method": "GET", "status": "200", "url": "http://kennedy-petty.com/explore/wp-content/blogregister.asp", "agent": "Mozilla/5.0 (X11; Linux i686; rv:1.9.5.20) Gecko/3530-03-19 05:03:23 Firefox/3.6.19\n"} | +---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ only showing top 20 rows ------------------------------------------- Batch: 2 ------------------------------------------- 
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ |value | +-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ |{"userid": "56719ef3-6616-4471-ad1f-1ef65df14572", "ip": "7.210.236.222", "log_time": "2023-10-13 21:20:52+0900", "method": "PUT", "status": "200", "url": "http://jones.biz/main/listmain.html", "agent": "Mozilla/5.0 (X11; Linux x86_64; rv:1.9.5.20) Gecko/3851-05-01 01:42:23 Firefox/3.8\n"} | |{"userid": "15a93df9-a6c3-4ea5-be98-450dcc106dea", "ip": "9.183.254.37", "log_time": "2023-10-13 21:20:55+0900", "method": "GET", "status": "200", "url": "https://www.welch.com/posts/appmain.php", "agent": "Mozilla/5.0 (Android 3.2.2; Mobile; rv:11.0) Gecko/11.0 Firefox/11.0\n"} | |{"userid": "f68974ff-5b50-46f7-bb45-528f9c6611a3", "ip": "48.29.236.200", "log_time": "2023-10-13 21:21:49+0900", "method": "POST", "status": "200", "url": "https://www.greene-hudson.biz/tag/searchhome.php", "agent": "Mozilla/5.0 (Windows; U; Windows 98; Win 9x 4.90) AppleWebKit/534.9.1 (KHTML, like Gecko) Version/5.1 Safari/534.9.1\n"} | |{"userid": "f8fadf52-e58e-46a4-aecf-270f6cda8ef6", "ip": "154.152.253.230", "log_time": "2023-10-13 21:22:09+0900", "method": "GET", "status": "200", "url": "http://www.craig-bates.com/wp-contentmain.php", "agent": "Mozilla/5.0 (Windows CE; br-FR; rv:1.9.2.20) Gecko/6770-05-31 11:52:30 Firefox/8.0\n"} | |{"userid": 
"085238c7-08af-4d9f-8e0c-b0037f4a4739", "ip": "167.151.230.110", "log_time": "2023-10-13 21:23:44+0900", "method": "GET", "status": "200", "url": "http://www.rojas.com/appabout.php", "agent": "Mozilla/5.0 (Windows 95; ml-IN; rv:1.9.2.20) Gecko/7495-09-04 12:56:44 Firefox/3.6.3\n"} | |{"userid": "cd0519fb-8e6a-4e62-884e-02541cf42779", "ip": "120.75.86.95", "log_time": "2023-10-13 21:25:23+0900", "method": "DELETE", "status": "200", "url": "https://murphy-smith.com/wp-contenthome.html", "agent": "Mozilla/5.0 (Android 7.1.1; Mobile; rv:19.0) Gecko/19.0 Firefox/19.0\n"} | |{"userid": "f9aef282-5c45-4bc2-ba89-f5534fddb834", "ip": "187.34.178.18", "log_time": "2023-10-13 21:26:21+0900", "method": "GET", "status": "200", "url": "http://www.dudley.com/tag/listprivacy.html", "agent": "Mozilla/5.0 (Linux; Android 3.2.1) AppleWebKit/534.0 (KHTML, like Gecko) Chrome/35.0.898.0 Safari/534.0\n"} | |{"userid": "419df0cd-939c-425c-a443-336485cabe3e", "ip": "117.144.249.247", "log_time": "2023-10-13 21:26:44+0900", "method": "GET", "status": "200", "url": "http://www.barrett.biz/list/categoriesregister.php", "agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_2; rv:1.9.6.20) Gecko/7884-11-03 21:11:09 Firefox/6.0\n"} | |{"userid": "ddf699f4-cb1c-4784-aafe-64ee619caf2b", "ip": "159.28.61.253", "log_time": "2023-10-13 21:27:24+0900", "method": "GET", "status": "200", "url": "https://www.price.biz/search/categoriespost.html", "agent": "Mozilla/5.0 (Android 4.0.3; Mobile; rv:53.0) Gecko/53.0 Firefox/53.0\n"} | |{"userid": "551464fd-29ba-40bb-b5d0-d873ad3bce5b", "ip": "113.114.227.59", "log_time": "2023-10-13 21:28:15+0900", "method": "GET", "status": "301", "url": "http://gonzalez.com/maincategory.htm", "agent": "Mozilla/5.0 (iPhone; CPU iPhone OS 12_4_8 like Mac OS X) AppleWebKit/535.1 (KHTML, like Gecko) FxiOS/13.9q3194.0 Mobile/07T422 Safari/535.1\n"} | |{"userid": "72b6a28f-98e6-4738-a4a5-a56e3ac87450", "ip": "199.217.164.239", "log_time": "2023-10-13 21:28:35+0900", "method": 
"GET", "status": "200", "url": "http://long.com/searchindex.php", "agent": "Mozilla/5.0 (Macintosh; PPC Mac OS X 10_10_5; rv:1.9.3.20) Gecko/8346-10-17 02:06:41 Firefox/4.0\n"} | |{"userid": "eb89f66d-b5a6-41f4-9d87-72a11007c05e", "ip": "214.12.236.254", "log_time": "2023-10-13 21:28:56+0900", "method": "GET", "status": "200", "url": "https://www.sawyer.com/explore/app/wp-contentauthor.php", "agent": "Mozilla/5.0 (Windows NT 4.0) AppleWebKit/532.1 (KHTML, like Gecko) Chrome/38.0.884.0 Safari/532.1\n"} | |{"userid": "57d81519-408d-4741-9a25-3e7956fb8c94", "ip": "196.215.74.181", "log_time": "2023-10-13 21:29:16+0900", "method": "GET", "status": "200", "url": "http://www.kennedy-vargas.com/wp-content/categories/postsindex.html", "agent": "Mozilla/5.0 (Macintosh; PPC Mac OS X 10_11_5) AppleWebKit/533.1 (KHTML, like Gecko) Chrome/29.0.895.0 Safari/533.1\n"}| |{"userid": "f5619770-75d1-4e5c-aeb9-0af637b3ba76", "ip": "173.97.200.109", "log_time": "2023-10-13 21:30:33+0900", "method": "GET", "status": "200", "url": "http://hudson-rodriguez.com/categorypost.php", "agent": "Mozilla/5.0 (X11; Linux i686; rv:1.9.7.20) Gecko/3631-02-28 07:48:41 Firefox/3.8\n"} | |{"userid": "c55d811c-f183-48b3-8a9d-ea71c8e02685", "ip": "100.175.115.71", "log_time": "2023-10-13 21:31:17+0900", "method": "GET", "status": "200", "url": "https://barnes.com/wp-content/blogmain.html", "agent": "Mozilla/5.0 (iPad; CPU iPad OS 10_3_3 like Mac OS X) AppleWebKit/534.2 (KHTML, like Gecko) FxiOS/13.6z0288.0 Mobile/87D690 Safari/534.2\n"} | |{"userid": "dac45734-4d59-4563-9d82-bff536920162", "ip": "208.142.89.31", "log_time": "2023-10-13 21:32:14+0900", "method": "DELETE", "status": "200", "url": "https://owen.com/tag/blog/listfaq.htm", "agent": "Mozilla/5.0 (compatible; MSIE 7.0; Windows NT 6.2; Trident/5.1)\n"} | |{"userid": "9c42bda1-2a04-4f31-bffe-4e9c09461f3d", "ip": "130.244.131.85", "log_time": "2023-10-13 21:32:40+0900", "method": "GET", "status": "200", "url": 
"https://lawson-ayers.biz/app/tagspost.asp", "agent": "Opera/8.71.(X11; Linux x86_64; ro-RO) Presto/2.9.162 Version/12.00\n"} | |{"userid": "b3cc32a1-3d31-49dd-9e29-75198d7ced43", "ip": "51.201.99.170", "log_time": "2023-10-13 21:34:19+0900", "method": "GET", "status": "200", "url": "http://johnson.com/list/tagpost.php", "agent": "Mozilla/5.0 (Linux; Android 8.0.0) AppleWebKit/534.2 (KHTML, like Gecko) Chrome/31.0.886.0 Safari/534.2\n"} | |{"userid": "0341afc1-2373-4133-8c3d-e4364515c173", "ip": "180.209.168.235", "log_time": "2023-10-13 21:35:55+0900", "method": "POST", "status": "200", "url": "https://www.miller-barker.com/wp-contentfaq.html", "agent": "Mozilla/5.0 (Macintosh; U; PPC Mac OS X 10_6_4; rv:1.9.3.20) Gecko/3297-07-06 17:01:26 Firefox/3.8\n"} | |{"userid": "e7eb6bdc-24fa-47a2-be0a-717c85c4ccf7", "ip": "63.95.6.31", "log_time": "2023-10-13 21:36:30+0900", "method": "GET", "status": "200", "url": "http://frazier.info/categoriesindex.php", "agent": "Mozilla/5.0 (Android 2.3.3; Mobile; rv:54.0) Gecko/54.0 Firefox/54.0\n"} | +-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ only showing top 20 rows
In [4]:
print(kafkaDf.printSchema())
root |-- key: binary (nullable = true) |-- value: binary (nullable = true) |-- topic: string (nullable = true) |-- partition: integer (nullable = true) |-- offset: long (nullable = true) |-- timestamp: timestamp (nullable = true) |-- timestampType: integer (nullable = true) None
In [ ]:
# Block the notebook until the streaming queries stop
# (manual stop(), interrupt, or query failure).
query.awaitTermination()
query2.awaitTermination()
In [ ]:
spark.stop()
- HDFS 로그 데이터 Dataframe으로 처리하여 읽기
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
from pyspark.sql import SparkSession
from datetime import datetime
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, StringType
class ConnectHadoop:
    """SparkSession wrapper for reading/writing files on a remote HDFS.

    Args:
        ip: HDFS NameNode host (also used as the YARN RM hostname).
        port: NameNode RPC port.
        file_path: HDFS path (directory or file) to read/write.
        file_name: file name appended to file_path for text reads.
        user_name: Hadoop user to act as.
    """

    def __init__(self, ip="ip", port="port", file_path="", file_name="", user_name="kim"):
        self.hdfs_ip = ip
        self.hdfs_port = port
        self.hdfs_server = f"hdfs://{self.hdfs_ip}:{self.hdfs_port}"
        self.hdfs_file_path = file_path
        self.hdfs_file_name = file_name
        self.hdfs_user_name = user_name
        self.spark = SparkSession.builder \
            .appName("WriteHDFS") \
            .config("spark.hadoop.fs.defaultFS", self.hdfs_server) \
            .config("spark.hadoop.yarn.resourcemanager.hostname", self.hdfs_ip) \
            .config("spark.hadoop.user.name", self.hdfs_user_name) \
            .getOrCreate()

    def __del__(self):
        # NOTE(review): only logs; the SparkSession itself is not stopped here.
        print('end spark')

    def read_file(self, read_type):
        """Return a DataFrame for the configured path.

        read_type: "txt" (path + file name), "parquet", or "csv".
        Returns None for any other value (original behavior).
        """
        if read_type == "txt":
            return self.spark.read.text(self.hdfs_file_path + self.hdfs_file_name)
        elif read_type == "parquet":
            return self.spark.read.parquet(self.hdfs_file_path)
        elif read_type == "csv":
            return self.spark.read.csv(self.hdfs_file_path)

    def write_file(self, df, write_type):
        """Overwrite self.hdfs_file_path with *df* as parquet, csv, or json.

        Errors are caught and logged rather than propagated.
        """
        try:
            if write_type == "parquet":
                df.write.format("parquet").mode("overwrite").save(self.hdfs_file_path)
            elif write_type == "csv":
                df.coalesce(2).write.mode('overwrite').option('header', 'true').csv(self.hdfs_file_path)
            elif write_type == "json":
                # BUG FIX: the original referenced the module-level `hadoop`
                # instance (hadoop.hdfs_file_path) instead of self.
                df.write.format("json").mode("overwrite").save(self.hdfs_file_path)
        except Exception as e:
            print(f'Exception 발생 : {e} , 시간 : {datetime.now()}')
if __name__ == "__main__":
    current_date = datetime.now().strftime("%Y-%m-%d")

    # Schema of the JSON records the Kafka producer wrote (all string fields).
    json_schema = StructType([
        StructField("userid", StringType(), True),
        StructField("ip", StringType(), True),
        StructField("log_time", StringType(), True),
        StructField("method", StringType(), True),
        StructField("status", StringType(), True),
        StructField("url", StringType(), True),
        StructField("agent", StringType(), True),
    ])

    hadoop = ConnectHadoop(file_path=f"/test/log_dir/{current_date}")
    raw_df = hadoop.read_file(read_type="parquet")

    # Parse the JSON "value" column into a struct, then flatten its fields
    # into top-level columns.
    field_names = ["userid", "ip", "log_time", "method", "status", "url", "agent"]
    parsed_df = raw_df.select(from_json(col("value"), json_schema).alias("parsed_value"))
    final_df = parsed_df.select(*[f"parsed_value.{name}" for name in field_names])
    final_df.show()
    # 전체 출력
    # final_df.show(final_df.count(), truncate=False)