1. docker-compose.yml 파일을 생성하고 작성
version: '2'
services:
zookeeper:
image: wurstmeister/zookeeper
container_name: zookeeper
ports:
- "2181:2181"
kafka:
image: wurstmeister/kafka:2.12-2.5.0
container_name: kafka
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
volumes:
- /var/run/docker.sock:/var/run/docker.sock
Docker Compose 버전을 확인하고
docker-compose --version
혹시 docker-compose 명령어가 없다는 메시지가 뜨면 아래 명령어로 설치를 합니다.
brew install docker-compose
2. 아래 명령어를 터미널에서 실행합니다.
이미지를 다운로드 받아서 컨테이너 생성
클러스터 1개를 가진 카프카 서버를 실행
docker compose up -d
그런데 저는 위 사진처럼 Zookeeper가 실행되지 않고 Java 관련 오류(Segmentation Fault)가 계속 발생하였습니다. M1/M2 칩셋을 사용하는 Mac (ARM64 기반)에서 자주 발생하는 호환성 문제였습니다.
Zookeeper와 Kafka 이미지가 현재 시스템의 아키텍처와 호환되지 않는 경우 발생하는 에러라고 하여 ARM64와 호환되는 다른 Zookeeper 이미지를 사용하였습니다. 공식 Zookeeper 이미지는 다양한 플랫폼을 지원하고 있어서 아래처럼 호환되는 다른 이미지를 사용하였습니다.
docker run -d --platform linux/arm64 --name zookeeper --network kafka-network -p 2181:2181 zookeeper
만약, 이미 Zookeeper 컨테이너가 존재하여 새 컨테이너를 실행할 수 없다는 오류 메시지가 뜬다면 기존 Zookeeper 컨테이너를 삭제한 후 다시 시도해야 합니다.
//기존 컨테이너 제거
docker rm -f zookeeper
// Zookeeper 컨테이너 재실행
docker run -d --platform linux/arm64 --name zookeeper --network kafka-network -p 2181:2181 zookeeper
3. 외부에서 사용할 수 있도록 설정을 변경
터미널에서 도커 컨테이너 안으로 접속
docker exec -it kafka /bin/bash
vi /opt/kafka/config/server.properties
설정 파일을 수정 - 내용을 추가
delete.topic.enable=true
auto.create.topics.enable=true
4. 토픽 생성 과 조회 및 삭제
명령어를 사용하기 위해서 프롬프트 이동
bash# cd /opt/kafka/bin
첫번째 카프카 서버의 첫번째 영역에 토픽(exam-topic) 생성
bash# kafka-topics.sh --bootstrap-server localhost:9092 --list
토픽 삭제:
bash# kafka-topics.sh --delete --zookeeper zookeeper:2181 --topic exam-topic
5. 메세지 전송 및 받기
메시지 전송
터미널에서 도커 컨테이너의 쉘에 접속
docker exec -it kafka /bin/bash
디렉토리 이동
bash# cd /opt/kafka/bin
게시 생성
bash-5.1# kafka-console-producer.sh --topic exam-topic --broker-list localhost:9092
> 메시지 작성
메시지 구독
새로운 터미널을 열어서 메시지를 받아보겠습니다.
동일하게 도커 컨테이너의 쉘에 접속합니다.
docker exec -it kafka /bin/bash bash
디렉토리 이동
bash# cd /opt/kafka/bin
구독 생성
bash-5.1# kafka-console-consumer.sh --topic exam-topic --bootstrapserver localhost:9092 --from-beginning
6. Python에서 카프카 메세지 전송 및 받기 확인
파이썬 가상 환경을 생성(Mac 이나 Linux는 pythone 대신에 python3)
python3 -m venv kafka_env
가상환경 활성화
source kafka_env/bin/activate
패키지 설치
pip install kafka-python
pip install six==1.6.0
pip install six==1.6.0
메시지 전송하는 코드를 작성하고 실행 한 후 터미널을 확인
producer.py
import sys
import six
if sys.version_info >= (3, 12, 0):
sys.modules['kafka.vendor.six.moves'] = six.moves
from kafka import KafkaProducer
import json
class MessageProducer:
def __init__(self, broker, topic):
self.broker = broker
self.topic = topic
#key_serializer=str.encode 를 추가하면 key 와 함께 전송
#그렇지 않으면 value 만 전송
self.producer = KafkaProducer(
bootstrap_servers=self.broker,
value_serializer=lambda x: json.dumps(x).encode("utf-8"),
acks=0,
api_version=(2, 5, 0),
key_serializer=str.encode,
retries=3,
)
def send_message(self, msg, auto_close=True):
try:
print(self.producer)
future = self.producer.send(self.topic, value=msg, key="key")
self.producer.flush() # 비우는 작업
if auto_close:
self.producer.close()
future.get(timeout=2)
return {"status_code": 200, "error": None}
except Exception as exc:
raise exc
# 브로커와 토픽명을 지정
broker = ["localhost:9092"]
topic = "exam-topic"
pd = MessageProducer(broker, topic)
#전송할 메시지 생성
msg = {"name": "John", "age": 30}
res = pd.send_message(msg)
print(res)
터미널에서 파일 실행
python3 producer.py
consumer.py
import sys
import six
if sys.version_info >= (3, 12, 0):
sys.modules['kafka.vendor.six.moves'] = six.moves
from kafka import KafkaConsumer
import json
class MessageConsumer:
def __init__(self, broker, topic):
self.broker = broker
self.consumer = KafkaConsumer(
topic, # Topic to consume
bootstrap_servers=self.broker,
value_deserializer=lambda x: x.decode(
"utf-8"
), # Decode message value as utf-8
group_id="my-group", # Consumer group ID
auto_offset_reset="earliest", # Start consuming from earliest available message
enable_auto_commit=True, # Commit offsets automatically
)
def receive_message(self):
try:
for message in self.consumer:
#print(message.value)
result = json.loads(message.value)
for k, v in result.items():
print(k, ":", result[k])
print(result["name"])
print(result["age"])
except Exception as exc:
raise exc
# 브로커와 토픽명을 지정한다.
broker = ["localhost:9092"]
topic = "exam-topic"
cs = MessageConsumer(broker, topic)
cs.receive_message()
'BackEnd > JAVA SPRING' 카테고리의 다른 글
[Spring Boot] Docker + Github Actions CI/CD 구축 (7) | 2024.10.21 |
---|---|
[Spring Boot] Apache Kafka 연동 (0) | 2024.10.08 |