Notice
Recent Posts
Recent Comments
Link
| 일 | 월 | 화 | 수 | 목 | 금 | 토 |
|---|---|---|---|---|---|---|
| 1 | 2 | 3 | 4 | 5 | 6 | |
| 7 | 8 | 9 | 10 | 11 | 12 | 13 |
| 14 | 15 | 16 | 17 | 18 | 19 | 20 |
| 21 | 22 | 23 | 24 | 25 | 26 | 27 |
| 28 | 29 | 30 | 31 |
Tags
- 모니터링
- 이벤트 스트리밍
- 메시징 패턴
- ApacheBench
- 마이크로서비스 운영
- 세션저장소
- 분산 시스템
- 클라우드
- 고가용성
- docker
- 인메모리데이터베이스
- rabbitmq
- 서비스 설계
- 모노리스 분해
- infrastructureascode
- 클러스터
- Python
- 서비스 메시
- 메시지 브로커
- 마이크로서비스 통신
- kubernetes
- 보안
- 마이크로서비스
- 분산 모니터링
- CI/CD
- devops
- 컨테이너오케스트레이션
- RabbitMQ Exchange
- Kafka 클러스터
- 프로덕션 운영
Archives
- Today
- Total
hobokai 님의 블로그
RabbitMQ 완벽 가이드 3편: 프로덕션 운영과 고급 기능 본문
목차
클러스터링과 고가용성
프로덕션 환경에서는 단일 장애점을 제거하고 높은 가용성을 확보해야 합니다.
클러스터 구성
3노드 클러스터 설정
# 노드 1 (rabbit1)
sudo rabbitmq-server -detached
# 노드 2 (rabbit2)
sudo rabbitmq-server -detached
sudo rabbitmqctl stop_app
sudo rabbitmqctl join_cluster rabbit@rabbit1
sudo rabbitmqctl start_app
# 노드 3 (rabbit3)
sudo rabbitmq-server -detached
sudo rabbitmqctl stop_app
sudo rabbitmqctl join_cluster rabbit@rabbit1
sudo rabbitmqctl start_app
# 클러스터 상태 확인
sudo rabbitmqctl cluster_status
Docker Compose 클러스터
# docker-compose-cluster.yml
version: '3.8'
services:
rabbitmq1:
image: rabbitmq:3-management
hostname: rabbitmq1
environment:
- RABBITMQ_ERLANG_COOKIE=SWQOKODSQALRPCLNMEQG
- RABBITMQ_DEFAULT_USER=admin
- RABBITMQ_DEFAULT_PASS=admin123
ports:
- "5672:5672"
- "15672:15672"
volumes:
- ./cluster-entrypoint.sh:/usr/local/bin/cluster-entrypoint.sh
command: ["/usr/local/bin/cluster-entrypoint.sh"]
rabbitmq2:
image: rabbitmq:3-management
hostname: rabbitmq2
environment:
- RABBITMQ_ERLANG_COOKIE=SWQOKODSQALRPCLNMEQG
- RABBITMQ_DEFAULT_USER=admin
- RABBITMQ_DEFAULT_PASS=admin123
ports:
- "5673:5672"
- "15673:15672"
depends_on:
- rabbitmq1
volumes:
- ./cluster-entrypoint.sh:/usr/local/bin/cluster-entrypoint.sh
command: ["/usr/local/bin/cluster-entrypoint.sh"]
rabbitmq3:
image: rabbitmq:3-management
hostname: rabbitmq3
environment:
- RABBITMQ_ERLANG_COOKIE=SWQOKODSQALRPCLNMEQG
- RABBITMQ_DEFAULT_USER=admin
- RABBITMQ_DEFAULT_PASS=admin123
ports:
- "5674:5672"
- "15674:15672"
depends_on:
- rabbitmq1
volumes:
- ./cluster-entrypoint.sh:/usr/local/bin/cluster-entrypoint.sh
command: ["/usr/local/bin/cluster-entrypoint.sh"]
haproxy:
image: haproxy:2.4
ports:
- "5671:5671"
- "8080:8080"
volumes:
- ./haproxy.cfg:/usr/local/etc/haproxy/haproxy.cfg
depends_on:
- rabbitmq1
- rabbitmq2
- rabbitmq3
cluster-entrypoint.sh
#!/bin/bash
set -e
# RabbitMQ 시작
rabbitmq-server -detached
# 첫 번째 노드가 아니면 클러스터 조인
if [ "$HOSTNAME" != "rabbitmq1" ]; then
sleep 15
rabbitmqctl stop_app
rabbitmqctl join_cluster rabbit@rabbitmq1
rabbitmqctl start_app
fi
# 관리 플러그인 활성화
rabbitmq-plugins enable rabbitmq_management
# HA 정책 설정 (첫 번째 노드에서만)
if [ "$HOSTNAME" = "rabbitmq1" ]; then
sleep 20
rabbitmqctl set_policy ha-all ".*" '{"ha-mode":"all","ha-sync-mode":"automatic"}'
fi
# 포그라운드에서 실행
tail -f /var/log/rabbitmq/*.log
고가용성 큐 설정
# 모든 큐를 모든 노드에 복제
sudo rabbitmqctl set_policy ha-all ".*" '{"ha-mode":"all"}'
# 특정 큐만 2개 노드에 복제
sudo rabbitmqctl set_policy ha-critical "critical.*" \
'{"ha-mode":"exactly","ha-params":2}'
# 특정 노드들에만 복제
sudo rabbitmqctl set_policy ha-nodes "important.*" \
'{"ha-mode":"nodes","ha-params":["rabbit@node1","rabbit@node2"]}'
# 정책 확인
sudo rabbitmqctl list_policies
로드 밸런서 설정
haproxy.cfg
global
daemon
defaults
mode tcp
timeout connect 5s
timeout client 30s
timeout server 30s
# RabbitMQ 클러스터 (AMQP)
listen rabbitmq_cluster
bind *:5671
mode tcp
balance roundrobin
option tcpka
server rabbit1 rabbitmq1:5672 check
server rabbit2 rabbitmq2:5672 check
server rabbit3 rabbitmq3:5672 check
# 관리 인터페이스
listen rabbitmq_admin
bind *:8080
mode http
balance roundrobin
option httpchk GET /api/healthchecks/node
server rabbit1 rabbitmq1:15672 check
server rabbit2 rabbitmq2:15672 check
server rabbit3 rabbitmq3:15672 check클라이언트 연결 설정
# 다중 브로커 연결
import pika
class RobustConnection:
def __init__(self):
self.brokers = [
'rabbitmq1:5672',
'rabbitmq2:5672',
'rabbitmq3:5672'
]
self.connection = None
self.channel = None
self.connect()
def connect(self):
for broker in self.brokers:
try:
host, port = broker.split(':')
self.connection = pika.BlockingConnection(
pika.ConnectionParameters(
host=host,
port=int(port),
heartbeat=600,
blocked_connection_timeout=300,
connection_attempts=3,
retry_delay=2
)
)
self.channel = self.connection.channel()
print(f"연결 성공: {broker}")
return
except Exception as e:
print(f"연결 실패 {broker}: {e}")
continue
raise Exception("모든 브로커 연결 실패")
def reconnect(self):
try:
if self.connection and not self.connection.is_closed:
self.connection.close()
except:
pass
self.connect()
모니터링과 성능 최적화
Prometheus 통합
# prometheus-stack.yml
version: '3.8'
services:
rabbitmq-exporter:
image: kbudde/rabbitmq-exporter:latest
environment:
- RABBIT_URL=http://admin:admin123@rabbitmq1:15672
- RABBIT_CAPABILITIES=bert,no_sort
ports:
- "9419:9419"
depends_on:
- rabbitmq1
prometheus:
image: prom/prometheus:latest
ports:
- "9090:9090"
volumes:
- ./prometheus.yml:/etc/prometheus/prometheus.yml
- ./alerts.yml:/etc/prometheus/alerts.yml
grafana:
image: grafana/grafana:latest
ports:
- "3000:3000"
environment:
- GF_SECURITY_ADMIN_PASSWORD=admin123
volumes:
- grafana-storage:/var/lib/grafana
- ./grafana-dashboards:/etc/grafana/provisioning/dashboards
- ./grafana-datasources:/etc/grafana/provisioning/datasources
volumes:
grafana-storage:
prometheus.yml
global:
scrape_interval: 15s
rule_files:
- "alerts.yml"
scrape_configs:
- job_name: 'rabbitmq-exporter'
static_configs:
- targets: ['rabbitmq-exporter:9419']
- job_name: 'rabbitmq-management'
static_configs:
- targets: ['rabbitmq1:15692', 'rabbitmq2:15692', 'rabbitmq3:15692']
metrics_path: /metrics
params:
family: ['queue_metrics', 'connection_metrics']
alerting:
alertmanagers:
- static_configs:
- targets: ['alertmanager:9093']
핵심 모니터링 지표
# 커스텀 모니터링 스크립트
import pika
import requests
import json
from prometheus_client import CollectorRegistry, Gauge, push_to_gateway
class RabbitMQMonitor:
def __init__(self, management_url, username, password):
self.management_url = management_url
self.auth = (username, password)
self.registry = CollectorRegistry()
# 메트릭 정의
self.queue_messages = Gauge(
'rabbitmq_queue_messages_total',
'Total messages in queue',
['queue', 'vhost'],
registry=self.registry
)
self.queue_consumers = Gauge(
'rabbitmq_queue_consumers',
'Number of consumers',
['queue', 'vhost'],
registry=self.registry
)
self.connection_count = Gauge(
'rabbitmq_connections_total',
'Total connections',
registry=self.registry
)
def collect_metrics(self):
# 큐 정보 수집
queues = self.get_queues()
for queue in queues:
self.queue_messages.labels(
queue=queue['name'],
vhost=queue['vhost']
).set(queue.get('messages', 0))
self.queue_consumers.labels(
queue=queue['name'],
vhost=queue['vhost']
).set(queue.get('consumers', 0))
# 연결 정보 수집
connections = self.get_connections()
self.connection_count.set(len(connections))
def get_queues(self):
response = requests.get(
f"{self.management_url}/api/queues",
auth=self.auth
)
return response.json()
def get_connections(self):
response = requests.get(
f"{self.management_url}/api/connections",
auth=self.auth
)
return response.json()
def push_metrics(self, pushgateway_url):
push_to_gateway(
pushgateway_url,
job='rabbitmq-custom-monitor',
registry=self.registry
)
# 사용 예시
monitor = RabbitMQMonitor(
'http://localhost:15672',
'admin',
'admin123'
)
monitor.collect_metrics()
monitor.push_metrics('localhost:9091')
알림 규칙 설정
alerts.yml
groups:
- name: rabbitmq-alerts
rules:
- alert: RabbitMQDown
expr: up{job="rabbitmq-exporter"} == 0
for: 1m
labels:
severity: critical
annotations:
summary: "RabbitMQ is down"
description: "RabbitMQ instance has been down for more than 1 minute"
- alert: RabbitMQHighQueueSize
expr: rabbitmq_queue_messages > 1000
for: 5m
labels:
severity: warning
annotations:
summary: "High queue size detected"
description: "Queue {{ $labels.queue }} has {{ $value }} messages"
- alert: RabbitMQNoConsumers
expr: rabbitmq_queue_messages > 100 and rabbitmq_queue_consumers == 0
for: 2m
labels:
severity: critical
annotations:
summary: "Queue has messages but no consumers"
description: "Queue {{ $labels.queue }} has messages but no active consumers"
- alert: RabbitMQMemoryHigh
expr: rabbitmq_node_mem_used / rabbitmq_node_mem_limit > 0.8
for: 3m
labels:
severity: warning
annotations:
summary: "RabbitMQ memory usage high"
description: "Memory usage is above 80%"
보안 설정
SSL/TLS 설정
rabbitmq.conf
# SSL 리스너 설정
listeners.ssl.default = 5671
ssl_options.cacertfile = /etc/rabbitmq/ca_certificate.pem
ssl_options.certfile = /etc/rabbitmq/server_certificate.pem
ssl_options.keyfile = /etc/rabbitmq/server_key.pem
ssl_options.verify = verify_peer
ssl_options.fail_if_no_peer_cert = true
# 최신 TLS 버전만 허용
ssl_options.versions.1 = tlsv1.2
ssl_options.versions.2 = tlsv1.3
# 관리 인터페이스 SSL
management.ssl.port = 15671
management.ssl.cacertfile = /etc/rabbitmq/ca_certificate.pem
management.ssl.certfile = /etc/rabbitmq/server_certificate.pem
management.ssl.keyfile = /etc/rabbitmq/server_key.pem
LDAP 인증 설정
# LDAP 플러그인 활성화
auth_backends.1 = rabbit_auth_backend_ldap
auth_backends.2 = rabbit_auth_backend_internal
# LDAP 서버 설정
auth_ldap.servers.1 = ldap.company.com
auth_ldap.port = 389
auth_ldap.use_ssl = false
auth_ldap.use_starttls = true
# 사용자 검색
auth_ldap.user_dn_pattern = uid=${username},ou=people,dc=company,dc=com
auth_ldap.other_bind = as_user
# 권한 매핑
auth_ldap.vhost_access_query = {constant, true}
auth_ldap.resource_access_query = {constant, true}
auth_ldap.tag_queries.administrator = {constant, false}
auth_ldap.tag_queries.management = {constant, true}
네트워크 보안
# 방화벽 설정 (Ubuntu)
sudo ufw allow 5672/tcp # AMQP
sudo ufw allow 5671/tcp # AMQPS
sudo ufw allow 15672/tcp # HTTP Management
sudo ufw allow 15671/tcp # HTTPS Management
sudo ufw allow 25672/tcp # Clustering
# 특정 IP만 허용
sudo ufw allow from 192.168.1.0/24 to any port 5672
프로덕션 모범 사례
설정 최적화
rabbitmq.conf (프로덕션 설정)
# 기본 설정
listeners.tcp.default = 5672
default_user = admin
default_pass = CHANGE_THIS_PASSWORD
# 메모리 관리
vm_memory_high_watermark.relative = 0.6
vm_memory_high_watermark_paging_ratio = 0.5
disk_free_limit.relative = 2.0
# 로그 설정
log.console = false
log.console.level = info
log.file = /var/log/rabbitmq/rabbit.log
log.file.level = info
log.file.rotation.date = $D0
log.file.rotation.size = 10485760
# 네트워크 설정
tcp_listen_options.backlog = 128
tcp_listen_options.nodelay = true
tcp_listen_options.keepalive = true
tcp_listen_options.exit_on_close = false
# 클러스터 설정
cluster_formation.peer_discovery_backend = classic_config
cluster_formation.classic_config.nodes.1 = rabbit@node1
cluster_formation.classic_config.nodes.2 = rabbit@node2
cluster_formation.classic_config.nodes.3 = rabbit@node3
# 하트비트 설정
heartbeat = 60
애플리케이션 코드 최적화
# 프로덕션용 연결 관리
import pika
import logging
from contextlib import contextmanager
class ProductionRabbitMQ:
def __init__(self, config):
self.config = config
self.connection_pool = []
self.logger = logging.getLogger(__name__)
@contextmanager
def get_channel(self):
connection = None
channel = None
try:
connection = self._get_connection()
channel = connection.channel()
# 프로덕션 설정
channel.basic_qos(
prefetch_count=self.config.get('prefetch_count', 10)
)
yield channel
except Exception as e:
self.logger.error(f"채널 오류: {e}")
if connection and not connection.is_closed:
connection.close()
raise
finally:
if channel and not channel.is_closed:
channel.close()
if connection and not connection.is_closed:
self._return_connection(connection)
def _get_connection(self):
# 연결 풀에서 사용 가능한 연결 찾기
for conn in self.connection_pool:
if not conn.is_closed:
return conn
# 새 연결 생성
return self._create_connection()
def _create_connection(self):
return pika.BlockingConnection(
pika.ConnectionParameters(
host=self.config['host'],
port=self.config['port'],
virtual_host=self.config['vhost'],
credentials=pika.PlainCredentials(
self.config['username'],
self.config['password']
),
heartbeat=600,
blocked_connection_timeout=300,
connection_attempts=3,
retry_delay=2
)
)
def publish_with_retry(self, exchange, routing_key, body, max_retries=3):
for attempt in range(max_retries):
try:
with self.get_channel() as channel:
channel.basic_publish(
exchange=exchange,
routing_key=routing_key,
body=body,
properties=pika.BasicProperties(
delivery_mode=2, # 메시지 지속성
mandatory=True # 라우팅 실패 시 반환
)
)
return True
except Exception as e:
self.logger.warning(f"발송 시도 {attempt + 1} 실패: {e}")
if attempt == max_retries - 1:
raise
return False
용량 계획
# 용량 계산기
class CapacityPlanner:
def __init__(self):
self.message_size_avg = 1024 # bytes
self.messages_per_second = 1000
self.retention_hours = 24
self.safety_factor = 2.0
def calculate_storage_requirements(self):
# 시간당 저장 공간
hourly_storage = (
self.messages_per_second *
3600 *
self.message_size_avg
)
# 총 저장 공간 (안전 계수 적용)
total_storage = (
hourly_storage *
self.retention_hours *
self.safety_factor
)
return {
'hourly_mb': hourly_storage / (1024 * 1024),
'total_gb': total_storage / (1024 * 1024 * 1024),
'recommended_ram_gb': total_storage / (1024 * 1024 * 1024) * 0.4
}
def calculate_partition_count(self, target_throughput_mb_per_sec):
# 파티션당 처리량 (일반적으로 10MB/s)
partition_throughput = 10
return max(
1,
int(target_throughput_mb_per_sec / partition_throughput) + 1
)
# 사용 예시
planner = CapacityPlanner()
requirements = planner.calculate_storage_requirements()
print(f"권장 RAM: {requirements['recommended_ram_gb']:.2f} GB")
print(f"총 저장 공간: {requirements['total_gb']:.2f} GB")
트러블슈팅
일반적인 문제와 해결책
1. 메모리 알람
# 메모리 상태 확인
sudo rabbitmqctl status | grep memory
# 임시 해결 (위험!)
sudo rabbitmqctl set_vm_memory_high_watermark 0.8
# 근본적 해결: 큐 정리 또는 메모리 증설
sudo rabbitmqctl purge_queue queue_name
2. 큐 적체
# 큐 상태 확인
sudo rabbitmqctl list_queues name messages consumers
# 소비자 추가 또는 prefetch 조정
channel.basic_qos(prefetch_count=50)
3. 연결 문제
# 연결 상태 진단
def diagnose_connection():
try:
connection = pika.BlockingConnection(
pika.ConnectionParameters(
'localhost',
heartbeat=0, # 하트비트 비활성화
connection_attempts=1
)
)
print("✓ 연결 성공")
connection.close()
except pika.exceptions.AMQPConnectionError as e:
print(f"✗ 연결 실패: {e}")
except Exception as e:
print(f"✗ 기타 오류: {e}")
로그 분석
# 오류 로그 모니터링
tail -f /var/log/rabbitmq/rabbit@hostname.log | grep ERROR
# 연결 로그 확인
sudo rabbitmqctl list_connections name peer_host peer_port state
# 큐별 통계
sudo rabbitmqctl list_queues name messages consumers memory
이제 RabbitMQ를 프로덕션 환경에서 안정적으로 운영할 수 있는 모든 지식을 갖추었습니다. 클러스터링으로 고가용성을 확보하고, 모니터링으로 시스템 상태를 파악하며, 보안 설정으로 안전을 보장하세요.
핵심 포인트:
- 최소 3노드 클러스터 구성
- 포괄적인 모니터링 시스템 구축
- 보안 설정 철저히 적용
- 용량 계획과 성능 튜닝
- 장애 상황 대비 계획 수립
실제 프로덕션 환경에서는 단계적으로 적용하며, 충분한 테스트를 거친 후 배포하는 것이 중요합니다.
'Data Platform' 카테고리의 다른 글
| CAP 이론에 대해 아시나요? (0) | 2025.08.18 |
|---|---|
| RabbitMQ 완벽 가이드 2편: Exchange와 라우팅 패턴 (0) | 2025.07.23 |
| RabbitMQ 완벽 가이드 1편: 기초 개념과 설치 (0) | 2025.07.23 |
| Kafka 프로덕션 가이드 - 클러스터링, 모니터링, 보안, 운영 자동화 (1) | 2025.07.23 |
| Kafka Producer/Consumer 고급 가이드 - 파티셔닝, 스트림 처리, 성능 최적화 (3) | 2025.07.23 |