
Integrating Spark with Kafka

I set up a Kafka cluster on AWS instances and Spark on Docker, and the plan is to forward the data on to Hadoop.

While putting this together, Spark and Kafka would not connect, so I went looking for a solution.

  • Adding the jar files by hand - attempted (failed) link
  • Launching pyspark with --packages - attempted (failed) link
  • Setting the Spark conf from Jupyter - attempted (succeeded)
  • Adding the dependency to the build (not yet understood) - link

The Spark version is 3.1.1, the Scala version is 2.12, and Kafka is 3.0.0.

Below is how to connect to Kafka with pyspark from Jupyter.

The bottom line: the integration works when the packages are set through spark.jars.packages in the conf, but it failed with spark.jars pointing at locally stored jars. I will update that part later when I find the time.
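One likely reason the spark.jars.packages route behaves better is that it takes Maven coordinates and resolves their transitive dependencies (through Ivy), whereas spark.jars only ships the exact files you list, so anything you forget is simply absent at runtime. As a minimal sketch (not part of the original post), the coordinate for the Kafka source connector can be derived from the versions above; the suffix is the Scala version and the last component is the Spark version, not the Kafka broker version.

import pyspark

# Assumption: the prebuilt Spark 3.1.x binaries used here are built against Scala 2.12.
scala_version = "2.12"
spark_version = pyspark.__version__   # "3.1.1" in this setup

# The artifact version must match Spark, not the Kafka broker (3.0.0 here).
kafka_package = f"org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{spark_version}"
print(kafka_package)   # org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.1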

Setting up the jar files

  • Download the jars that match your versions from the link; the version in the connector artifact names refers to the Spark version.
    • spark-sql-kafka-0-10_2.12-3.1.1.jar
    • kafka-clients-3.2.0.jar
    • spark-token-provider-kafka-0-10_2.12-3.1.1.jar
    • commons-pool2-2.6.2.jar
  • Adding the jar files in Jupyter
    • According to Stack Overflow, dependencies cannot be pulled in through plain jar files, but I do not understand why.
    • I built the session with the same jars that later succeeded through packages.
    • https://stackoverflow.com/questions/46076771/how-to-submit-multiple-jars-to-workers-through-sparksession/46076881#46076881

        from pyspark.sql import SparkSession

        # Local jar files to be put on the classpath through spark.jars
        jars = [
                "kafka-clients-3.2.0.jar",
                "spark-sql-kafka-0-10_2.12-3.1.1.jar"
               ]
        jar_files = ",".join("/opt/workspace/spark_test/jars/kafka/" + jar for jar in jars)

        spark = SparkSession.builder\
           .master("local")\
           .appName("Kafka Connection test")\
           .config("spark.jars", jar_files)\
           .getOrCreate()

        spark
        # Check which jar files were actually shipped to the Spark context
        # https://stackoverflow.com/questions/57057648/list-all-additional-jars-loaded-in-pyspark
        print(spark.sparkContext._jsc.sc().listJars())
      
  • Spark node
    • Download the jars into SPARK_HOME/jars as well (a sketch for fetching the remaining jars follows the shell snippet below).
      $ cd $SPARK_HOME/jars
      $ wget https://repo1.maven.org/maven2/org/apache/spark/spark-sql-kafka-0-10_2.12/3.1.1/spark-sql-kafka-0-10_2.12-3.1.1.jar
    


Passing the --packages option to pyspark

  • I attached to the Docker container directly and launched pyspark with the command-line option.
$ docker exec -it spark-master /bin/bash
$ pyspark --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2

spark.sparkContext.getConf().getAll()
> [('spark.app.id', 'local-1696780510280'), ('spark.app.startTime', '1696780509674'), ('spark.submit.pyFiles', '/root/.ivy2/jars/org.apache.spark_spark-sql-kafka-0-10_2.12-3.1.2.jar,/root/.ivy2/jars/org.apache.spark_spark-token-provider-kafka-0-10_2.12-3.1.2.jar,/root/.ivy2/jars/org.apache.kafka_kafka-clients-2.6.0.jar,/root/.ivy2/jars/org.apache.commons_commons-pool2-2.6.2.jar,/root/.ivy2/jars/org.spark-project.spark_unused-1.0.0.jar,/root/.ivy2/jars/com.github.luben_zstd-jni-1.4.8-1.jar,/root/.ivy2/jars/org.lz4_lz4-java-1.7.1.jar,/root/.ivy2/jars/org.xerial.snappy_snappy-java-1.1.8.2.jar,/root/.ivy2/jars/org.slf4j_slf4j-api-1.7.30.jar'), ('spark.app.initial.file.urls', 'file:///root/.ivy2/jars/com.github.luben_zstd-jni-1.4.8-1.jar,file:///root/.ivy2/jars/org.apache.commons_commons-pool2-2.6.2.jar,file:///root/.ivy2/jars/org.apache.kafka_kafka-clients-2.6.0.jar,file:///root/.ivy2/jars/org.spark-project.spark_unused-1.0.0.jar,file:///root/.ivy2/jars/org.lz4_lz4-java-1.7.1.jar,file:///root/.ivy2/jars/org.slf4j_slf4j-api-1.7.30.jar,file:///root/.ivy2/jars/org.xerial.snappy_snappy-java-1.1.8.2.jar,file:///root/.ivy2/jars/org.apache.spark_spark-token-provider-kafka-0-10_2.12-3.1.2.jar,file:///root/.ivy2/jars/org.apache.spark_spark-sql-kafka-0-10_2.12-3.1.2.jar'), ('spark.sql.warehouse.dir', 'file:/usr/bin/spark-3.1.1-bin-hadoop2.7/spark-warehouse'), ('spark.jars', 'file:///root/.ivy2/jars/org.apache.spark_spark-sql-kafka-0-10_2.12-3.1.2.jar,file:///root/.ivy2/jars/org.apache.spark_spark-token-provider-kafka-0-10_2.12-3.1.2.jar,file:///root/.ivy2/jars/org.apache.kafka_kafka-clients-2.6.0.jar,file:///root/.ivy2/jars/org.apache.commons_commons-pool2-2.6.2.jar,file:///root/.ivy2/jars/org.spark-project.spark_unused-1.0.0.jar,file:///root/.ivy2/jars/com.github.luben_zstd-jni-1.4.8-1.jar,file:///root/.ivy2/jars/org.lz4_lz4-java-1.7.1.jar,file:///root/.ivy2/jars/org.xerial.snappy_snappy-java-1.1.8.2.jar,file:///root/.ivy2/jars/org.slf4j_slf4j-api-1.7.30.jar'), ('spark.driver.port', '41769'), ('spark.executor.id', 'driver'), ('spark.app.name', 'PySparkShell'), ('spark.app.initial.jar.urls', 'spark://8949e9a73a84:41769/jars/org.lz4_lz4-java-1.7.1.jar,spark://8949e9a73a84:41769/jars/org.apache.spark_spark-token-provider-kafka-0-10_2.12-3.1.2.jar,spark://8949e9a73a84:41769/jars/org.apache.spark_spark-sql-kafka-0-10_2.12-3.1.2.jar,spark://8949e9a73a84:41769/jars/org.apache.kafka_kafka-clients-2.6.0.jar,spark://8949e9a73a84:41769/jars/org.spark-project.spark_unused-1.0.0.jar,spark://8949e9a73a84:41769/jars/org.apache.commons_commons-pool2-2.6.2.jar,spark://8949e9a73a84:41769/jars/com.github.luben_zstd-jni-1.4.8-1.jar,spark://8949e9a73a84:41769/jars/org.slf4j_slf4j-api-1.7.30.jar,spark://8949e9a73a84:41769/jars/org.xerial.snappy_snappy-java-1.1.8.2.jar'), ('spark.sql.catalogImplementation', 'hive'), ('spark.rdd.compress', 'True'), ('spark.files', 'file:///root/.ivy2/jars/org.apache.spark_spark-sql-kafka-0-10_2.12-3.1.2.jar,file:///root/.ivy2/jars/org.apache.spark_spark-token-provider-kafka-0-10_2.12-3.1.2.jar,file:///root/.ivy2/jars/org.apache.kafka_kafka-clients-2.6.0.jar,file:///root/.ivy2/jars/org.apache.commons_commons-pool2-2.6.2.jar,file:///root/.ivy2/jars/org.spark-project.spark_unused-1.0.0.jar,file:///root/.ivy2/jars/com.github.luben_zstd-jni-1.4.8-1.jar,file:///root/.ivy2/jars/org.lz4_lz4-java-1.7.1.jar,file:///root/.ivy2/jars/org.xerial.snappy_snappy-java-1.1.8.2.jar,file:///root/.ivy2/jars/org.slf4j_slf4j-api-1.7.30.jar'), ('spark.serializer.objectStreamReset', '100'), ('spark.master', 
'local[*]'), ('spark.repl.local.jars', 'file:///root/.ivy2/jars/org.apache.spark_spark-sql-kafka-0-10_2.12-3.1.2.jar,file:///root/.ivy2/jars/org.apache.spark_spark-token-provider-kafka-0-10_2.12-3.1.2.jar,file:///root/.ivy2/jars/org.apache.kafka_kafka-clients-2.6.0.jar,file:///root/.ivy2/jars/org.apache.commons_commons-pool2-2.6.2.jar,file:///root/.ivy2/jars/org.spark-project.spark_unused-1.0.0.jar,file:///root/.ivy2/jars/com.github.luben_zstd-jni-1.4.8-1.jar,file:///root/.ivy2/jars/org.lz4_lz4-java-1.7.1.jar,file:///root/.ivy2/jars/org.xerial.snappy_snappy-java-1.1.8.2.jar,file:///root/.ivy2/jars/org.slf4j_slf4j-api-1.7.30.jar'), ('spark.submit.deployMode', 'client'), ('spark.driver.host', '8949e9a73a84'), ('spark.ui.showConsoleProgress', 'true')]

df = spark\
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", ":9092") \
  .option("subscribe", "test") \
  .load()

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/bin/spark-3.1.1-bin-hadoop2.7/python/pyspark/sql/streaming.py", line 482, in load
    return self._df(self._jreader.load())
  File "/usr/local/lib/python3.9/dist-packages/py4j/java_gateway.py", line 1322, in __call__
    return_value = get_return_value(
  File "/usr/bin/spark-3.1.1-bin-hadoop2.7/python/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/usr/local/lib/python3.9/dist-packages/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o160.load.
: java.lang.NoClassDefFoundError: org/apache/kafka/common/serialization/ByteArraySerializer
	at org.apache.spark.sql.kafka010.KafkaSourceProvider$.<init>(KafkaSourceProvider.scala:557)
	at org.apache.spark.sql.kafka010.KafkaSourceProvider$.<clinit>(KafkaSourceProvider.scala)
	at org.apache.spark.sql.kafka010.KafkaSourceProvider.org$apache$spark$sql$kafka010$KafkaSourceProvider$$validateStreamOptions(KafkaSourceProvider.scala:326)
	at org.apache.spark.sql.kafka010.KafkaSourceProvider.sourceSchema(KafkaSourceProvider.scala:71)
	at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:235)
	at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:116)
	at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:116)
	at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:33)
	at org.apache.spark.sql.streaming.DataStreamReader.loadInternal(DataStreamReader.scala:220)
	at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:195)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.serialization.ByteArraySerializer
	at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
	... 21 more
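The NoClassDefFoundError itself is informative: ByteArraySerializer is a class that ships inside kafka-clients, so the Kafka connector was loaded by a classloader that cannot see the kafka-clients classes. One plausible explanation in this setup (my reading of the trace, not something confirmed in the post) is that the spark-sql-kafka jar copied into SPARK_HOME/jars earlier is picked up by the JVM's application classloader (note the AppClassLoader frames above), which does not see the jars that --packages adds at session level; if so, kafka-clients would also need to sit in SPARK_HOME/jars. A hypothetical quick check:

import glob
import os

# Hypothetical sanity check: which Kafka-related jars are on the JVM's base classpath?
# If spark-sql-kafka*.jar is here but kafka-clients*.jar is not, the error above is expected.
base_jars = os.path.join(os.environ["SPARK_HOME"], "jars")
print(glob.glob(os.path.join(base_jars, "spark-sql-kafka*.jar")))
print(glob.glob(os.path.join(base_jars, "kafka-clients*.jar")))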


Configuring via the Spark conf

  • Put the Maven coordinates you want resolved into spark.jars.packages in the conf.
  • https://github.com/OneCricketeer/docker-stacks/blob/master/hadoop-spark/spark-notebooks/kafka-sql.ipynb
from pyspark.sql import SparkSession

scala_version = '2.12' 
spark_version = '3.1.1'
packages = [
    f'org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{spark_version}',
    'org.apache.kafka:kafka-clients:3.2.0'
]
spark = SparkSession.builder\
   .master("local")\
   .appName("kafka-example")\
   .config("spark.jars.packages", ",".join(packages))\
   .getOrCreate()
spark


# batch
# https://github.com/OneCricketeer/docker-stacks/blob/master/hadoop-spark/spark-notebooks/kafka-sql.ipynb
from pyspark.sql.functions import col, concat, lit

kafkaDf = spark.read.format("kafka")\
  .option("kafka.bootstrap.servers", "kafkaip:9092")\
  .option("subscribe", 'test')\
  .option("startingOffsets", "earliest")\
  .load()

kafkaDf.select(
    concat(col("topic"), lit(':'), col("partition").cast("string")).alias("topic_partition"),
    col("offset"),
    col("value").cast("string")
).show()


# stream
from pyspark.sql.functions import col, concat, lit
# Read stream
log = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "kafkaip:9092") \
    .option("subscribe", "test") \
    .option("startingOffsets", "earliest") \
    .load()

query = log.selectExpr("CAST(value AS STRING)") \
    .writeStream \
    .format("console") \
    .option("truncate", "false") \
    .start()

query.awaitTermination()
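A small usage note for notebooks (an assumption about the workflow, not part of the original post): awaitTermination() blocks the cell indefinitely, so it can be more convenient to let the console sink run for a short while and then stop the query explicitly.

import time

# Instead of query.awaitTermination(), let the stream run briefly and then stop it
# so that the notebook cell returns. Assumes the `query` handle created above.
time.sleep(30)
query.stop()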


References

  • https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html
  • https://taaewoo.tistory.com/78?category=926385
  • https://data-engineer-tech.tistory.com/46
  • https://velog.io/@statco19/pyspark-kafka-streaming
  • https://mj-sunflower.tistory.com/52