hobokai's Blog

Data Platform

The Complete RabbitMQ Guide, Part 3: Production Operations and Advanced Features

hobokai, 2025-07-23 15:52

Contents

  1. Clustering and High Availability
  2. Monitoring and Performance Optimization
  3. Security Configuration
  4. Production Best Practices
  5. Troubleshooting

Clustering and High Availability

A production deployment has to eliminate single points of failure so the broker stays available when a node goes down.

Cluster Configuration

Setting Up a 3-Node Cluster

# All nodes must share the same Erlang cookie before they can join each other

# Node 1 (rabbit1)
sudo rabbitmq-server -detached

# Node 2 (rabbit2)
sudo rabbitmq-server -detached
sudo rabbitmqctl stop_app
sudo rabbitmqctl join_cluster rabbit@rabbit1
sudo rabbitmqctl start_app

# Node 3 (rabbit3)
sudo rabbitmq-server -detached
sudo rabbitmqctl stop_app
sudo rabbitmqctl join_cluster rabbit@rabbit1
sudo rabbitmqctl start_app

# Check cluster status
sudo rabbitmqctl cluster_status
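Once the nodes have joined, membership can also be checked from a script. The following is a minimal sketch, assuming the node list has already been fetched from the management API's /api/nodes endpoint and that each entry carries the `name`, `running`, and `partitions` fields. The helper name `summarize_cluster` and the sample payload are hypothetical.

```python
def summarize_cluster(nodes, expected_size=3):
    """Summarize an /api/nodes payload (a list of dicts, one per cluster node)."""
    running = sorted(n["name"] for n in nodes if n.get("running"))
    stopped = sorted(n["name"] for n in nodes if not n.get("running"))
    # A non-empty "partitions" list means the node has observed a network partition
    partitioned = sorted(n["name"] for n in nodes if n.get("partitions"))
    return {
        "running": running,
        "stopped": stopped,
        "partitioned": partitioned,
        "healthy": len(running) == expected_size and not partitioned,
    }


# Hypothetical payload shaped like the management API response
sample = [
    {"name": "rabbit@rabbit1", "running": True, "partitions": []},
    {"name": "rabbit@rabbit2", "running": True, "partitions": []},
    {"name": "rabbit@rabbit3", "running": False, "partitions": []},
]
summary = summarize_cluster(sample)
print(summary["running"], summary["stopped"], summary["healthy"])
```

A check like this is only a convenience on top of `rabbitmqctl cluster_status`, which remains the authoritative view.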

Docker Compose Cluster

# docker-compose-cluster.yml
version: '3.8'
services:
  rabbitmq1:
    image: rabbitmq:3-management
    hostname: rabbitmq1
    environment:
      - RABBITMQ_ERLANG_COOKIE=SWQOKODSQALRPCLNMEQG
      - RABBITMQ_DEFAULT_USER=admin
      - RABBITMQ_DEFAULT_PASS=admin123
    ports:
      - "5672:5672"
      - "15672:15672"
    volumes:
      - ./cluster-entrypoint.sh:/usr/local/bin/cluster-entrypoint.sh
    command: ["/usr/local/bin/cluster-entrypoint.sh"]

  rabbitmq2:
    image: rabbitmq:3-management
    hostname: rabbitmq2
    environment:
      - RABBITMQ_ERLANG_COOKIE=SWQOKODSQALRPCLNMEQG
      - RABBITMQ_DEFAULT_USER=admin
      - RABBITMQ_DEFAULT_PASS=admin123
    ports:
      - "5673:5672"
      - "15673:15672"
    depends_on:
      - rabbitmq1
    volumes:
      - ./cluster-entrypoint.sh:/usr/local/bin/cluster-entrypoint.sh
    command: ["/usr/local/bin/cluster-entrypoint.sh"]

  rabbitmq3:
    image: rabbitmq:3-management
    hostname: rabbitmq3
    environment:
      - RABBITMQ_ERLANG_COOKIE=SWQOKODSQALRPCLNMEQG
      - RABBITMQ_DEFAULT_USER=admin
      - RABBITMQ_DEFAULT_PASS=admin123
    ports:
      - "5674:5672"
      - "15674:15672"
    depends_on:
      - rabbitmq1
    volumes:
      - ./cluster-entrypoint.sh:/usr/local/bin/cluster-entrypoint.sh
    command: ["/usr/local/bin/cluster-entrypoint.sh"]

  haproxy:
    image: haproxy:2.4
    ports:
      - "5671:5671"
      - "8080:8080"
    volumes:
      - ./haproxy.cfg:/usr/local/etc/haproxy/haproxy.cfg
    depends_on:
      - rabbitmq1
      - rabbitmq2
      - rabbitmq3

cluster-entrypoint.sh

#!/bin/bash
set -e

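# Start RabbitMQ in the background
rabbitmq-server -detached

# Every node except the first joins the cluster
if [ "$HOSTNAME" != "rabbitmq1" ]; then
    # Crude wait for rabbitmq1 to boot; depends_on only orders container start-up
    sleep 15
    rabbitmqctl stop_app
    rabbitmqctl join_cluster rabbit@rabbitmq1
    rabbitmqctl start_app
fi

# Enable the management plugin
rabbitmq-plugins enable rabbitmq_management

# Set the HA policy (first node only)
# Note: classic mirrored queues (ha-* policies) are deprecated and were removed in RabbitMQ 4.0
if [ "$HOSTNAME" = "rabbitmq1" ]; then
    sleep 20
    rabbitmqctl set_policy ha-all ".*" '{"ha-mode":"all","ha-sync-mode":"automatic"}'
fi

# Keep the container in the foreground by following the log files
# (assumes file logging is enabled; the official image logs to stdout by default)
tail -f /var/log/rabbitmq/*.log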

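High-Availability Queue Configuration

# These are classic mirrored queue policies. They work on RabbitMQ 3.x,
# but the feature was removed in 4.0, where quorum queues replace it.

# Mirror every queue to every node
sudo rabbitmqctl set_policy ha-all ".*" '{"ha-mode":"all"}'

# Keep exactly 2 replicas (leader included) of matching queues
sudo rabbitmqctl set_policy ha-critical "critical.*" \
  '{"ha-mode":"exactly","ha-params":2}'

# Mirror only to the listed nodes
sudo rabbitmqctl set_policy ha-nodes "important.*" \
  '{"ha-mode":"nodes","ha-params":["rabbit@node1","rabbit@node2"]}'

# List policies
sudo rabbitmqctl list_policies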
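To make the three `ha-mode` values concrete, here is a small sketch that works out how many nodes end up hosting a queue under each policy. It is a simplified model, not broker code: the function name `mirror_count` is hypothetical, and it ignores edge cases such as a `nodes` policy that lists no live cluster member.

```python
def mirror_count(policy, cluster_nodes):
    """Return how many nodes host a queue (leader included) under an ha-* policy."""
    mode = policy["ha-mode"]
    if mode == "all":
        return len(cluster_nodes)
    if mode == "exactly":
        # ha-params is the total replica count, capped by the cluster size
        return min(int(policy["ha-params"]), len(cluster_nodes))
    if mode == "nodes":
        # Only listed nodes that are actual cluster members count
        return len(set(policy["ha-params"]) & set(cluster_nodes))
    raise ValueError(f"unknown ha-mode: {mode}")


cluster = ["rabbit@node1", "rabbit@node2", "rabbit@node3"]
print(mirror_count({"ha-mode": "all"}, cluster))
print(mirror_count({"ha-mode": "exactly", "ha-params": 2}, cluster))
print(mirror_count({"ha-mode": "nodes", "ha-params": ["rabbit@node1", "rabbit@node2"]}, cluster))
```

Mirroring to every node maximizes redundancy but multiplies replication traffic, which is why `exactly` with a small count is often the more practical choice for larger clusters.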

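Load Balancer Configuration

haproxy.cfg

global
    daemon

defaults
    mode tcp
    timeout connect 5s
    # Idle timeouts must exceed the client heartbeat interval (600s in the client
    # example), otherwise HAProxy closes healthy but idle AMQP connections
    timeout client 3h
    timeout server 3h

# RabbitMQ cluster (AMQP)
# Plain AMQP is exposed on 5671 here, although that port is conventionally AMQPS
listen rabbitmq_cluster
    bind *:5671
    mode tcp
    balance roundrobin
    option tcpka
    server rabbit1 rabbitmq1:5672 check
    server rabbit2 rabbitmq2:5672 check
    server rabbit3 rabbitmq3:5672 check

# Management interface
listen rabbitmq_admin
    bind *:8080
    mode http
    balance roundrobin
    # The management API requires HTTP basic auth; without credentials the check
    # receives 401 and every server is marked down.
    # The header value is base64 of admin:admin123.
    option httpchk GET /api/healthchecks/node
    http-check send hdr Authorization "Basic YWRtaW46YWRtaW4xMjM="
    server rabbit1 rabbitmq1:15672 check
    server rabbit2 rabbitmq2:15672 check
    server rabbit3 rabbitmq3:15672 check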

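Client Connection Configuration

# Connecting to multiple brokers
import pika

class RobustConnection:
    def __init__(self):
        self.brokers = [
            'rabbitmq1:5672',
            'rabbitmq2:5672',
            'rabbitmq3:5672'
        ]
        # Credentials from the Compose file; the default guest user only works from localhost
        self.credentials = pika.PlainCredentials('admin', 'admin123')
        self.connection = None
        self.channel = None
        self.connect()

    def connect(self):
        # Try each broker in order and keep the first connection that succeeds
        for broker in self.brokers:
            try:
                host, port = broker.split(':')
                self.connection = pika.BlockingConnection(
                    pika.ConnectionParameters(
                        host=host,
                        port=int(port),
                        credentials=self.credentials,
                        heartbeat=600,
                        blocked_connection_timeout=300,
                        connection_attempts=3,
                        retry_delay=2
                    )
                )
                self.channel = self.connection.channel()
                print(f"Connected: {broker}")
                return
            except Exception as e:
                print(f"Connection failed {broker}: {e}")
                continue

        raise ConnectionError("Failed to connect to any broker")

    def reconnect(self):
        # Close the old connection if it is still open, then start over
        try:
            if self.connection and not self.connection.is_closed:
                self.connection.close()
        except Exception:
            pass
        self.connect()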
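The class above retries immediately when every broker is down. A common refinement is to back off between full rounds. The following is a library-agnostic sketch under stated assumptions: `connect_with_backoff` and `fake_connect` are hypothetical names, and `connect` stands in for whatever function opens a real connection (for example a wrapper around `pika.BlockingConnection`).

```python
import random
import time


def connect_with_backoff(connect, brokers, max_rounds=5, base_delay=1.0,
                         max_delay=30.0, sleep=time.sleep):
    """Try each broker in turn; back off exponentially between full rounds."""
    last_error = None
    for round_no in range(max_rounds):
        for broker in brokers:
            try:
                return connect(broker)
            except Exception as exc:  # a real client would catch its own error type
                last_error = exc
        if round_no < max_rounds - 1:
            # Exponential backoff with full jitter, capped at max_delay
            delay = min(max_delay, base_delay * 2 ** round_no)
            sleep(random.uniform(0, delay))
    raise ConnectionError(f"all brokers failed after {max_rounds} rounds") from last_error


# Demo with a fake connect function that fails four times, then succeeds
attempts = []


def fake_connect(broker):
    attempts.append(broker)
    if len(attempts) < 5:
        raise OSError("connection refused")
    return f"connected:{broker}"


delays = []  # record the requested sleeps instead of actually sleeping
result = connect_with_backoff(
    fake_connect, ["rabbitmq1", "rabbitmq2", "rabbitmq3"], sleep=delays.append
)
print(result, attempts)
```

The jitter spreads reconnect attempts out so that many clients recovering from the same outage do not hit the brokers at the same moment.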

Monitoring and Performance Optimization

Prometheus Integration

# prometheus-stack.yml
version: '3.8'
services:
  rabbitmq-exporter:
    image: kbudde/rabbitmq-exporter:latest
    environment:
      - RABBIT_URL=http://rabbitmq1:15672
      - RABBIT_USER=admin
      - RABBIT_PASSWORD=admin123
      - RABBIT_CAPABILITIES=bert,no_sort
    ports:
      - "9419:9419"
    depends_on:
      - rabbitmq1

  prometheus:
    image: prom/prometheus:latest
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
      - ./alerts.yml:/etc/prometheus/alerts.yml

  grafana:
    image: grafana/grafana:latest
    ports:
      - "3000:3000"
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin123
    volumes:
      - grafana-storage:/var/lib/grafana
      - ./grafana-dashboards:/etc/grafana/provisioning/dashboards
      - ./grafana-datasources:/etc/grafana/provisioning/datasources

volumes:
  grafana-storage:

prometheus.yml

global:
  scrape_interval: 15s

rule_files:
  - "alerts.yml"

scrape_configs:
  - job_name: 'rabbitmq-exporter'
    static_configs:
      - targets: ['rabbitmq-exporter:9419']

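  # Built-in rabbitmq_prometheus plugin (port 15692). The family filter is only
  # honoured by the /metrics/detailed endpoint, not by /metrics.
  - job_name: 'rabbitmq-prometheus'
    static_configs:
      - targets: ['rabbitmq1:15692', 'rabbitmq2:15692', 'rabbitmq3:15692']
    metrics_path: /metrics/detailed
    params:
      family: ['queue_metrics', 'connection_metrics']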

alerting:
  alertmanagers:
    - static_configs:
        - targets: ['alertmanager:9093']

Key Monitoring Metrics

# Custom monitoring script
import requests
from prometheus_client import CollectorRegistry, Gauge, push_to_gateway

class RabbitMQMonitor:
    def __init__(self, management_url, username, password):
        self.management_url = management_url
        self.auth = (username, password)
        self.registry = CollectorRegistry()

        # Metric definitions
        self.queue_messages = Gauge(
            'rabbitmq_queue_messages_total',
            'Total messages in queue',
            ['queue', 'vhost'],
            registry=self.registry
        )

        self.queue_consumers = Gauge(
            'rabbitmq_queue_consumers',
            'Number of consumers',
            ['queue', 'vhost'],
            registry=self.registry
        )

        self.connection_count = Gauge(
            'rabbitmq_connections_total',
            'Total connections',
            registry=self.registry
        )

    def collect_metrics(self):
        # Collect per-queue metrics
        queues = self.get_queues()
        for queue in queues:
            self.queue_messages.labels(
                queue=queue['name'],
                vhost=queue['vhost']
            ).set(queue.get('messages', 0))

            self.queue_consumers.labels(
                queue=queue['name'],
                vhost=queue['vhost']
            ).set(queue.get('consumers', 0))

        # Collect connection metrics
        connections = self.get_connections()
        self.connection_count.set(len(connections))

    def get_queues(self):
        response = requests.get(
            f"{self.management_url}/api/queues",
            auth=self.auth,
            timeout=10
        )
        # Fail loudly on 401/5xx instead of parsing an error body as queue data
        response.raise_for_status()
        return response.json()

    def get_connections(self):
        response = requests.get(
            f"{self.management_url}/api/connections",
            auth=self.auth,
            timeout=10
        )
        response.raise_for_status()
        return response.json()

    def push_metrics(self, pushgateway_url):
        push_to_gateway(
            pushgateway_url,
            job='rabbitmq-custom-monitor',
            registry=self.registry
        )

# Usage example
monitor = RabbitMQMonitor(
    'http://localhost:15672',
    'admin',
    'admin123'
)
monitor.collect_metrics()
monitor.push_metrics('localhost:9091')

Alert Rule Configuration

alerts.yml

groups:
- name: rabbitmq-alerts
  rules:
  - alert: RabbitMQDown
    expr: up{job="rabbitmq-exporter"} == 0
    for: 1m
    labels:
      severity: critical
    annotations:
      summary: "RabbitMQ is down"
      description: "RabbitMQ instance has been down for more than 1 minute"

  - alert: RabbitMQHighQueueSize
    expr: rabbitmq_queue_messages > 1000
    for: 5m
    labels:
      severity: warning
    annotations:
      summary: "High queue size detected"
      description: "Queue {{ $labels.queue }} has {{ $value }} messages"

  - alert: RabbitMQNoConsumers
    expr: rabbitmq_queue_messages > 100 and rabbitmq_queue_consumers == 0
    for: 2m
    labels:
      severity: critical
    annotations:
      summary: "Queue has messages but no consumers"
      description: "Queue {{ $labels.queue }} has messages but no active consumers"

  - alert: RabbitMQMemoryHigh
    expr: rabbitmq_node_mem_used / rabbitmq_node_mem_limit > 0.8
    for: 3m
    labels:
      severity: warning
    annotations:
      summary: "RabbitMQ memory usage high"
      description: "Memory usage is above 80%"
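Before deploying thresholds like these, it can be useful to try them against a snapshot of real queue data. The sketch below mirrors three of the rule expressions in plain Python. It is only an approximation: `evaluate_alerts` and the sample data are hypothetical, and the `for:` hold durations that Prometheus applies are not modelled.

```python
def evaluate_alerts(queues, mem_used, mem_limit):
    """Return (alert_name, subject) pairs for a single snapshot of broker state."""
    fired = []
    for q in queues:
        if q["messages"] > 1000:
            fired.append(("RabbitMQHighQueueSize", q["name"]))
        if q["messages"] > 100 and q["consumers"] == 0:
            fired.append(("RabbitMQNoConsumers", q["name"]))
    if mem_limit > 0 and mem_used / mem_limit > 0.8:
        fired.append(("RabbitMQMemoryHigh", "node"))
    return fired


# Hypothetical snapshot: one busy queue, one abandoned queue, one quiet queue
snapshot = [
    {"name": "orders", "messages": 1500, "consumers": 4},
    {"name": "emails", "messages": 250, "consumers": 0},
    {"name": "audit", "messages": 10, "consumers": 0},
]
fired = evaluate_alerts(snapshot, mem_used=9 * 1024**3, mem_limit=10 * 1024**3)
for name, subject in fired:
    print(name, subject)
```

Running this against a few historical snapshots gives a rough idea of how noisy a threshold would be before it starts paging anyone.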

Security Configuration

SSL/TLS Configuration

rabbitmq.conf

# TLS listener
listeners.ssl.default = 5671
ssl_options.cacertfile = /etc/rabbitmq/ca_certificate.pem
ssl_options.certfile   = /etc/rabbitmq/server_certificate.pem
ssl_options.keyfile    = /etc/rabbitmq/server_key.pem
ssl_options.verify     = verify_peer
ssl_options.fail_if_no_peer_cert = true

# Allow only modern TLS versions
ssl_options.versions.1 = tlsv1.2
ssl_options.versions.2 = tlsv1.3

# TLS for the management interface
management.ssl.port = 15671
management.ssl.cacertfile = /etc/rabbitmq/ca_certificate.pem
management.ssl.certfile   = /etc/rabbitmq/server_certificate.pem
management.ssl.keyfile    = /etc/rabbitmq/server_key.pem
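On the client side, the TLS context has to line up with these broker settings. The following is a minimal sketch using only the Python standard library. The helper name `build_client_context` is hypothetical, and real deployments would pass the CA and client certificate paths that match their own PKI.

```python
import ssl


def build_client_context(cafile=None, certfile=None, keyfile=None):
    """Build a client-side TLS context compatible with the broker settings above."""
    # create_default_context turns on certificate and hostname verification
    context = ssl.create_default_context(cafile=cafile)
    # Mirror ssl_options.versions: refuse anything older than TLS 1.2
    context.minimum_version = ssl.TLSVersion.TLSv1_2
    if certfile:
        # The broker sets fail_if_no_peer_cert = true, so a client certificate is required
        context.load_cert_chain(certfile=certfile, keyfile=keyfile)
    return context


# System CA store and no client certificate, just to show the resulting settings
context = build_client_context()
print(context.minimum_version, context.verify_mode, context.check_hostname)
```

With pika, a context like this can be wrapped in `pika.SSLOptions(context, server_hostname)` and passed as `ssl_options` to `pika.ConnectionParameters`, together with port 5671.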

LDAP Authentication

# Try LDAP first, then fall back to the internal user database
# (the plugin itself is enabled with: rabbitmq-plugins enable rabbitmq_auth_backend_ldap)
auth_backends.1 = rabbit_auth_backend_ldap
auth_backends.2 = rabbit_auth_backend_internal

# LDAP server
auth_ldap.servers.1 = ldap.company.com
auth_ldap.port = 389
auth_ldap.use_ssl = false
auth_ldap.use_starttls = true

# User lookup
auth_ldap.user_dn_pattern = uid=${username},ou=people,dc=company,dc=com
auth_ldap.other_bind = as_user

advanced.config

% Authorization queries cannot be expressed in rabbitmq.conf, so they go here
[
  {rabbitmq_auth_backend_ldap, [
    {vhost_access_query,    {constant, true}},
    {resource_access_query, {constant, true}},
    {tag_queries, [
      {administrator, {constant, false}},
      {management,    {constant, true}}
    ]}
  ]}
].

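Network Security

# Firewall rules (Ubuntu)
sudo ufw allow 5672/tcp   # AMQP
sudo ufw allow 5671/tcp   # AMQPS
sudo ufw allow 15672/tcp  # HTTP Management
sudo ufw allow 15671/tcp  # HTTPS Management
sudo ufw allow 4369/tcp   # epmd (node discovery, needed for clustering)
sudo ufw allow 25672/tcp  # Clustering (inter-node and CLI traffic)

# Allow only a specific subnet (use this instead of the blanket 5672 rule above)
sudo ufw allow from 192.168.1.0/24 to any port 5672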

Production Best Practices

Configuration Tuning

rabbitmq.conf (production settings)

# Basic settings
listeners.tcp.default = 5672
default_user = admin
default_pass = CHANGE_THIS_PASSWORD

# Memory management
vm_memory_high_watermark.relative = 0.6
vm_memory_high_watermark_paging_ratio = 0.5
disk_free_limit.relative = 2.0

# Logging
log.console = false
log.console.level = info
log.file = /var/log/rabbitmq/rabbit.log
log.file.level = info
log.file.rotation.date = $D0
log.file.rotation.size = 10485760

# Network settings
tcp_listen_options.backlog = 128
tcp_listen_options.nodelay = true
tcp_listen_options.keepalive = true
tcp_listen_options.exit_on_close = false

# Cluster settings
cluster_formation.peer_discovery_backend = classic_config
cluster_formation.classic_config.nodes.1 = rabbit@node1
cluster_formation.classic_config.nodes.2 = rabbit@node2
cluster_formation.classic_config.nodes.3 = rabbit@node3

# Heartbeat (seconds)
heartbeat = 60
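The relative memory and disk settings are easier to reason about as absolute numbers. Here is a small worked calculation, assuming a host with 16 GiB of RAM. The host size and the helper name `memory_thresholds` are assumptions for illustration.

```python
GIB = 1024 ** 3


def memory_thresholds(total_ram_bytes, watermark=0.6, paging_ratio=0.5, disk_relative=2.0):
    """Translate the relative settings into absolute byte thresholds."""
    high_watermark = total_ram_bytes * watermark
    return {
        # Above this, the memory alarm fires and publishers are blocked
        "publishers_blocked_at": high_watermark,
        # Paging messages to disk starts at this fraction of the watermark
        "paging_starts_at": high_watermark * paging_ratio,
        # Free disk space below this triggers the disk alarm
        "min_free_disk": total_ram_bytes * disk_relative,
    }


thresholds = memory_thresholds(16 * GIB)
print({name: round(value / GIB, 2) for name, value in thresholds.items()})
```

On that 16 GiB host the settings work out to roughly 9.6 GiB for the memory alarm, 4.8 GiB for the start of paging, and 32 GiB of free disk, which is worth comparing against the actual volume size before rollout.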

Application Code Optimization

# Connection management for production
import pika
import logging
from contextlib import contextmanager

class ProductionRabbitMQ:
    def __init__(self, config):
        self.config = config
        self.connection_pool = []
        self.logger = logging.getLogger(__name__)

    @contextmanager
    def get_channel(self):
        connection = None
        channel = None
        try:
            connection = self._get_connection()
            channel = connection.channel()

            # Production settings
            channel.basic_qos(
                prefetch_count=self.config.get('prefetch_count', 10)
            )

            yield channel

        except Exception as e:
            self.logger.error(f"Channel error: {e}")
            if connection and not connection.is_closed:
                connection.close()
            raise
        finally:
            if channel and not channel.is_closed:
                channel.close()
            # Keep a still-open connection in the pool so later calls can reuse it
            if connection and not connection.is_closed:
                if connection not in self.connection_pool:
                    self.connection_pool.append(connection)

    def _get_connection(self):
        # Reuse an open connection from the pool if one exists
        for conn in self.connection_pool:
            if not conn.is_closed:
                return conn

        # Otherwise create a new connection
        return self._create_connection()

    def _create_connection(self):
        return pika.BlockingConnection(
            pika.ConnectionParameters(
                host=self.config['host'],
                port=self.config['port'],
                virtual_host=self.config['vhost'],
                credentials=pika.PlainCredentials(
                    self.config['username'],
                    self.config['password']
                ),
                heartbeat=600,
                blocked_connection_timeout=300,
                connection_attempts=3,
                retry_delay=2
            )
        )

    def publish_with_retry(self, exchange, routing_key, body, max_retries=3):
        for attempt in range(max_retries):
            try:
                with self.get_channel() as channel:
                    # With publisher confirms on, pika raises if the broker
                    # nacks the message or returns it as unroutable
                    channel.confirm_delivery()
                    channel.basic_publish(
                        exchange=exchange,
                        routing_key=routing_key,
                        body=body,
                        properties=pika.BasicProperties(
                            delivery_mode=2  # persistent message
                        ),
                        mandatory=True  # return the message if it cannot be routed
                    )
                return True
            except Exception as e:
                self.logger.warning(f"Publish attempt {attempt + 1} failed: {e}")
                if attempt == max_retries - 1:
                    raise
        return False

Capacity Planning

# Capacity calculator
import math

class CapacityPlanner:
    def __init__(self):
        self.message_size_avg = 1024  # bytes
        self.messages_per_second = 1000
        self.retention_hours = 24
        self.safety_factor = 2.0

    def calculate_storage_requirements(self):
        # Bytes stored per hour
        hourly_storage = (
            self.messages_per_second * 
            3600 * 
            self.message_size_avg
        )

        # Total storage with the safety factor applied
        total_storage = (
            hourly_storage * 
            self.retention_hours * 
            self.safety_factor
        )

        return {
            'hourly_mb': hourly_storage / (1024 * 1024),
            'total_gb': total_storage / (1024 * 1024 * 1024),
            # Rule of thumb used in this guide: RAM at 40% of total storage
            'recommended_ram_gb': total_storage / (1024 * 1024 * 1024) * 0.4
        }

    def calculate_partition_count(self, target_throughput_mb_per_sec):
        # A "partition" here is one queue that load is spread across.
        # 10 MB/s per queue is an assumed figure; measure it for your workload.
        partition_throughput = 10

        # Round up so the target throughput is always covered
        return max(
            1,
            math.ceil(target_throughput_mb_per_sec / partition_throughput)
        )

# Usage example
planner = CapacityPlanner()
requirements = planner.calculate_storage_requirements()
print(f"Recommended RAM: {requirements['recommended_ram_gb']:.2f} GB")
print(f"Total storage: {requirements['total_gb']:.2f} GB")
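Because the planner hard-codes its inputs, comparing scenarios means editing the class. A parameterized variant of the same storage formula makes that easier. This is a sketch of the formula used above, with the hypothetical name `storage_gb`, not a different sizing method.

```python
def storage_gb(msgs_per_sec, avg_bytes, retention_hours, safety_factor=2.0):
    """Total storage in GiB: rate x 3600 x size x hours x safety factor."""
    total_bytes = msgs_per_sec * 3600 * avg_bytes * retention_hours * safety_factor
    return total_bytes / 1024 ** 3


# Same inputs as the planner defaults, then a shorter retention window
baseline = storage_gb(1000, 1024, 24)
short_retention = storage_gb(1000, 1024, 6)
print(f"24h retention: {baseline:.2f} GB, 6h retention: {short_retention:.2f} GB")
```

With the default inputs the result is about 164.79 GB, so cutting retention from 24 to 6 hours brings the requirement down to about 41.20 GB.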

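Troubleshooting

Common Problems and Fixes

1. Memory Alarm

# Check memory status (case-insensitive, since the status output capitalizes "Memory")
sudo rabbitmqctl status | grep -i memory

# Temporary workaround (risky!)
sudo rabbitmqctl set_vm_memory_high_watermark 0.8

# Real fix: drain the queues or add memory
# (purge_queue permanently deletes every message in the queue)
sudo rabbitmqctl purge_queue queue_name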
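The same check can be automated against the management API. The sketch below assumes the payload comes from /api/nodes and uses its `mem_used`, `mem_limit`, `mem_alarm`, and `disk_free_alarm` fields. The function name `nodes_needing_attention`, the 80% warning ratio, and the sample values are assumptions.

```python
def nodes_needing_attention(nodes, warn_ratio=0.8):
    """Map node name -> list of reasons, for nodes in or near an alarm state."""
    findings = {}
    for node in nodes:
        reasons = []
        if node.get("mem_alarm"):
            reasons.append("memory alarm")
        if node.get("disk_free_alarm"):
            reasons.append("disk alarm")
        limit = node.get("mem_limit") or 0
        if limit and node.get("mem_used", 0) / limit >= warn_ratio:
            reasons.append("memory near watermark")
        if reasons:
            findings[node["name"]] = reasons
    return findings


# Hypothetical payload shaped like the management API response
nodes = [
    {"name": "rabbit@rabbit1", "mem_used": 4_000_000_000, "mem_limit": 10_000_000_000,
     "mem_alarm": False, "disk_free_alarm": False},
    {"name": "rabbit@rabbit2", "mem_used": 9_000_000_000, "mem_limit": 10_000_000_000,
     "mem_alarm": False, "disk_free_alarm": False},
    {"name": "rabbit@rabbit3", "mem_used": 10_500_000_000, "mem_limit": 10_000_000_000,
     "mem_alarm": True, "disk_free_alarm": False},
]
findings = nodes_needing_attention(nodes)
print(findings)
```

Flagging nodes before the alarm fires leaves time to add consumers or memory, rather than raising the watermark under pressure.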

2. Queue Backlog

# Check queue status
sudo rabbitmqctl list_queues name messages consumers

# Add consumers or tune prefetch (Python client call, not a shell command)
channel.basic_qos(prefetch_count=50)
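How many consumers to add can be estimated with simple arithmetic: a backlog only shrinks when the total consume rate exceeds the publish rate. The following sketch assumes steady rates and a fixed per-consumer throughput. The function names and the example figures are hypothetical.

```python
import math


def drain_seconds(backlog, publish_rate, per_consumer_rate, consumers):
    """Seconds until the backlog is empty, or infinity if it never drains."""
    net_rate = per_consumer_rate * consumers - publish_rate
    if net_rate <= 0:
        return math.inf
    return backlog / net_rate


def consumers_needed(backlog, publish_rate, per_consumer_rate, target_seconds):
    """Smallest consumer count that clears the backlog within target_seconds."""
    required_rate = publish_rate + backlog / target_seconds
    return math.ceil(required_rate / per_consumer_rate)


# 60,000 queued messages, 200 msg/s arriving, 50 msg/s per consumer
print(drain_seconds(60_000, 200, 50, consumers=4))
print(drain_seconds(60_000, 200, 50, consumers=6))
print(consumers_needed(60_000, 200, 50, target_seconds=300))
```

In this example four consumers only keep pace with incoming traffic, six clear the backlog in ten minutes, and eight are needed to clear it in five.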

3. Connection Problems

# Connection diagnostics
import pika

def diagnose_connection():
    try:
        connection = pika.BlockingConnection(
            pika.ConnectionParameters(
                'localhost',
                heartbeat=0,  # disable heartbeats
                connection_attempts=1
            )
        )
        print("✓ Connected")
        connection.close()
    except pika.exceptions.AMQPConnectionError as e:
        print(f"✗ Connection failed: {e}")
    except Exception as e:
        print(f"✗ Other error: {e}")
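When the AMQP handshake fails, it helps to first rule out plain network reachability, for instance a firewall rule blocking the port. Here is a standard-library sketch. The helper name `probe_ports` is hypothetical, and the self-check at the bottom uses a throwaway local listener instead of a real broker.

```python
import socket


def probe_ports(host, ports, timeout=1.0):
    """Return {port: True/False} depending on whether a TCP connect succeeds."""
    results = {}
    for port in ports:
        try:
            with socket.create_connection((host, port), timeout=timeout):
                results[port] = True
        except OSError:
            results[port] = False
    return results


# Self-check against a temporary listener on an ephemeral local port
listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listener.bind(("127.0.0.1", 0))
listener.listen(1)
open_port = listener.getsockname()[1]

while_open = probe_ports("127.0.0.1", [open_port])
listener.close()
after_close = probe_ports("127.0.0.1", [open_port])
print(while_open, after_close)
```

Against a real cluster the call would look like `probe_ports("rabbitmq1", [5672, 15672, 25672])`. A port that is reachable here but still fails in pika points at credentials, vhost permissions, or TLS rather than the network.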

Log Analysis

# Watch the log for errors (RabbitMQ writes levels in lowercase, e.g. [error])
tail -f /var/log/rabbitmq/rabbit@hostname.log | grep -i error

# Inspect connections
sudo rabbitmqctl list_connections name peer_host peer_port state

# Per-queue statistics
sudo rabbitmqctl list_queues name messages consumers memory
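For anything beyond a quick grep, counting log entries per level gives a fast overview of a log file. The sketch below assumes each line carries its level in square brackets, such as `[error]`. The sample lines are made up for illustration and the exact line layout varies between RabbitMQ versions.

```python
import re
from collections import Counter

# Matches the bracketed level marker anywhere in the line
LEVEL_RE = re.compile(r"\[(debug|info|notice|warning|error|critical)\]")


def count_levels(lines):
    """Count log lines per level; lines without a level marker are ignored."""
    counts = Counter()
    for line in lines:
        match = LEVEL_RE.search(line)
        if match:
            counts[match.group(1)] += 1
    return counts


# Hypothetical log excerpt
sample_log = [
    "2025-07-23 15:52:01.120+09:00 [info] <0.684.0> accepting AMQP connection",
    "2025-07-23 15:52:03.512+09:00 [warning] <0.684.0> closing AMQP connection",
    "2025-07-23 15:52:04.001+09:00 [error] <0.701.0> channel error on connection",
    "2025-07-23 15:52:04.002+09:00 [error] <0.701.0> operation queue.declare failed",
    "    continuation line without a level marker",
]
levels = count_levels(sample_log)
print(dict(levels))
```

For a real file, `count_levels(open(path, encoding="utf-8"))` processes it line by line without loading the whole log into memory.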

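You now have the core knowledge needed to run RabbitMQ reliably in production. Use clustering for high availability, monitoring to understand the state of the system, and the security settings to protect it.

Key points:

  • Run a cluster of at least 3 nodes
  • Build comprehensive monitoring
  • Apply the security settings thoroughly
  • Plan capacity and tune performance
  • Prepare a plan for failure scenarios

In a real production environment, roll these changes out in stages and test thoroughly before deploying.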