Notice
Recent Posts
Recent Comments
Link
| 일 | 월 | 화 | 수 | 목 | 금 | 토 |
|---|---|---|---|---|---|---|
| 1 | 2 | 3 | 4 | 5 | 6 | |
| 7 | 8 | 9 | 10 | 11 | 12 | 13 |
| 14 | 15 | 16 | 17 | 18 | 19 | 20 |
| 21 | 22 | 23 | 24 | 25 | 26 | 27 |
| 28 | 29 | 30 | 31 |
Tags
- docker
- devops
- 프로덕션 운영
- Python
- 클라우드
- kubernetes
- 서비스 설계
- 서비스 메시
- 컨테이너오케스트레이션
- 세션저장소
- CI/CD
- 마이크로서비스
- 메시지 브로커
- 보안
- 모니터링
- 분산 모니터링
- Kafka 클러스터
- 모노리스 분해
- 마이크로서비스 운영
- ApacheBench
- 고가용성
- 마이크로서비스 통신
- 분산 시스템
- infrastructureascode
- 이벤트 스트리밍
- rabbitmq
- 메시징 패턴
- 인메모리데이터베이스
- 클러스터
- RabbitMQ Exchange
Archives
- Today
- Total
hobokai 님의 블로그
Kafka 프로덕션 가이드 - 클러스터링, 모니터링, 보안, 운영 자동화 본문
Apache Kafka 완벽 가이드 3편: 클러스터링과 프로덕션 운영
목차
Kafka 클러스터 구성
프로덕션 환경에서는 고가용성과 확장성을 위해 Kafka 클러스터를 구성해야 합니다.
3노드 클러스터 구성
# docker-compose-cluster.yml
version: '3.8'
services:
zookeeper1:
image: confluentinc/cp-zookeeper:7.4.0
hostname: zookeeper1
ports:
- "2181:2181"
environment:
ZOOKEEPER_SERVER_ID: 1
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ZOOKEEPER_INIT_LIMIT: 5
ZOOKEEPER_SYNC_LIMIT: 2
ZOOKEEPER_SERVERS: zookeeper1:2888:3888;zookeeper2:2888:3888;zookeeper3:2888:3888
volumes:
- zk1-data:/var/lib/zookeeper/data
- zk1-logs:/var/lib/zookeeper/log
zookeeper2:
image: confluentinc/cp-zookeeper:7.4.0
hostname: zookeeper2
ports:
- "2182:2181"
environment:
ZOOKEEPER_SERVER_ID: 2
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ZOOKEEPER_INIT_LIMIT: 5
ZOOKEEPER_SYNC_LIMIT: 2
ZOOKEEPER_SERVERS: zookeeper1:2888:3888;zookeeper2:2888:3888;zookeeper3:2888:3888
volumes:
- zk2-data:/var/lib/zookeeper/data
- zk2-logs:/var/lib/zookeeper/log
zookeeper3:
image: confluentinc/cp-zookeeper:7.4.0
hostname: zookeeper3
ports:
- "2183:2181"
environment:
ZOOKEEPER_SERVER_ID: 3
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ZOOKEEPER_INIT_LIMIT: 5
ZOOKEEPER_SYNC_LIMIT: 2
ZOOKEEPER_SERVERS: zookeeper1:2888:3888;zookeeper2:2888:3888;zookeeper3:2888:3888
volumes:
- zk3-data:/var/lib/zookeeper/data
- zk3-logs:/var/lib/zookeeper/log
kafka1:
image: confluentinc/cp-kafka:7.4.0
hostname: kafka1
ports:
- "9092:9092"
- "19092:19092"
depends_on:
- zookeeper1
- zookeeper2
- zookeeper3
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper1:2181,zookeeper2:2181,zookeeper3:2181
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 3000
KAFKA_DEFAULT_REPLICATION_FACTOR: 3
KAFKA_MIN_INSYNC_REPLICAS: 2
KAFKA_JMX_PORT: 19092
KAFKA_JMX_HOSTNAME: localhost
volumes:
- kafka1-data:/var/lib/kafka/data
kafka2:
image: confluentinc/cp-kafka:7.4.0
hostname: kafka2
ports:
- "9093:9092"
- "19093:19092"
depends_on:
- zookeeper1
- zookeeper2
- zookeeper3
environment:
KAFKA_BROKER_ID: 2
KAFKA_ZOOKEEPER_CONNECT: zookeeper1:2181,zookeeper2:2181,zookeeper3:2181
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka2:29092,PLAINTEXT_HOST://localhost:9093
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 3000
KAFKA_DEFAULT_REPLICATION_FACTOR: 3
KAFKA_MIN_INSYNC_REPLICAS: 2
KAFKA_JMX_PORT: 19092
KAFKA_JMX_HOSTNAME: localhost
volumes:
- kafka2-data:/var/lib/kafka/data
kafka3:
image: confluentinc/cp-kafka:7.4.0
hostname: kafka3
ports:
- "9094:9092"
- "19094:19092"
depends_on:
- zookeeper1
- zookeeper2
- zookeeper3
environment:
KAFKA_BROKER_ID: 3
KAFKA_ZOOKEEPER_CONNECT: zookeeper1:2181,zookeeper2:2181,zookeeper3:2181
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka3:29092,PLAINTEXT_HOST://localhost:9094
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 3000
KAFKA_DEFAULT_REPLICATION_FACTOR: 3
KAFKA_MIN_INSYNC_REPLICAS: 2
KAFKA_JMX_PORT: 19092
KAFKA_JMX_HOSTNAME: localhost
volumes:
- kafka3-data:/var/lib/kafka/data
volumes:
zk1-data:
zk1-logs:
zk2-data:
zk2-logs:
zk3-data:
zk3-logs:
kafka1-data:
kafka2-data:
kafka3-data:
클러스터 관리 스크립트
# cluster_manager.py
import subprocess
import json
import time
from kafka.admin import KafkaAdminClient, ConfigResource, ConfigResourceType
from kafka import KafkaProducer, KafkaConsumer
class KafkaClusterManager:
def __init__(self, bootstrap_servers):
self.bootstrap_servers = bootstrap_servers
self.admin_client = KafkaAdminClient(
bootstrap_servers=bootstrap_servers,
client_id='cluster_manager'
)
def check_cluster_health(self):
"""클러스터 상태 확인"""
try:
# 브로커 메타데이터 확인
metadata = self.admin_client.describe_cluster()
brokers = metadata.brokers
print("🔍 클러스터 상태 확인:")
print(f" 브로커 수: {len(brokers)}")
for broker in brokers:
print(f" 브로커 {broker.id}: {broker.host}:{broker.port}")
# 컨트롤러 확인
controller = metadata.controller
if controller:
print(f" 컨트롤러: 브로커 {controller.id}")
return True
except Exception as e:
print(f"❌ 클러스터 상태 확인 실패: {e}")
return False
def check_topic_health(self, topic_name):
"""토픽 상태 확인"""
try:
# 토픽 메타데이터 가져오기
metadata = self.admin_client.describe_topics([topic_name])
topic_metadata = metadata[topic_name]
print(f"📊 토픽 '{topic_name}' 상태:")
print(f" 파티션 수: {len(topic_metadata.partitions)}")
for partition in topic_metadata.partitions:
print(f" 파티션 {partition.id}:")
print(f" 리더: 브로커 {partition.leader}")
print(f" 복제본: {partition.replicas}")
print(f" ISR: {partition.isr}")
# Under-replicated 파티션 확인
if len(partition.isr) < len(partition.replicas):
print(f" ⚠️ Under-replicated 파티션 발견!")
return True
except Exception as e:
print(f"❌ 토픽 상태 확인 실패: {e}")
return False
def rebalance_partitions(self):
"""파티션 리밸런싱"""
print("⚖️ 파티션 리밸런싱 시작...")
try:
# 선호 복제본 선출 (Preferred Replica Election)
cmd = [
"kafka-preferred-replica-election",
"--bootstrap-server", self.bootstrap_servers[0]
]
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode == 0:
print("✅ 파티션 리밸런싱 완료")
else:
print(f"❌ 리밸런싱 실패: {result.stderr}")
except Exception as e:
print(f"❌ 리밸런싱 오류: {e}")
def monitor_consumer_lag(self, group_id):
"""컨슈머 랙 모니터링"""
try:
cmd = [
"kafka-consumer-groups",
"--bootstrap-server", self.bootstrap_servers[0],
"--describe",
"--group", group_id
]
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode == 0:
lines = result.stdout.strip().split('\n')[1:] # 헤더 제외
total_lag = 0
print(f"📈 컨슈머 그룹 '{group_id}' 랙 상태:")
for line in lines:
if line.strip():
parts = line.split()
if len(parts) >= 5:
topic = parts[0]
partition = parts[1]
current_offset = parts[2]
log_end_offset = parts[3]
lag = parts[4]
if lag.isdigit():
lag_num = int(lag)
total_lag += lag_num
if lag_num > 1000:
print(f" ⚠️ {topic}-{partition}: 랙 {lag_num}")
else:
print(f" ✅ {topic}-{partition}: 랙 {lag_num}")
print(f" 총 랙: {total_lag}")
return total_lag
except Exception as e:
print(f"❌ 컨슈머 랙 확인 실패: {e}")
return -1
# 사용 예시
if __name__ == "__main__":
manager = KafkaClusterManager(['localhost:9092', 'localhost:9093', 'localhost:9094'])
# 클러스터 상태 확인
manager.check_cluster_health()
# 토픽 상태 확인
manager.check_topic_health('user-events')
# 컨슈머 랙 모니터링
manager.monitor_consumer_lag('my-consumer-group')
자동 장애 복구
# auto_recovery.py
import time
import logging
from kafka_cluster_manager import KafkaClusterManager
class AutoRecoverySystem:
def __init__(self, cluster_manager):
self.cluster_manager = cluster_manager
self.logger = logging.getLogger(__name__)
self.alert_thresholds = {
'consumer_lag': 10000,
'under_replicated_partitions': 1,
'offline_brokers': 1
}
def monitor_and_recover(self, check_interval=60):
"""자동 모니터링 및 복구"""
self.logger.info("🤖 자동 복구 시스템 시작")
while True:
try:
# 1. 클러스터 기본 상태 확인
if not self.cluster_manager.check_cluster_health():
self.logger.error("❌ 클러스터 상태 불량 - 관리자 알림 필요")
self.send_alert("클러스터 상태 불량")
# 2. Under-replicated 파티션 확인 및 복구
self.check_and_fix_under_replicated()
# 3. 컨슈머 랙 확인
self.check_consumer_lag()
# 4. 브로커 성능 확인
self.check_broker_performance()
time.sleep(check_interval)
except Exception as e:
self.logger.error(f"모니터링 오류: {e}")
time.sleep(check_interval)
def check_and_fix_under_replicated(self):
"""Under-replicated 파티션 확인 및 수정"""
try:
# Under-replicated 파티션 감지 로직
# 실제로는 JMX 메트릭을 사용하거나 관리 API 호출
# 파티션 리밸런싱 수행
self.cluster_manager.rebalance_partitions()
except Exception as e:
self.logger.error(f"파티션 복구 실패: {e}")
def check_consumer_lag(self):
"""컨슈머 랙 확인"""
important_groups = ['order-processor', 'payment-processor', 'notification-service']
for group in important_groups:
lag = self.cluster_manager.monitor_consumer_lag(group)
if lag > self.alert_thresholds['consumer_lag']:
self.logger.warning(f"⚠️ 높은 컨슈머 랙: {group} = {lag}")
self.send_alert(f"컨슈머 랙 임계값 초과: {group}")
# 자동 스케일링 트리거 (예: Kubernetes HPA)
self.trigger_consumer_scaling(group)
def trigger_consumer_scaling(self, group_id):
"""컨슈머 자동 스케일링"""
self.logger.info(f"🔄 컨슈머 스케일링 시작: {group_id}")
# Kubernetes HPA 트리거 또는 Docker Compose 스케일링
try:
import subprocess
result = subprocess.run([
"kubectl", "scale", "deployment", f"{group_id}-consumer",
"--replicas=5"
], capture_output=True)
if result.returncode == 0:
self.logger.info(f"✅ 스케일링 완료: {group_id}")
else:
self.logger.error(f"❌ 스케일링 실패: {result.stderr}")
except Exception as e:
self.logger.error(f"스케일링 오류: {e}")
def send_alert(self, message):
"""알림 전송"""
# Slack, 이메일, PagerDuty 등으로 알림
self.logger.critical(f"🚨 알림: {message}")
# 실제 구현에서는 알림 시스템 연동
# slack_client.send_message(f"Kafka 알림: {message}")
모니터링과 성능 최적화
Prometheus + Grafana 모니터링
# monitoring-stack.yml
version: '3.8'
services:
kafka-exporter:
image: danielqsj/kafka-exporter:latest
command:
- '--kafka.server=kafka1:29092'
- '--kafka.server=kafka2:29092'
- '--kafka.server=kafka3:29092'
ports:
- "9308:9308"
depends_on:
- kafka1
- kafka2
- kafka3
prometheus:
image: prom/prometheus:latest
ports:
- "9090:9090"
volumes:
- ./prometheus.yml:/etc/prometheus/prometheus.yml
- ./kafka-alerts.yml:/etc/prometheus/kafka-alerts.yml
- prometheus-data:/prometheus
command:
- '--config.file=/etc/prometheus/prometheus.yml'
- '--storage.tsdb.path=/prometheus'
- '--web.console.libraries=/etc/prometheus/console_libraries'
- '--web.console.templates=/etc/prometheus/consoles'
- '--web.enable-lifecycle'
grafana:
image: grafana/grafana:latest
ports:
- "3000:3000"
environment:
- GF_SECURITY_ADMIN_PASSWORD=admin123
volumes:
- grafana-data:/var/lib/grafana
- ./grafana-dashboards:/etc/grafana/provisioning/dashboards
- ./grafana-datasources:/etc/grafana/provisioning/datasources
volumes:
prometheus-data:
grafana-data:
prometheus.yml
global:
scrape_interval: 15s
evaluation_interval: 15s
rule_files:
- "kafka-alerts.yml"
scrape_configs:
- job_name: 'kafka-exporter'
static_configs:
- targets: ['kafka-exporter:9308']
- job_name: 'kafka-jmx'
static_configs:
- targets: ['kafka1:19092', 'kafka2:19093', 'kafka3:19094']
metrics_path: /metrics
scrape_interval: 30s
alerting:
alertmanagers:
- static_configs:
- targets: ['alertmanager:9093']
핵심 성능 지표 모니터링
# kafka_metrics_collector.py
import time
import json
from kafka import KafkaProducer, KafkaConsumer
from kafka.admin import KafkaAdminClient
import psutil
import requests
class KafkaMetricsCollector:
def __init__(self, bootstrap_servers, jmx_ports):
self.bootstrap_servers = bootstrap_servers
self.jmx_ports = jmx_ports
self.producer = KafkaProducer(
bootstrap_servers=bootstrap_servers,
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
def collect_broker_metrics(self):
"""브로커 성능 지표 수집"""
metrics = {}
for i, port in enumerate(self.jmx_ports):
broker_id = i + 1
try:
# JMX 메트릭 수집 (실제로는 JMX 클라이언트 사용)
broker_metrics = self.get_jmx_metrics(port)
metrics[f'broker_{broker_id}'] = {
'timestamp': int(time.time() * 1000),
'broker_id': broker_id,
'cpu_usage': psutil.cpu_percent(),
'memory_usage': psutil.virtual_memory().percent,
'disk_usage': psutil.disk_usage('/').percent,
'network_in': broker_metrics.get('network_in_rate', 0),
'network_out': broker_metrics.get('network_out_rate', 0),
'messages_in_per_sec': broker_metrics.get('messages_in_per_sec', 0),
'bytes_in_per_sec': broker_metrics.get('bytes_in_per_sec', 0),
'bytes_out_per_sec': broker_metrics.get('bytes_out_per_sec', 0),
'request_queue_size': broker_metrics.get('request_queue_size', 0),
'response_queue_size': broker_metrics.get('response_queue_size', 0)
}
except Exception as e:
print(f"❌ 브로커 {broker_id} 메트릭 수집 실패: {e}")
return metrics
def get_jmx_metrics(self, jmx_port):
"""JMX 메트릭 수집 (시뮬레이션)"""
# 실제로는 JMX 클라이언트를 사용하여 메트릭 수집
# 여기서는 시뮬레이션 데이터 반환
import random
return {
'network_in_rate': random.randint(1000, 10000),
'network_out_rate': random.randint(1000, 10000),
'messages_in_per_sec': random.randint(100, 1000),
'bytes_in_per_sec': random.randint(10000, 100000),
'bytes_out_per_sec': random.randint(10000, 100000),
'request_queue_size': random.randint(0, 100),
'response_queue_size': random.randint(0, 100)
}
def collect_topic_metrics(self, topics):
"""토픽별 지표 수집"""
metrics = {}
for topic in topics:
try:
# 토픽 메트릭 수집
topic_metrics = {
'timestamp': int(time.time() * 1000),
'topic': topic,
'partition_count': self.get_partition_count(topic),
'total_size_bytes': self.get_topic_size(topic),
'message_count': self.get_message_count(topic),
'consumer_groups': self.get_consumer_group_count(topic)
}
metrics[topic] = topic_metrics
except Exception as e:
print(f"❌ 토픽 {topic} 메트릭 수집 실패: {e}")
return metrics
def get_partition_count(self, topic):
"""토픽의 파티션 수 조회"""
try:
admin_client = KafkaAdminClient(bootstrap_servers=self.bootstrap_servers)
metadata = admin_client.describe_topics([topic])
return len(metadata[topic].partitions)
except:
return 0
def get_topic_size(self, topic):
"""토픽 크기 조회 (시뮬레이션)"""
# 실제로는 JMX 또는 관리 API 사용
import random
return random.randint(1000000, 10000000) # bytes
def get_message_count(self, topic):
"""토픽 메시지 수 조회 (시뮬레이션)"""
import random
return random.randint(1000, 100000)
def get_consumer_group_count(self, topic):
"""토픽을 구독하는 컨슈머 그룹 수"""
# 실제로는 관리 API로 조회
import random
return random.randint(1, 5)
def publish_metrics(self, metrics):
"""메트릭을 Kafka 토픽으로 전송"""
try:
self.producer.send('kafka-metrics', value=metrics)
self.producer.flush()
print(f"📊 메트릭 전송 완료: {len(metrics)}개 지표")
except Exception as e:
print(f"❌ 메트릭 전송 실패: {e}")
def start_collection(self, interval=30):
"""주기적 메트릭 수집 시작"""
print(f"📈 메트릭 수집 시작 (간격: {interval}초)")
topics_to_monitor = ['user-events', 'order-events', 'payment-events']
while True:
try:
# 브로커 메트릭 수집
broker_metrics = self.collect_broker_metrics()
# 토픽 메트릭 수집
topic_metrics = self.collect_topic_metrics(topics_to_monitor)
# 통합 메트릭 구성
all_metrics = {
'timestamp': int(time.time() * 1000),
'brokers': broker_metrics,
'topics': topic_metrics,
'cluster_health': self.check_cluster_health()
}
# 메트릭 전송
self.publish_metrics(all_metrics)
time.sleep(interval)
except Exception as e:
print(f"❌ 메트릭 수집 오류: {e}")
time.sleep(interval)
def check_cluster_health(self):
"""클러스터 헬스체크"""
try:
admin_client = KafkaAdminClient(bootstrap_servers=self.bootstrap_servers)
metadata = admin_client.describe_cluster()
return {
'broker_count': len(metadata.brokers),
'controller_id': metadata.controller.id if metadata.controller else -1,
'cluster_id': metadata.cluster_id,
'status': 'healthy'
}
except Exception as e:
return {
'status': 'unhealthy',
'error': str(e)
}
# 사용 예시
if __name__ == "__main__":
collector = KafkaMetricsCollector(
bootstrap_servers=['localhost:9092', 'localhost:9093', 'localhost:9094'],
jmx_ports=[19092, 19093, 19094]
)
collector.start_collection(interval=30)
보안 설정
SSL/TLS 암호화
# ssl-kafka.yml (보안 클러스터)
version: '3.8'
services:
kafka-ssl:
image: confluentinc/cp-kafka:7.4.0
hostname: kafka-ssl
ports:
- "9093:9093"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
# SSL 설정
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: SSL:SSL,PLAINTEXT:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: SSL://localhost:9093,PLAINTEXT://localhost:9092
KAFKA_INTER_BROKER_LISTENER_NAME: SSL
# SSL 인증서 설정
KAFKA_SSL_KEYSTORE_FILENAME: kafka.server.keystore.jks
KAFKA_SSL_KEYSTORE_CREDENTIALS: kafka_keystore_creds
KAFKA_SSL_KEY_CREDENTIALS: kafka_ssl_key_creds
KAFKA_SSL_TRUSTSTORE_FILENAME: kafka.server.truststore.jks
KAFKA_SSL_TRUSTSTORE_CREDENTIALS: kafka_truststore_creds
# 클라이언트 인증 요구
KAFKA_SSL_CLIENT_AUTH: required
# 기타 보안 설정
KAFKA_SECURITY_INTER_BROKER_PROTOCOL: SSL
KAFKA_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: ""
volumes:
- ./ssl-certs:/etc/kafka/secrets
SASL 인증 설정
# sasl_client.py - SASL 인증 클라이언트
from kafka import KafkaProducer, KafkaConsumer
import ssl
# SASL/SCRAM 설정으로 Producer 생성
def create_secure_producer():
context = ssl.create_default_context()
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
producer = KafkaProducer(
bootstrap_servers=['localhost:9093'],
# SASL 설정
security_protocol='SASL_SSL',
sasl_mechanism='SCRAM-SHA-256',
sasl_plain_username='kafka-user',
sasl_plain_password='kafka-password',
# SSL 설정
ssl_context=context,
# 직렬화
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
return producer
# SASL/SCRAM 설정으로 Consumer 생성
def create_secure_consumer(topics, group_id):
context = ssl.create_default_context()
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
consumer = KafkaConsumer(
*topics,
bootstrap_servers=['localhost:9093'],
# SASL 설정
security_protocol='SASL_SSL',
sasl_mechanism='SCRAM-SHA-256',
sasl_plain_username='kafka-user',
sasl_plain_password='kafka-password',
# SSL 설정
ssl_context=context,
# Consumer 설정
group_id=group_id,
value_deserializer=lambda m: json.loads(m.decode('utf-8')),
auto_offset_reset='earliest'
)
return consumer
# 사용 예시
if __name__ == "__main__":
# 보안 Producer로 메시지 전송
producer = create_secure_producer()
message = {
'user_id': 'secure_user_001',
'action': 'secure_login',
'timestamp': int(time.time() * 1000)
}
producer.send('secure-events', value=message)
producer.close()
print("✅ 보안 메시지 전송 완료")
ACL (Access Control Lists) 설정
# ACL 관리 스크립트
#!/bin/bash
# 사용자별 권한 설정
setup_acls() {
KAFKA_OPTS="-Djava.security.auth.login.config=/path/to/kafka_server_jaas.conf"
# 1. 관리자 권한 (모든 리소스 접근)
kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 \
--add --allow-principal User:admin \
--operation All --topic '*' --cluster
# 2. 프로듀서 권한 (특정 토픽 쓰기)
kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 \
--add --allow-principal User:producer-service \
--operation Write --topic user-events
kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 \
--add --allow-principal User:producer-service \
--operation Describe --topic user-events
# 3. 컨슈머 권한 (특정 토픽 읽기)
kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 \
--add --allow-principal User:consumer-service \
--operation Read --topic user-events
kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 \
--add --allow-principal User:consumer-service \
--operation Read --group analytics-group
# 4. 모니터링 권한 (읽기 전용)
kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 \
--add --allow-principal User:monitoring \
--operation Describe --topic '*'
kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 \
--add --allow-principal User:monitoring \
--operation Describe --group '*'
}
# ACL 목록 확인
list_acls() {
kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 --list
}
# 특정 사용자 권한 확인
check_user_permissions() {
local username=$1
kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 \
--list --principal User:$username
}
# 실행
setup_acls
list_acls
운영 자동화
백업 및 복원 자동화
# kafka_backup.py
import os
import subprocess
import json
import time
from datetime import datetime
import boto3 # AWS S3 업로드용
class KafkaBackupManager:
def __init__(self, kafka_data_dir, backup_dir, s3_bucket=None):
self.kafka_data_dir = kafka_data_dir
self.backup_dir = backup_dir
self.s3_bucket = s3_bucket
self.s3_client = boto3.client('s3') if s3_bucket else None
def create_backup(self, topics=None):
"""토픽 데이터 백업"""
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
backup_path = os.path.join(self.backup_dir, f'kafka_backup_{timestamp}')
try:
os.makedirs(backup_path, exist_ok=True)
# 1. 토픽 메타데이터 백업
self.backup_topic_metadata(backup_path)
# 2. 컨슈머 그룹 오프셋 백업
self.backup_consumer_offsets(backup_path)
# 3. 토픽 데이터 백업 (선택적)
if topics:
self.backup_topic_data(topics, backup_path)
# 4. 설정 파일 백업
self.backup_configurations(backup_path)
# 5. 백업 압축
archive_path = self.compress_backup(backup_path)
# 6. S3 업로드 (선택적)
if self.s3_client:
self.upload_to_s3(archive_path)
print(f"✅ 백업 완료: {archive_path}")
return archive_path
except Exception as e:
print(f"❌ 백업 실패: {e}")
return None
def backup_topic_metadata(self, backup_path):
"""토픽 메타데이터 백업"""
try:
# 토픽 목록 및 설정 저장
topics_cmd = [
"kafka-topics", "--list",
"--bootstrap-server", "localhost:9092"
]
result = subprocess.run(topics_cmd, capture_output=True, text=True)
topics = result.stdout.strip().split('\n')
metadata = {}
for topic in topics:
if topic.strip():
# 토픽 상세 정보
describe_cmd = [
"kafka-topics", "--describe",
"--topic", topic,
"--bootstrap-server", "localhost:9092"
]
describe_result = subprocess.run(describe_cmd, capture_output=True, text=True)
metadata[topic] = {
'description': describe_result.stdout,
'timestamp': datetime.now().isoformat()
}
# 메타데이터 파일 저장
metadata_file = os.path.join(backup_path, 'topics_metadata.json')
with open(metadata_file, 'w', encoding='utf-8') as f:
json.dump(metadata, f, indent=2, ensure_ascii=False)
print(f"📋 토픽 메타데이터 백업: {len(topics)}개 토픽")
except Exception as e:
print(f"❌ 메타데이터 백업 실패: {e}")
def backup_consumer_offsets(self, backup_path):
"""컨슈머 그룹 오프셋 백업"""
try:
# 컨슈머 그룹 목록 조회
groups_cmd = [
"kafka-consumer-groups", "--list",
"--bootstrap-server", "localhost:9092"
]
result = subprocess.run(groups_cmd, capture_output=True, text=True)
groups = result.stdout.strip().split('\n')
offsets = {}
for group in groups:
if group.strip():
# 그룹별 오프셋 정보
describe_cmd = [
"kafka-consumer-groups", "--describe",
"--group", group,
"--bootstrap-server", "localhost:9092"
]
describe_result = subprocess.run(describe_cmd, capture_output=True, text=True)
offsets[group] = {
'offsets': describe_result.stdout,
'timestamp': datetime.now().isoformat()
}
# 오프셋 파일 저장
offsets_file = os.path.join(backup_path, 'consumer_offsets.json')
with open(offsets_file, 'w', encoding='utf-8') as f:
json.dump(offsets, f, indent=2, ensure_ascii=False)
print(f"📊 컨슈머 오프셋 백업: {len(groups)}개 그룹")
except Exception as e:
print(f"❌ 오프셋 백업 실패: {e}")
def compress_backup(self, backup_path):
"""백업 디렉토리 압축"""
try:
archive_path = f"{backup_path}.tar.gz"
compress_cmd = [
"tar", "-czf", archive_path,
"-C", os.path.dirname(backup_path),
os.path.basename(backup_path)
]
subprocess.run(compress_cmd, check=True)
# 원본 디렉토리 삭제
subprocess.run(["rm", "-rf", backup_path])
return archive_path
except Exception as e:
print(f"❌ 압축 실패: {e}")
return backup_path
def restore_from_backup(self, backup_file):
"""백업에서 복원"""
try:
# 백업 파일 압축 해제
extract_cmd = [
"tar", "-xzf", backup_file,
"-C", self.backup_dir
]
subprocess.run(extract_cmd, check=True)
# 백업 디렉토리 찾기
backup_name = os.path.basename(backup_file).replace('.tar.gz', '')
extracted_path = os.path.join(self.backup_dir, backup_name)
# 토픽 복원
self.restore_topics(extracted_path)
# 컨슈머 오프셋 복원
self.restore_consumer_offsets(extracted_path)
print(f"✅ 복원 완료: {backup_file}")
except Exception as e:
print(f"❌ 복원 실패: {e}")
def schedule_daily_backup(self):
"""일일 백업 스케줄링"""
import schedule
def daily_backup():
important_topics = ['user-events', 'order-events', 'payment-events']
self.create_backup(topics=important_topics)
# 매일 새벽 2시에 백업
schedule.every().day.at("02:00").do(daily_backup)
print("📅 일일 백업 스케줄 등록 (매일 02:00)")
while True:
schedule.run_pending()
time.sleep(60)
# 사용 예시
if __name__ == "__main__":
backup_manager = KafkaBackupManager(
kafka_data_dir='/kafka-data',
backup_dir='/backups',
s3_bucket='my-kafka-backups'
)
# 백업 생성
backup_manager.create_backup(['user-events', 'order-events'])
# 스케줄링 시작
# backup_manager.schedule_daily_backup()
장애 대응과 복구
장애 시나리오별 대응
# disaster_recovery.py
import time
import subprocess
from kafka_cluster_manager import KafkaClusterManager
class DisasterRecoveryManager:
def __init__(self, cluster_manager):
self.cluster_manager = cluster_manager
self.recovery_procedures = {
'broker_failure': self.handle_broker_failure,
'network_partition': self.handle_network_partition,
'data_corruption': self.handle_data_corruption,
'zookeeper_failure': self.handle_zookeeper_failure
}
def detect_and_respond(self):
"""장애 감지 및 자동 대응"""
while True:
try:
# 1. 브로커 상태 확인
broker_status = self.check_broker_health()
# 2. 네트워크 파티션 확인
network_status = self.check_network_partition()
# 3. 데이터 무결성 확인
data_status = self.check_data_integrity()
# 4. ZooKeeper 상태 확인
zk_status = self.check_zookeeper_health()
# 장애 대응
if not broker_status['healthy']:
self.handle_broker_failure(broker_status)
if not network_status['healthy']:
self.handle_network_partition(network_status)
if not data_status['healthy']:
self.handle_data_corruption(data_status)
if not zk_status['healthy']:
self.handle_zookeeper_failure(zk_status)
time.sleep(30) # 30초마다 체크
except Exception as e:
print(f"❌ 장애 감지 오류: {e}")
time.sleep(30)
def handle_broker_failure(self, status):
"""브로커 장애 처리"""
failed_brokers = status.get('failed_brokers', [])
for broker_id in failed_brokers:
print(f"🚨 브로커 {broker_id} 장애 감지")
# 1. 브로커 재시작 시도
if self.restart_broker(broker_id):
print(f"✅ 브로커 {broker_id} 재시작 성공")
continue
# 2. 파티션 리더 재선출
self.trigger_leader_election()
# 3. 복제본 재분산
self.redistribute_replicas(broker_id)
# 4. 알림 전송
self.send_alert(f"브로커 {broker_id} 장애 - 수동 복구 필요")
def restart_broker(self, broker_id):
"""브로커 재시작"""
try:
# Docker 환경에서 컨테이너 재시작
restart_cmd = [
"docker", "restart", f"kafka{broker_id}"
]
result = subprocess.run(restart_cmd, capture_output=True)
if result.returncode == 0:
# 재시작 후 상태 확인
time.sleep(10)
return self.verify_broker_health(broker_id)
else:
print(f"❌ 브로커 {broker_id} 재시작 실패")
return False
except Exception as e:
print(f"❌ 브로커 재시작 오류: {e}")
return False
def handle_network_partition(self, status):
"""네트워크 파티션 처리"""
print("🚨 네트워크 파티션 감지")
# 1. 클러스터 상태 확인
cluster_info = self.analyze_cluster_split()
# 2. 쿼럼 확인
if cluster_info['has_quorum']:
print("✅ 쿼럼 유지 - 서비스 계속")
else:
print("⚠️ 쿼럼 손실 - 읽기 전용 모드")
self.enable_readonly_mode()
# 3. 자동 복구 시도
self.attempt_network_recovery()
def handle_data_corruption(self, status):
"""데이터 손상 처리"""
corrupted_partitions = status.get('corrupted_partitions', [])
for partition_info in corrupted_partitions:
topic = partition_info['topic']
partition = partition_info['partition']
print(f"🚨 데이터 손상 감지: {topic}-{partition}")
# 1. 파티션 복구 시도
self.recover_partition(topic, partition)
# 2. 백업에서 복원
self.restore_from_backup(topic, partition)
# 3. 복제본 재동기화
self.resync_replicas(topic, partition)
def create_runbook(self):
"""운영 가이드북 생성"""
runbook = {
"kafka_disaster_recovery": {
"broker_failure": {
"detection": [
"JMX 메트릭 확인",
"헬스체크 API 호출",
"로그 파일 분석"
],
"immediate_actions": [
"다른 브로커들 상태 확인",
"파티션 리더 재선출",
"클라이언트 재연결 확인"
],
"recovery_steps": [
"브로커 재시작",
"데이터 무결성 확인",
"복제본 재동기화",
"성능 모니터링"
]
},
"complete_cluster_failure": {
"detection": [
"모든 브로커 응답 없음",
"ZooKeeper 연결 실패",
"클라이언트 전체 오류"
],
"recovery_steps": [
"ZooKeeper 클러스터 복구",
"브로커들 순차적 재시작",
"토픽/파티션 메타데이터 복구",
"컨슈머 오프셋 복구",
"애플리케이션 재연결"
]
}
}
}
# 운영 가이드 파일 저장
import json
with open('kafka_disaster_recovery_runbook.json', 'w') as f:
json.dump(runbook, f, indent=2, ensure_ascii=False)
print("📖 장애 복구 가이드북 생성 완료")
# 사용 예시
if __name__ == "__main__":
cluster_manager = KafkaClusterManager(['localhost:9092', 'localhost:9093', 'localhost:9094'])
dr_manager = DisasterRecoveryManager(cluster_manager)
# 운영 가이드북 생성
dr_manager.create_runbook()
# 장애 감지 및 자동 복구 시작
# dr_manager.detect_and_respond()
이제 Kafka를 프로덕션 환경에서 안정적으로 운영할 수 있는 모든 지식을 갖추었습니다.
핵심 포인트:
- 클러스터링: 최소 3노드 구성으로 고가용성 확보
- 모니터링: Prometheus/Grafana 기반 종합 모니터링 시스템
- 보안: SSL/TLS, SASL, ACL을 통한 다계층 보안
- 자동화: 백업, 복구, 장애 대응 자동화
- 운영 가이드: 체계적인 장애 대응 절차 수립
Kafka 3편 완주를 축하합니다! 🎉
실제 프로덕션 환경에서는 비즈니스 요구사항과 트래픽 패턴에 맞는 세밀한 튜닝이 필요합니다. 단계적으로 적용하며 충분한 테스트를 거친 후 배포하세요.
Kafka는 강력한 도구이지만, 올바른 운영 없이는 그 잠재력을 발휘할 수 없습니다. 지속적인 모니터링과 최적화를 통해 안정적인 데이터 스트리밍 플랫폼을 구축하세요.
'Data Platform' 카테고리의 다른 글
| RabbitMQ 완벽 가이드 3편: 프로덕션 운영과 고급 기능 (3) | 2025.07.23 |
|---|---|
| RabbitMQ 완벽 가이드 2편: Exchange와 라우팅 패턴 (0) | 2025.07.23 |
| RabbitMQ 완벽 가이드 1편: 기초 개념과 설치 (0) | 2025.07.23 |
| Kafka Producer/Consumer 고급 가이드 - 파티셔닝, 스트림 처리, 성능 최적화 (3) | 2025.07.23 |
| Apache Kafka 기초 가이드 - 분산 스트리밍 플랫폼 개념과 설치 (2) | 2025.07.23 |