Kafka Cluster 클러스터 구성하기 with Ec2
가상의 대용량의 로그 데이터를 만들어 Kafka → Hadoop에 저장해보려고 합니다.
EC2 인스턴스를 구성 및 접속하는 방법 다른 글에도 많이 나오기에 생략하겠습니다.
카프카 클러스터 구성을 시작해보겠습니다.
기본 host 설정
- 인스턴스에 접속하여 이름을 변경하여 줍니다.
- Cluster 정보를 입력하여 줍니다.
1
2
3
4
5
6
7
$ hostname
$ sudo hostnamectl set-hostname kafka1
$ sudo vi /etc/hosts
127.0.0.1 localhost
0.0.0.0 kafka1
13.1.1.1 kafka2
java 설치
- kafka는 java기반으로 동작하기에 java 11을 사용할 예정입니다.
- 모든 인스턴스에 적용하여 줍니다.
1
2
3
4
5
6
7
8
9
$ sudo apt-get update
$ sudo apt-get upgrade
$ sudo apt-get install openjdk-11-jdk
# check install
$ java -version
# delete java
$ sudo apt-get purge openjdk*
- 환경 설정을 적용하여 줍니다.
1
2
3
4
5
6
7
8
9
# 환경 설정
sudo vi ~/.bashrc
#JAVA setting
export JAVA_HOME=$(dirname $(dirname $(readlink -f $(which java))))
export PATH=$PATH:$JAVA_HOME/bin
$ source ~/.bashrc
$ echo $JAVA_HOME
Zookeeper 설치
- Kafka 클러스터 관리를 지원하여 주는 Zookeeper를 설치할 예정입니다.
- 본인 서버 정보에 맞추어 구성하여 줍니다.
/var/lib/zookeeper/myid
각 인스턴스 별로 unique한 정수 값을 넣어줍니다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
$ wget https://archive.apache.org/dist/zookeeper/zookeeper-3.4.12/zookeeper-3.4.12.tar.gz
$ tar xvf zookeeper-3.4.12.tar.gz
$ cd zookeeper-3.4.12/conf
$ sudo vi zoo.cfg
tickTime=2000
dataDir=/var/lib/zookeeper
clientPort=2181
initLimit=20
syncLimit=5
server.1=kafka1:2888:3888
server.2=kafka2:2888:3888
server.3=kafka3:2888:3888
$ echo 1 > /var/lib/zookeeper/myid
1
$ sudo ./bin/zkServer.sh start
1
2
3
4
$ sudo vi ~/.bashrc
$ source ~/.bashrc
alias start_zookeeper='/home/ubuntu/zookeeper-3.4.12/bin/zkServer.sh start'
alias stop_zookeeper='/home/ubuntu/zookeeper-3.4.12/bin/zkServer.sh stop'
kafka 설치
- Kafka 버전 별로 이후 동작하는 방식이 다르기에 원하는 버전을 설치하여 주시면 됩니다.
- 저는 3.0.0으로 진행할 예정입니다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# version2
$ wget https://archive.apache.org/dist/kafka/2.1.0/kafka_2.11-2.1.0.tgz
$ tar xvf kafka_2.11-2.1.0.tgz
# version3
$ wget https://archive.apache.org/dist/kafka/3.0.0/kafka-3.0.0-src.tgz
$ tar xvf kafka-3.0.0-src.tgz
$ sudo vi config/server.properties
# version3에서는 0이 필수로 필요한가봄
broker.id=1
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://kafka1:9092
zookeeper.connect=kafka1:2181,kafka2:2181
# run
$ bin/kafka-server-start.sh -daemon config/server.properties
$ ./gradlew jar -PscalaVersion=2.13.6
# exit
$ bin/kafka-server-stop.sh -daemon config/server.properties
# check
$ cat logs/server.log
$ bin/kafka-topics.sh --version
$ jps
# 버전에 따라서 option이 다른것으로 보인다.
# https://stackoverflow.com/questions/69297020/exception-in-thread-main-joptsimple-unrecognizedoptionexception-zookeeper-is
# version 2.2 이상부터는 zookeeper option 대신 bootstrap-server를 지원
# version 2.2 아래에서는 zookeeper option이 필수
# version2
# create topics
$ bin/kafka-topics.sh --create --zookeeper kafka1:2181 --replication-factor 1 --partitions 1 --topic test
# list topics
$ bin/kafka-topics.sh --list --zookeeper kafka2:2181
# version3
# list topics
$ bin/kafka-topics.sh --bootstrap-server kafka1:9092,kafka2:9092 --list
# describe topics
$ bin/kafka-topics.sh --bootstrap-server kafka1:9092,kafka2:9092 --describe
# create topics
$ bin/kafka-topics.sh --create \
--replication-factor 2 \
--partitions 1 \
--topic test \
--bootstrap-server kafka1:9092,kafka2:9092
# delete topics
$ bin/kafka-topics.sh --bootstrap-server kafka1:9092,kafka2:9092 --delete --topic test
- topic 생성을 해보려고 합니다.
- 2.2버전 아래로는 zookeeper가 필수이기에 option 값으로
zookeeper
를 사용합니다. - 3.0.0 버전이기에
bootstrap-server
옵션으로 topic을 만드어줍니다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# 버전에 따라서 option이 다른것으로 보인다.
# https://stackoverflow.com/questions/69297020/exception-in-thread-main-joptsimple-unrecognizedoptionexception-zookeeper-is
# version 2.2 이상부터는 zookeeper option 대신 bootstrap-server를 지원
# version 2.2 아래에서는 zookeeper option이 필수
# version2
# create topics
$ bin/kafka-topics.sh --create --zookeeper kafka1:2181 --replication-factor 1 --partitions 1 --topic test
# list topics
$ bin/kafka-topics.sh --list --zookeeper kafka2:2181
# version3
# list topics
$ bin/kafka-topics.sh --bootstrap-server kafka1:9092,kafka2:9092 --list
# describe topics
$ bin/kafka-topics.sh --bootstrap-server kafka1:9092,kafka2:9092 --describe
# create topics
$ bin/kafka-topics.sh --create \
--replication-factor 2 \
--partitions 1 \
--topic test \
--bootstrap-server kafka1:9092,kafka2:9092
# delete topics
$ bin/kafka-topics.sh --bootstrap-server kafka1:9092,kafka2:9092 --delete --topic test
1
2
3
4
$ sudo vi ~/.bashrc
$ source ~/.bashrc
alias start_kafka='/home/ubuntu/kafka-3.0.0-src/bin/kafka-server-start.sh -daemon config/server.properties'
alias stop_kafka='/home/ubuntu/kafka-3.0.0-src/bin/kafka-server-stop.sh -daemon config/server.properties'
produce&consume
- produce와 consume 과정을 아래 커맨드를 친 뒤에 값을 넣어주면 넘어가는 것을 볼 수 있습니다.
- 아래 Python 응로 구성할 것이기 때문에 자세한 내용은 생략합니다.
1
2
3
4
5
# procude
$ bin/kafka-console-producer.sh --topic test --bootstrap-server kafka1:9092,kafka2:9092
# consume
$ bin/kafka-console-consumer.sh --topic test --bootstrap-server kafka1:9092,kafka2:9092
Python에서 Kafka 연결하기
- 외부와 연결하기 위해서는
advertised.host.name
에 aws ip를 넣어주어야 한다. - 찾아보니 아래와 같이 변경되었다고 한다.
- The
advertised.port
andadvertised.host.name
configurations were removed. Please useadvertised.listeners
instead. - https://stackoverflow.com/questions/43565698/connecting-kafka-running-on-ec2-machine-from-my-local-machine
- https://kafka.apache.org/documentation/#brokerconfigs
1
2
3
4
5
6
7
8
9
$ bin/kafka-server-stop.sh -daemon config/server.properties
$ sudo vi config/server.properties
broker.id=0
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://ip:9092
zookeeper.connect=kafka1:2181,kafka2:2181
$ bin/kafka-server-start.sh -daemon config/server.properties
Python Produce, Consume 구성하기
bootstrap_servers
: 클러스터 정보auto_offset_reset
: Consume 방식- 처음부터 -
earliest
, 현재 이후 부터 consume할지가 결정latest
- 처음부터 -
- https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
from kafka import KafkaProducer
from kafka import KafkaConsumer
from json import dumps
import time
class KafkaInfos:
def __init__(self, topic: str, nodes: list):
self.topic = topic
self.bootstrap_servers = nodes
def set_produce(self, values):
try:
producer = KafkaProducer(
bootstrap_servers=self.bootstrap_servers,
api_version=(2, 5, 0),
acks=0,
compression_type='gzip',
value_serializer=lambda x: dumps(x).encode('utf-8')
)
start = time.time()
print('[Produce] - 메시지 전송을 시작합니다.')
for i, val in enumerate(values, 0):
print(f'{i}번 전송중 {val}')
producer.send(self.topic, value=val)
# 비우는 작업
producer.flush()
producer.close()
print(f'[Produce] 걸린시간: {time.time() - start}')
return {"status": 200}
except Exception as exc:
raise exc
def get_consume(self):
try:
consumer = KafkaConsumer(
self.topic,
bootstrap_servers=self.bootstrap_servers,
value_deserializer=lambda x: x.decode(
"utf-8"
),
auto_offset_reset='earliest' #default = latest / earliest = 과거 데이터도 consume
)
print(f'[Consume] - 시작')
for message in consumer:
print(f'Partition : {message.partition}, Offset : {message.offset}, Value : {message.value}')
consumer.close()
except Exception as exc:
raise exc
if __name__ == "__main__":
kafka_connection = KafkaInfos(topic='test', nodes=['public_ip:9092', 'public_ip:9092'])
# produce
messages = ['i am', 'ready', 'to test']
kafka_connection.set_produce(messages)
# consume
kafka_connection.get_consume()
kafka VPC에 맞추어 연결 값 재설정하기
- AWS 구성에 맞추어 내부로 들어오는 로직과 외부에서 오는 로직을 구분
- 테스트 용도로 사용하기 위해 PLAINTEXT로 구성
- https://bagbokman.tistory.com/16
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
#before
broker.id=0
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://ec2ip-ap-northeast-2.compute.amazonaws.com:9092
zookeeper.connect=kafka1:2181,kafka2:2181
# after
broker.id=0
listeners=INTERNAL://0.0.0.0:19092,EXTERNAL://0.0.0.0:9092
listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
advertised.listeners=INTERNAL://ec2-ip.ap-northeast-2.compute.internal:9092,EXTERNAL://ec2-ip.ap-northeast-2.compute.amazonaws.com:9092
zookeeper.connect=kafka1:2181,kafka2:2181
inter.broker.listener.name=INTERNAL
# after
broker.id=2
listeners=INTERNAL://0.0.0.0:19092,EXTERNAL://0.0.0.0:9092
listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
advertised.listeners=INTERNAL://ec2-ip.compute.internal:9092,EXTERNAL://ec2ip.ap-northeast-2.compute.amazonaws.com:9092
zookeeper.connect=kafka1:2181,kafka2:2181
inter.broker.listener.name=INTERNAL
참조
- https://amazelimi.tistory.com/entry/Kafka-AWS-EC2에-카프카-클러스터-설치하기-LIM
- https://blog.voidmainvoid.net/325
- https://co-de.tistory.com/44