hobokai 님의 블로그

Kafka 프로덕션 가이드 - 클러스터링, 모니터링, 보안, 운영 자동화 본문

Data Platform

Kafka 프로덕션 가이드 - 클러스터링, 모니터링, 보안, 운영 자동화

hobokai 2025. 7. 23. 13:24

Apache Kafka 완벽 가이드 3편: 클러스터링과 프로덕션 운영

목차

  1. Kafka 클러스터 구성
  2. 모니터링과 성능 최적화
  3. 보안 설정
  4. 운영 자동화
  5. 장애 대응과 복구

Kafka 클러스터 구성

프로덕션 환경에서는 고가용성과 확장성을 위해 Kafka 클러스터를 구성해야 합니다.

3노드 클러스터 구성

# docker-compose-cluster.yml
version: '3.8'
services:
  zookeeper1:
    image: confluentinc/cp-zookeeper:7.4.0
    hostname: zookeeper1
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_SERVER_ID: 1
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_INIT_LIMIT: 5
      ZOOKEEPER_SYNC_LIMIT: 2
      ZOOKEEPER_SERVERS: zookeeper1:2888:3888;zookeeper2:2888:3888;zookeeper3:2888:3888
    volumes:
      - zk1-data:/var/lib/zookeeper/data
      - zk1-logs:/var/lib/zookeeper/log

  zookeeper2:
    image: confluentinc/cp-zookeeper:7.4.0
    hostname: zookeeper2
    ports:
      - "2182:2181"
    environment:
      ZOOKEEPER_SERVER_ID: 2
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_INIT_LIMIT: 5
      ZOOKEEPER_SYNC_LIMIT: 2
      ZOOKEEPER_SERVERS: zookeeper1:2888:3888;zookeeper2:2888:3888;zookeeper3:2888:3888
    volumes:
      - zk2-data:/var/lib/zookeeper/data
      - zk2-logs:/var/lib/zookeeper/log

  zookeeper3:
    image: confluentinc/cp-zookeeper:7.4.0
    hostname: zookeeper3
    ports:
      - "2183:2181"
    environment:
      ZOOKEEPER_SERVER_ID: 3
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_INIT_LIMIT: 5
      ZOOKEEPER_SYNC_LIMIT: 2
      ZOOKEEPER_SERVERS: zookeeper1:2888:3888;zookeeper2:2888:3888;zookeeper3:2888:3888
    volumes:
      - zk3-data:/var/lib/zookeeper/data
      - zk3-logs:/var/lib/zookeeper/log

  kafka1:
    image: confluentinc/cp-kafka:7.4.0
    hostname: kafka1
    ports:
      - "9092:9092"
      - "19092:19092"
    depends_on:
      - zookeeper1
      - zookeeper2
      - zookeeper3
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper1:2181,zookeeper2:2181,zookeeper3:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 3000
      KAFKA_DEFAULT_REPLICATION_FACTOR: 3
      KAFKA_MIN_INSYNC_REPLICAS: 2
      KAFKA_JMX_PORT: 19092
      KAFKA_JMX_HOSTNAME: localhost
    volumes:
      - kafka1-data:/var/lib/kafka/data

  kafka2:
    image: confluentinc/cp-kafka:7.4.0
    hostname: kafka2
    ports:
      - "9093:9092"
      - "19093:19092"
    depends_on:
      - zookeeper1
      - zookeeper2
      - zookeeper3
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: zookeeper1:2181,zookeeper2:2181,zookeeper3:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka2:29092,PLAINTEXT_HOST://localhost:9093
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 3000
      KAFKA_DEFAULT_REPLICATION_FACTOR: 3
      KAFKA_MIN_INSYNC_REPLICAS: 2
      KAFKA_JMX_PORT: 19092
      KAFKA_JMX_HOSTNAME: localhost
    volumes:
      - kafka2-data:/var/lib/kafka/data

  kafka3:
    image: confluentinc/cp-kafka:7.4.0
    hostname: kafka3
    ports:
      - "9094:9092"
      - "19094:19092"
    depends_on:
      - zookeeper1
      - zookeeper2
      - zookeeper3
    environment:
      KAFKA_BROKER_ID: 3
      KAFKA_ZOOKEEPER_CONNECT: zookeeper1:2181,zookeeper2:2181,zookeeper3:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka3:29092,PLAINTEXT_HOST://localhost:9094
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 3000
      KAFKA_DEFAULT_REPLICATION_FACTOR: 3
      KAFKA_MIN_INSYNC_REPLICAS: 2
      KAFKA_JMX_PORT: 19092
      KAFKA_JMX_HOSTNAME: localhost
    volumes:
      - kafka3-data:/var/lib/kafka/data

volumes:
  zk1-data:
  zk1-logs:
  zk2-data:
  zk2-logs:
  zk3-data:
  zk3-logs:
  kafka1-data:
  kafka2-data:
  kafka3-data:

클러스터 관리 스크립트

# cluster_manager.py
import subprocess
import json
import time
from kafka.admin import KafkaAdminClient, ConfigResource, ConfigResourceType
from kafka import KafkaProducer, KafkaConsumer

class KafkaClusterManager:
    def __init__(self, bootstrap_servers):
        self.bootstrap_servers = bootstrap_servers
        self.admin_client = KafkaAdminClient(
            bootstrap_servers=bootstrap_servers,
            client_id='cluster_manager'
        )

    def check_cluster_health(self):
        """클러스터 상태 확인"""
        try:
            # 브로커 메타데이터 확인
            metadata = self.admin_client.describe_cluster()
            brokers = metadata.brokers

            print("🔍 클러스터 상태 확인:")
            print(f"  브로커 수: {len(brokers)}")

            for broker in brokers:
                print(f"  브로커 {broker.id}: {broker.host}:{broker.port}")

            # 컨트롤러 확인
            controller = metadata.controller
            if controller:
                print(f"  컨트롤러: 브로커 {controller.id}")

            return True

        except Exception as e:
            print(f"❌ 클러스터 상태 확인 실패: {e}")
            return False

    def check_topic_health(self, topic_name):
        """토픽 상태 확인"""
        try:
            # 토픽 메타데이터 가져오기
            metadata = self.admin_client.describe_topics([topic_name])
            topic_metadata = metadata[topic_name]

            print(f"📊 토픽 '{topic_name}' 상태:")
            print(f"  파티션 수: {len(topic_metadata.partitions)}")

            for partition in topic_metadata.partitions:
                print(f"  파티션 {partition.id}:")
                print(f"    리더: 브로커 {partition.leader}")
                print(f"    복제본: {partition.replicas}")
                print(f"    ISR: {partition.isr}")

                # Under-replicated 파티션 확인
                if len(partition.isr) < len(partition.replicas):
                    print(f"    ⚠️ Under-replicated 파티션 발견!")

            return True

        except Exception as e:
            print(f"❌ 토픽 상태 확인 실패: {e}")
            return False

    def rebalance_partitions(self):
        """파티션 리밸런싱"""
        print("⚖️ 파티션 리밸런싱 시작...")

        try:
            # 선호 복제본 선출 (Preferred Replica Election)
            cmd = [
                "kafka-preferred-replica-election",
                "--bootstrap-server", self.bootstrap_servers[0]
            ]

            result = subprocess.run(cmd, capture_output=True, text=True)

            if result.returncode == 0:
                print("✅ 파티션 리밸런싱 완료")
            else:
                print(f"❌ 리밸런싱 실패: {result.stderr}")

        except Exception as e:
            print(f"❌ 리밸런싱 오류: {e}")

    def monitor_consumer_lag(self, group_id):
        """컨슈머 랙 모니터링"""
        try:
            cmd = [
                "kafka-consumer-groups",
                "--bootstrap-server", self.bootstrap_servers[0],
                "--describe",
                "--group", group_id
            ]

            result = subprocess.run(cmd, capture_output=True, text=True)

            if result.returncode == 0:
                lines = result.stdout.strip().split('\n')[1:]  # 헤더 제외
                total_lag = 0

                print(f"📈 컨슈머 그룹 '{group_id}' 랙 상태:")

                for line in lines:
                    if line.strip():
                        parts = line.split()
                        if len(parts) >= 5:
                            topic = parts[0]
                            partition = parts[1]
                            current_offset = parts[2]
                            log_end_offset = parts[3]
                            lag = parts[4]

                            if lag.isdigit():
                                lag_num = int(lag)
                                total_lag += lag_num

                                if lag_num > 1000:
                                    print(f"  ⚠️ {topic}-{partition}: 랙 {lag_num}")
                                else:
                                    print(f"  ✅ {topic}-{partition}: 랙 {lag_num}")

                print(f"  총 랙: {total_lag}")
                return total_lag

        except Exception as e:
            print(f"❌ 컨슈머 랙 확인 실패: {e}")
            return -1

# 사용 예시
if __name__ == "__main__":
    manager = KafkaClusterManager(['localhost:9092', 'localhost:9093', 'localhost:9094'])

    # 클러스터 상태 확인
    manager.check_cluster_health()

    # 토픽 상태 확인
    manager.check_topic_health('user-events')

    # 컨슈머 랙 모니터링
    manager.monitor_consumer_lag('my-consumer-group')

자동 장애 복구

# auto_recovery.py
import time
import logging
from kafka_cluster_manager import KafkaClusterManager

class AutoRecoverySystem:
    def __init__(self, cluster_manager):
        self.cluster_manager = cluster_manager
        self.logger = logging.getLogger(__name__)
        self.alert_thresholds = {
            'consumer_lag': 10000,
            'under_replicated_partitions': 1,
            'offline_brokers': 1
        }

    def monitor_and_recover(self, check_interval=60):
        """자동 모니터링 및 복구"""
        self.logger.info("🤖 자동 복구 시스템 시작")

        while True:
            try:
                # 1. 클러스터 기본 상태 확인
                if not self.cluster_manager.check_cluster_health():
                    self.logger.error("❌ 클러스터 상태 불량 - 관리자 알림 필요")
                    self.send_alert("클러스터 상태 불량")

                # 2. Under-replicated 파티션 확인 및 복구
                self.check_and_fix_under_replicated()

                # 3. 컨슈머 랙 확인
                self.check_consumer_lag()

                # 4. 브로커 성능 확인
                self.check_broker_performance()

                time.sleep(check_interval)

            except Exception as e:
                self.logger.error(f"모니터링 오류: {e}")
                time.sleep(check_interval)

    def check_and_fix_under_replicated(self):
        """Under-replicated 파티션 확인 및 수정"""
        try:
            # Under-replicated 파티션 감지 로직
            # 실제로는 JMX 메트릭을 사용하거나 관리 API 호출

            # 파티션 리밸런싱 수행
            self.cluster_manager.rebalance_partitions()

        except Exception as e:
            self.logger.error(f"파티션 복구 실패: {e}")

    def check_consumer_lag(self):
        """컨슈머 랙 확인"""
        important_groups = ['order-processor', 'payment-processor', 'notification-service']

        for group in important_groups:
            lag = self.cluster_manager.monitor_consumer_lag(group)

            if lag > self.alert_thresholds['consumer_lag']:
                self.logger.warning(f"⚠️ 높은 컨슈머 랙: {group} = {lag}")
                self.send_alert(f"컨슈머 랙 임계값 초과: {group}")

                # 자동 스케일링 트리거 (예: Kubernetes HPA)
                self.trigger_consumer_scaling(group)

    def trigger_consumer_scaling(self, group_id):
        """컨슈머 자동 스케일링"""
        self.logger.info(f"🔄 컨슈머 스케일링 시작: {group_id}")

        # Kubernetes HPA 트리거 또는 Docker Compose 스케일링
        try:
            import subprocess
            result = subprocess.run([
                "kubectl", "scale", "deployment", f"{group_id}-consumer",
                "--replicas=5"
            ], capture_output=True)

            if result.returncode == 0:
                self.logger.info(f"✅ 스케일링 완료: {group_id}")
            else:
                self.logger.error(f"❌ 스케일링 실패: {result.stderr}")

        except Exception as e:
            self.logger.error(f"스케일링 오류: {e}")

    def send_alert(self, message):
        """알림 전송"""
        # Slack, 이메일, PagerDuty 등으로 알림
        self.logger.critical(f"🚨 알림: {message}")

        # 실제 구현에서는 알림 시스템 연동
        # slack_client.send_message(f"Kafka 알림: {message}")

모니터링과 성능 최적화

Prometheus + Grafana 모니터링

# monitoring-stack.yml
version: '3.8'
services:
  kafka-exporter:
    image: danielqsj/kafka-exporter:latest
    command:
      - '--kafka.server=kafka1:29092'
      - '--kafka.server=kafka2:29092'
      - '--kafka.server=kafka3:29092'
    ports:
      - "9308:9308"
    depends_on:
      - kafka1
      - kafka2
      - kafka3

  prometheus:
    image: prom/prometheus:latest
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
      - ./kafka-alerts.yml:/etc/prometheus/kafka-alerts.yml
      - prometheus-data:/prometheus
    command:
      - '--config.file=/etc/prometheus/prometheus.yml'
      - '--storage.tsdb.path=/prometheus'
      - '--web.console.libraries=/etc/prometheus/console_libraries'
      - '--web.console.templates=/etc/prometheus/consoles'
      - '--web.enable-lifecycle'

  grafana:
    image: grafana/grafana:latest
    ports:
      - "3000:3000"
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin123
    volumes:
      - grafana-data:/var/lib/grafana
      - ./grafana-dashboards:/etc/grafana/provisioning/dashboards
      - ./grafana-datasources:/etc/grafana/provisioning/datasources

volumes:
  prometheus-data:
  grafana-data:

prometheus.yml

global:
  scrape_interval: 15s
  evaluation_interval: 15s

rule_files:
  - "kafka-alerts.yml"

scrape_configs:
  - job_name: 'kafka-exporter'
    static_configs:
      - targets: ['kafka-exporter:9308']

  - job_name: 'kafka-jmx'
    static_configs:
      - targets: ['kafka1:19092', 'kafka2:19093', 'kafka3:19094']
    metrics_path: /metrics
    scrape_interval: 30s

alerting:
  alertmanagers:
    - static_configs:
        - targets: ['alertmanager:9093']

핵심 성능 지표 모니터링

# kafka_metrics_collector.py
import time
import json
from kafka import KafkaProducer, KafkaConsumer
from kafka.admin import KafkaAdminClient
import psutil
import requests

class KafkaMetricsCollector:
    def __init__(self, bootstrap_servers, jmx_ports):
        self.bootstrap_servers = bootstrap_servers
        self.jmx_ports = jmx_ports
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            value_serializer=lambda v: json.dumps(v).encode('utf-8')
        )

    def collect_broker_metrics(self):
        """브로커 성능 지표 수집"""
        metrics = {}

        for i, port in enumerate(self.jmx_ports):
            broker_id = i + 1

            try:
                # JMX 메트릭 수집 (실제로는 JMX 클라이언트 사용)
                broker_metrics = self.get_jmx_metrics(port)

                metrics[f'broker_{broker_id}'] = {
                    'timestamp': int(time.time() * 1000),
                    'broker_id': broker_id,
                    'cpu_usage': psutil.cpu_percent(),
                    'memory_usage': psutil.virtual_memory().percent,
                    'disk_usage': psutil.disk_usage('/').percent,
                    'network_in': broker_metrics.get('network_in_rate', 0),
                    'network_out': broker_metrics.get('network_out_rate', 0),
                    'messages_in_per_sec': broker_metrics.get('messages_in_per_sec', 0),
                    'bytes_in_per_sec': broker_metrics.get('bytes_in_per_sec', 0),
                    'bytes_out_per_sec': broker_metrics.get('bytes_out_per_sec', 0),
                    'request_queue_size': broker_metrics.get('request_queue_size', 0),
                    'response_queue_size': broker_metrics.get('response_queue_size', 0)
                }

            except Exception as e:
                print(f"❌ 브로커 {broker_id} 메트릭 수집 실패: {e}")

        return metrics

    def get_jmx_metrics(self, jmx_port):
        """JMX 메트릭 수집 (시뮬레이션)"""
        # 실제로는 JMX 클라이언트를 사용하여 메트릭 수집
        # 여기서는 시뮬레이션 데이터 반환
        import random

        return {
            'network_in_rate': random.randint(1000, 10000),
            'network_out_rate': random.randint(1000, 10000),
            'messages_in_per_sec': random.randint(100, 1000),
            'bytes_in_per_sec': random.randint(10000, 100000),
            'bytes_out_per_sec': random.randint(10000, 100000),
            'request_queue_size': random.randint(0, 100),
            'response_queue_size': random.randint(0, 100)
        }

    def collect_topic_metrics(self, topics):
        """토픽별 지표 수집"""
        metrics = {}

        for topic in topics:
            try:
                # 토픽 메트릭 수집
                topic_metrics = {
                    'timestamp': int(time.time() * 1000),
                    'topic': topic,
                    'partition_count': self.get_partition_count(topic),
                    'total_size_bytes': self.get_topic_size(topic),
                    'message_count': self.get_message_count(topic),
                    'consumer_groups': self.get_consumer_group_count(topic)
                }

                metrics[topic] = topic_metrics

            except Exception as e:
                print(f"❌ 토픽 {topic} 메트릭 수집 실패: {e}")

        return metrics

    def get_partition_count(self, topic):
        """토픽의 파티션 수 조회"""
        try:
            admin_client = KafkaAdminClient(bootstrap_servers=self.bootstrap_servers)
            metadata = admin_client.describe_topics([topic])
            return len(metadata[topic].partitions)
        except:
            return 0

    def get_topic_size(self, topic):
        """토픽 크기 조회 (시뮬레이션)"""
        # 실제로는 JMX 또는 관리 API 사용
        import random
        return random.randint(1000000, 10000000)  # bytes

    def get_message_count(self, topic):
        """토픽 메시지 수 조회 (시뮬레이션)"""
        import random
        return random.randint(1000, 100000)

    def get_consumer_group_count(self, topic):
        """토픽을 구독하는 컨슈머 그룹 수"""
        # 실제로는 관리 API로 조회
        import random
        return random.randint(1, 5)

    def publish_metrics(self, metrics):
        """메트릭을 Kafka 토픽으로 전송"""
        try:
            self.producer.send('kafka-metrics', value=metrics)
            self.producer.flush()
            print(f"📊 메트릭 전송 완료: {len(metrics)}개 지표")
        except Exception as e:
            print(f"❌ 메트릭 전송 실패: {e}")

    def start_collection(self, interval=30):
        """주기적 메트릭 수집 시작"""
        print(f"📈 메트릭 수집 시작 (간격: {interval}초)")

        topics_to_monitor = ['user-events', 'order-events', 'payment-events']

        while True:
            try:
                # 브로커 메트릭 수집
                broker_metrics = self.collect_broker_metrics()

                # 토픽 메트릭 수집
                topic_metrics = self.collect_topic_metrics(topics_to_monitor)

                # 통합 메트릭 구성
                all_metrics = {
                    'timestamp': int(time.time() * 1000),
                    'brokers': broker_metrics,
                    'topics': topic_metrics,
                    'cluster_health': self.check_cluster_health()
                }

                # 메트릭 전송
                self.publish_metrics(all_metrics)

                time.sleep(interval)

            except Exception as e:
                print(f"❌ 메트릭 수집 오류: {e}")
                time.sleep(interval)

    def check_cluster_health(self):
        """클러스터 헬스체크"""
        try:
            admin_client = KafkaAdminClient(bootstrap_servers=self.bootstrap_servers)
            metadata = admin_client.describe_cluster()

            return {
                'broker_count': len(metadata.brokers),
                'controller_id': metadata.controller.id if metadata.controller else -1,
                'cluster_id': metadata.cluster_id,
                'status': 'healthy'
            }
        except Exception as e:
            return {
                'status': 'unhealthy',
                'error': str(e)
            }

# 사용 예시
if __name__ == "__main__":
    collector = KafkaMetricsCollector(
        bootstrap_servers=['localhost:9092', 'localhost:9093', 'localhost:9094'],
        jmx_ports=[19092, 19093, 19094]
    )
    collector.start_collection(interval=30)

보안 설정

SSL/TLS 암호화

# ssl-kafka.yml (보안 클러스터)
version: '3.8'
services:
  kafka-ssl:
    image: confluentinc/cp-kafka:7.4.0
    hostname: kafka-ssl
    ports:
      - "9093:9093"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'

      # SSL 설정
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: SSL:SSL,PLAINTEXT:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: SSL://localhost:9093,PLAINTEXT://localhost:9092
      KAFKA_INTER_BROKER_LISTENER_NAME: SSL

      # SSL 인증서 설정
      KAFKA_SSL_KEYSTORE_FILENAME: kafka.server.keystore.jks
      KAFKA_SSL_KEYSTORE_CREDENTIALS: kafka_keystore_creds
      KAFKA_SSL_KEY_CREDENTIALS: kafka_ssl_key_creds
      KAFKA_SSL_TRUSTSTORE_FILENAME: kafka.server.truststore.jks
      KAFKA_SSL_TRUSTSTORE_CREDENTIALS: kafka_truststore_creds

      # 클라이언트 인증 요구
      KAFKA_SSL_CLIENT_AUTH: required

      # 기타 보안 설정
      KAFKA_SECURITY_INTER_BROKER_PROTOCOL: SSL
      KAFKA_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: ""

    volumes:
      - ./ssl-certs:/etc/kafka/secrets

SASL 인증 설정

# sasl_client.py - SASL 인증 클라이언트
from kafka import KafkaProducer, KafkaConsumer
import ssl

# SASL/SCRAM 설정으로 Producer 생성
def create_secure_producer():
    context = ssl.create_default_context()
    context.check_hostname = False
    context.verify_mode = ssl.CERT_NONE

    producer = KafkaProducer(
        bootstrap_servers=['localhost:9093'],

        # SASL 설정
        security_protocol='SASL_SSL',
        sasl_mechanism='SCRAM-SHA-256',
        sasl_plain_username='kafka-user',
        sasl_plain_password='kafka-password',

        # SSL 설정
        ssl_context=context,

        # 직렬화
        value_serializer=lambda v: json.dumps(v).encode('utf-8')
    )

    return producer

# SASL/SCRAM 설정으로 Consumer 생성
def create_secure_consumer(topics, group_id):
    context = ssl.create_default_context()
    context.check_hostname = False
    context.verify_mode = ssl.CERT_NONE

    consumer = KafkaConsumer(
        *topics,
        bootstrap_servers=['localhost:9093'],

        # SASL 설정
        security_protocol='SASL_SSL',
        sasl_mechanism='SCRAM-SHA-256',
        sasl_plain_username='kafka-user',
        sasl_plain_password='kafka-password',

        # SSL 설정
        ssl_context=context,

        # Consumer 설정
        group_id=group_id,
        value_deserializer=lambda m: json.loads(m.decode('utf-8')),
        auto_offset_reset='earliest'
    )

    return consumer

# 사용 예시
if __name__ == "__main__":
    # 보안 Producer로 메시지 전송
    producer = create_secure_producer()

    message = {
        'user_id': 'secure_user_001',
        'action': 'secure_login',
        'timestamp': int(time.time() * 1000)
    }

    producer.send('secure-events', value=message)
    producer.close()

    print("✅ 보안 메시지 전송 완료")

ACL (Access Control Lists) 설정

# ACL 관리 스크립트
#!/bin/bash

# 사용자별 권한 설정
setup_acls() {
    KAFKA_OPTS="-Djava.security.auth.login.config=/path/to/kafka_server_jaas.conf"

    # 1. 관리자 권한 (모든 리소스 접근)
    kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 \
        --add --allow-principal User:admin \
        --operation All --topic '*' --cluster

    # 2. 프로듀서 권한 (특정 토픽 쓰기)
    kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 \
        --add --allow-principal User:producer-service \
        --operation Write --topic user-events

    kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 \
        --add --allow-principal User:producer-service \
        --operation Describe --topic user-events

    # 3. 컨슈머 권한 (특정 토픽 읽기)
    kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 \
        --add --allow-principal User:consumer-service \
        --operation Read --topic user-events

    kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 \
        --add --allow-principal User:consumer-service \
        --operation Read --group analytics-group

    # 4. 모니터링 권한 (읽기 전용)
    kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 \
        --add --allow-principal User:monitoring \
        --operation Describe --topic '*'

    kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 \
        --add --allow-principal User:monitoring \
        --operation Describe --group '*'
}

# ACL 목록 확인
list_acls() {
    kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 --list
}

# 특정 사용자 권한 확인
check_user_permissions() {
    local username=$1
    kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 \
        --list --principal User:$username
}

# 실행
setup_acls
list_acls

운영 자동화

백업 및 복원 자동화

# kafka_backup.py
import os
import subprocess
import json
import time
from datetime import datetime
import boto3  # AWS S3 업로드용

class KafkaBackupManager:
    def __init__(self, kafka_data_dir, backup_dir, s3_bucket=None):
        self.kafka_data_dir = kafka_data_dir
        self.backup_dir = backup_dir
        self.s3_bucket = s3_bucket
        self.s3_client = boto3.client('s3') if s3_bucket else None

    def create_backup(self, topics=None):
        """토픽 데이터 백업"""
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        backup_path = os.path.join(self.backup_dir, f'kafka_backup_{timestamp}')

        try:
            os.makedirs(backup_path, exist_ok=True)

            # 1. 토픽 메타데이터 백업
            self.backup_topic_metadata(backup_path)

            # 2. 컨슈머 그룹 오프셋 백업
            self.backup_consumer_offsets(backup_path)

            # 3. 토픽 데이터 백업 (선택적)
            if topics:
                self.backup_topic_data(topics, backup_path)

            # 4. 설정 파일 백업
            self.backup_configurations(backup_path)

            # 5. 백업 압축
            archive_path = self.compress_backup(backup_path)

            # 6. S3 업로드 (선택적)
            if self.s3_client:
                self.upload_to_s3(archive_path)

            print(f"✅ 백업 완료: {archive_path}")
            return archive_path

        except Exception as e:
            print(f"❌ 백업 실패: {e}")
            return None

    def backup_topic_metadata(self, backup_path):
        """토픽 메타데이터 백업"""
        try:
            # 토픽 목록 및 설정 저장
            topics_cmd = [
                "kafka-topics", "--list",
                "--bootstrap-server", "localhost:9092"
            ]

            result = subprocess.run(topics_cmd, capture_output=True, text=True)
            topics = result.stdout.strip().split('\n')

            metadata = {}
            for topic in topics:
                if topic.strip():
                    # 토픽 상세 정보
                    describe_cmd = [
                        "kafka-topics", "--describe",
                        "--topic", topic,
                        "--bootstrap-server", "localhost:9092"
                    ]

                    describe_result = subprocess.run(describe_cmd, capture_output=True, text=True)
                    metadata[topic] = {
                        'description': describe_result.stdout,
                        'timestamp': datetime.now().isoformat()
                    }

            # 메타데이터 파일 저장
            metadata_file = os.path.join(backup_path, 'topics_metadata.json')
            with open(metadata_file, 'w', encoding='utf-8') as f:
                json.dump(metadata, f, indent=2, ensure_ascii=False)

            print(f"📋 토픽 메타데이터 백업: {len(topics)}개 토픽")

        except Exception as e:
            print(f"❌ 메타데이터 백업 실패: {e}")

    def backup_consumer_offsets(self, backup_path):
        """컨슈머 그룹 오프셋 백업"""
        try:
            # 컨슈머 그룹 목록 조회
            groups_cmd = [
                "kafka-consumer-groups", "--list",
                "--bootstrap-server", "localhost:9092"
            ]

            result = subprocess.run(groups_cmd, capture_output=True, text=True)
            groups = result.stdout.strip().split('\n')

            offsets = {}
            for group in groups:
                if group.strip():
                    # 그룹별 오프셋 정보
                    describe_cmd = [
                        "kafka-consumer-groups", "--describe",
                        "--group", group,
                        "--bootstrap-server", "localhost:9092"
                    ]

                    describe_result = subprocess.run(describe_cmd, capture_output=True, text=True)
                    offsets[group] = {
                        'offsets': describe_result.stdout,
                        'timestamp': datetime.now().isoformat()
                    }

            # 오프셋 파일 저장
            offsets_file = os.path.join(backup_path, 'consumer_offsets.json')
            with open(offsets_file, 'w', encoding='utf-8') as f:
                json.dump(offsets, f, indent=2, ensure_ascii=False)

            print(f"📊 컨슈머 오프셋 백업: {len(groups)}개 그룹")

        except Exception as e:
            print(f"❌ 오프셋 백업 실패: {e}")

    def compress_backup(self, backup_path):
        """백업 디렉토리 압축"""
        try:
            archive_path = f"{backup_path}.tar.gz"

            compress_cmd = [
                "tar", "-czf", archive_path,
                "-C", os.path.dirname(backup_path),
                os.path.basename(backup_path)
            ]

            subprocess.run(compress_cmd, check=True)

            # 원본 디렉토리 삭제
            subprocess.run(["rm", "-rf", backup_path])

            return archive_path

        except Exception as e:
            print(f"❌ 압축 실패: {e}")
            return backup_path

    def restore_from_backup(self, backup_file):
        """백업에서 복원"""
        try:
            # 백업 파일 압축 해제
            extract_cmd = [
                "tar", "-xzf", backup_file,
                "-C", self.backup_dir
            ]

            subprocess.run(extract_cmd, check=True)

            # 백업 디렉토리 찾기
            backup_name = os.path.basename(backup_file).replace('.tar.gz', '')
            extracted_path = os.path.join(self.backup_dir, backup_name)

            # 토픽 복원
            self.restore_topics(extracted_path)

            # 컨슈머 오프셋 복원
            self.restore_consumer_offsets(extracted_path)

            print(f"✅ 복원 완료: {backup_file}")

        except Exception as e:
            print(f"❌ 복원 실패: {e}")

    def schedule_daily_backup(self):
        """일일 백업 스케줄링"""
        import schedule

        def daily_backup():
            important_topics = ['user-events', 'order-events', 'payment-events']
            self.create_backup(topics=important_topics)

        # 매일 새벽 2시에 백업
        schedule.every().day.at("02:00").do(daily_backup)

        print("📅 일일 백업 스케줄 등록 (매일 02:00)")

        while True:
            schedule.run_pending()
            time.sleep(60)

# 사용 예시
if __name__ == "__main__":
    backup_manager = KafkaBackupManager(
        kafka_data_dir='/kafka-data',
        backup_dir='/backups',
        s3_bucket='my-kafka-backups'
    )

    # 백업 생성
    backup_manager.create_backup(['user-events', 'order-events'])

    # 스케줄링 시작
    # backup_manager.schedule_daily_backup()

장애 대응과 복구

장애 시나리오별 대응

# disaster_recovery.py
import time
import subprocess
from kafka_cluster_manager import KafkaClusterManager

class DisasterRecoveryManager:
    def __init__(self, cluster_manager):
        self.cluster_manager = cluster_manager
        self.recovery_procedures = {
            'broker_failure': self.handle_broker_failure,
            'network_partition': self.handle_network_partition,
            'data_corruption': self.handle_data_corruption,
            'zookeeper_failure': self.handle_zookeeper_failure
        }

    def detect_and_respond(self):
        """장애 감지 및 자동 대응"""
        while True:
            try:
                # 1. 브로커 상태 확인
                broker_status = self.check_broker_health()

                # 2. 네트워크 파티션 확인
                network_status = self.check_network_partition()

                # 3. 데이터 무결성 확인
                data_status = self.check_data_integrity()

                # 4. ZooKeeper 상태 확인
                zk_status = self.check_zookeeper_health()

                # 장애 대응
                if not broker_status['healthy']:
                    self.handle_broker_failure(broker_status)

                if not network_status['healthy']:
                    self.handle_network_partition(network_status)

                if not data_status['healthy']:
                    self.handle_data_corruption(data_status)

                if not zk_status['healthy']:
                    self.handle_zookeeper_failure(zk_status)

                time.sleep(30)  # 30초마다 체크

            except Exception as e:
                print(f"❌ 장애 감지 오류: {e}")
                time.sleep(30)

    def handle_broker_failure(self, status):
        """브로커 장애 처리"""
        failed_brokers = status.get('failed_brokers', [])

        for broker_id in failed_brokers:
            print(f"🚨 브로커 {broker_id} 장애 감지")

            # 1. 브로커 재시작 시도
            if self.restart_broker(broker_id):
                print(f"✅ 브로커 {broker_id} 재시작 성공")
                continue

            # 2. 파티션 리더 재선출
            self.trigger_leader_election()

            # 3. 복제본 재분산
            self.redistribute_replicas(broker_id)

            # 4. 알림 전송
            self.send_alert(f"브로커 {broker_id} 장애 - 수동 복구 필요")

    def restart_broker(self, broker_id):
        """브로커 재시작"""
        try:
            # Docker 환경에서 컨테이너 재시작
            restart_cmd = [
                "docker", "restart", f"kafka{broker_id}"
            ]

            result = subprocess.run(restart_cmd, capture_output=True)

            if result.returncode == 0:
                # 재시작 후 상태 확인
                time.sleep(10)
                return self.verify_broker_health(broker_id)
            else:
                print(f"❌ 브로커 {broker_id} 재시작 실패")
                return False

        except Exception as e:
            print(f"❌ 브로커 재시작 오류: {e}")
            return False

    def handle_network_partition(self, status):
        """네트워크 파티션 처리"""
        print("🚨 네트워크 파티션 감지")

        # 1. 클러스터 상태 확인
        cluster_info = self.analyze_cluster_split()

        # 2. 쿼럼 확인
        if cluster_info['has_quorum']:
            print("✅ 쿼럼 유지 - 서비스 계속")
        else:
            print("⚠️ 쿼럼 손실 - 읽기 전용 모드")
            self.enable_readonly_mode()

        # 3. 자동 복구 시도
        self.attempt_network_recovery()

    def handle_data_corruption(self, status):
        """데이터 손상 처리"""
        corrupted_partitions = status.get('corrupted_partitions', [])

        for partition_info in corrupted_partitions:
            topic = partition_info['topic']
            partition = partition_info['partition']

            print(f"🚨 데이터 손상 감지: {topic}-{partition}")

            # 1. 파티션 복구 시도
            self.recover_partition(topic, partition)

            # 2. 백업에서 복원
            self.restore_from_backup(topic, partition)

            # 3. 복제본 재동기화
            self.resync_replicas(topic, partition)

    def create_runbook(self):
        """운영 가이드북 생성"""
        runbook = {
            "kafka_disaster_recovery": {
                "broker_failure": {
                    "detection": [
                        "JMX 메트릭 확인",
                        "헬스체크 API 호출",
                        "로그 파일 분석"
                    ],
                    "immediate_actions": [
                        "다른 브로커들 상태 확인",
                        "파티션 리더 재선출",
                        "클라이언트 재연결 확인"
                    ],
                    "recovery_steps": [
                        "브로커 재시작",
                        "데이터 무결성 확인",
                        "복제본 재동기화",
                        "성능 모니터링"
                    ]
                },
                "complete_cluster_failure": {
                    "detection": [
                        "모든 브로커 응답 없음",
                        "ZooKeeper 연결 실패",
                        "클라이언트 전체 오류"
                    ],
                    "recovery_steps": [
                        "ZooKeeper 클러스터 복구",
                        "브로커들 순차적 재시작",
                        "토픽/파티션 메타데이터 복구",
                        "컨슈머 오프셋 복구",
                        "애플리케이션 재연결"
                    ]
                }
            }
        }

        # 운영 가이드 파일 저장
        import json
        with open('kafka_disaster_recovery_runbook.json', 'w') as f:
            json.dump(runbook, f, indent=2, ensure_ascii=False)

        print("📖 장애 복구 가이드북 생성 완료")

# 사용 예시
if __name__ == "__main__":
    cluster_manager = KafkaClusterManager(['localhost:9092', 'localhost:9093', 'localhost:9094'])
    dr_manager = DisasterRecoveryManager(cluster_manager)

    # 운영 가이드북 생성
    dr_manager.create_runbook()

    # 장애 감지 및 자동 복구 시작
    # dr_manager.detect_and_respond()

이제 Kafka를 프로덕션 환경에서 안정적으로 운영할 수 있는 모든 지식을 갖추었습니다.

핵심 포인트:

  • 클러스터링: 최소 3노드 구성으로 고가용성 확보
  • 모니터링: Prometheus/Grafana 기반 종합 모니터링 시스템
  • 보안: SSL/TLS, SASL, ACL을 통한 다계층 보안
  • 자동화: 백업, 복구, 장애 대응 자동화
  • 운영 가이드: 체계적인 장애 대응 절차 수립

Kafka 3편 완주를 축하합니다! 🎉

실제 프로덕션 환경에서는 비즈니스 요구사항과 트래픽 패턴에 맞는 세밀한 튜닝이 필요합니다. 단계적으로 적용하며 충분한 테스트를 거친 후 배포하세요.

Kafka는 강력한 도구이지만, 올바른 운영 없이는 그 잠재력을 발휘할 수 없습니다. 지속적인 모니터링과 최적화를 통해 안정적인 데이터 스트리밍 플랫폼을 구축하세요.