hobokai 님의 블로그

RabbitMQ 완벽 가이드 2편: Exchange와 라우팅 패턴 본문

Data Platform

RabbitMQ 완벽 가이드 2편: Exchange와 라우팅 패턴

hobokai 2025. 7. 23. 15:48

목차

  1. Exchange 심화 이해
  2. Direct Exchange 패턴
  3. Topic Exchange 패턴
  4. Fanout Exchange 패턴
  5. Headers Exchange 패턴
  6. 실전 메시징 패턴

Exchange 심화 이해

1편에서 배운 기본 개념을 바탕으로, 이제 RabbitMQ의 핵심인 Exchange를 자세히 알아보겠습니다.

Exchange란?

Exchange는 RabbitMQ의 라우터 역할을 합니다. Producer가 보낸 메시지를 어떤 Queue로 보낼지 결정하는 것이 Exchange의 역할입니다.

# Exchange 선언
channel.exchange_declare(
    exchange='my_exchange',
    exchange_type='direct',  # 타입 지정
    durable=True,           # 지속성
    auto_delete=False       # 자동 삭제 여부
)

Default Exchange

RabbitMQ는 빈 문자열("")로 표현되는 기본 Exchange를 제공합니다.

# Default exchange 사용 (1편에서 사용한 방식)
channel.basic_publish(
    exchange='',           # 빈 문자열 = default exchange
    routing_key='hello',   # queue 이름과 동일
    body='Hello World!'
)

Direct Exchange 패턴

Direct Exchange는 라우팅 키가 정확히 일치하는 큐로 메시지를 전송합니다.

기본 구조

Producer → Direct Exchange → [라우팅 키 매칭] → Queue → Consumer

구현 예제: 로그 레벨별 처리

publisher.py

import pika
import sys

def send_log(log_level, message):
    connection = pika.BlockingConnection(
        pika.ConnectionParameters('localhost')
    )
    channel = connection.channel()

    # Direct exchange 선언
    channel.exchange_declare(
        exchange='direct_logs',
        exchange_type='direct',
        durable=True
    )

    # 메시지 전송
    channel.basic_publish(
        exchange='direct_logs',
        routing_key=log_level,  # 'error', 'warning', 'info'
        body=message,
        properties=pika.BasicProperties(delivery_mode=2)
    )

    print(f" [x] {log_level}: {message}")
    connection.close()

# 사용 예시
send_log('error', '데이터베이스 연결 실패')
send_log('warning', '메모리 사용량 80% 초과')
send_log('info', '사용자 로그인 성공')

consumer.py

import pika
import sys

def setup_consumer(log_levels):
    connection = pika.BlockingConnection(
        pika.ConnectionParameters('localhost')
    )
    channel = connection.channel()

    # Exchange 선언
    channel.exchange_declare(
        exchange='direct_logs',
        exchange_type='direct',
        durable=True
    )

    # 전용 큐 생성 (익명 큐)
    result = channel.queue_declare(queue='', exclusive=True)
    queue_name = result.method.queue

    # 지정된 로그 레벨들에 대해 바인딩
    for level in log_levels:
        channel.queue_bind(
            exchange='direct_logs',
            queue=queue_name,
            routing_key=level
        )

    def callback(ch, method, properties, body):
        print(f" [x] {method.routing_key}: {body.decode()}")

    channel.basic_consume(
        queue=queue_name,
        on_message_callback=callback,
        auto_ack=True
    )

    print(f' [*] {log_levels} 로그를 기다리는 중...')
    channel.start_consuming()

# 에러 로그만 처리하는 소비자
setup_consumer(['error'])

# 또는 모든 로그를 처리하는 소비자
# setup_consumer(['error', 'warning', 'info'])

실행 결과

# 터미널 1: 에러 로그 소비자
python consumer.py
# [*] ['error'] 로그를 기다리는 중...

# 터미널 2: 로그 발송
python publisher.py
# [x] error: 데이터베이스 연결 실패

# 터미널 1에서 확인
# [x] error: 데이터베이스 연결 실패

Topic Exchange 패턴

Topic Exchange는 와일드카드를 사용한 패턴 매칭으로 더 유연한 라우팅을 제공합니다.

와일드카드 규칙

  • * (별표): 정확히 하나의 단어와 매칭
  • # (해시): 0개 이상의 단어와 매칭

패턴 예시

user.profile.update  →  user.*.update     ✓ 매칭
user.profile.update  →  user.#            ✓ 매칭
order.payment.failed →  *.payment.*       ✓ 매칭
order.payment.failed →  order.#           ✓ 매칭

구현 예제: 이커머스 이벤트 시스템

event_publisher.py

import pika
import json
from datetime import datetime

class EventPublisher:
    def __init__(self):
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters('localhost')
        )
        self.channel = self.connection.channel()

        # Topic exchange 선언
        self.channel.exchange_declare(
            exchange='ecommerce_events',
            exchange_type='topic',
            durable=True
        )

    def publish_event(self, service, action, data):
        routing_key = f"{service}.{action}"

        event = {
            'service': service,
            'action': action,
            'data': data,
            'timestamp': datetime.now().isoformat()
        }

        self.channel.basic_publish(
            exchange='ecommerce_events',
            routing_key=routing_key,
            body=json.dumps(event, ensure_ascii=False),
            properties=pika.BasicProperties(delivery_mode=2)
        )

        print(f" [x] 이벤트 발송: {routing_key}")

    def close(self):
        self.connection.close()

# 사용 예시
publisher = EventPublisher()

# 사용자 관련 이벤트
publisher.publish_event('user', 'signup', {'user_id': 123, 'email': 'user@example.com'})
publisher.publish_event('user', 'login', {'user_id': 123})
publisher.publish_event('user', 'profile.update', {'user_id': 123, 'field': 'email'})

# 주문 관련 이벤트
publisher.publish_event('order', 'created', {'order_id': 456, 'amount': 50000})
publisher.publish_event('order', 'payment.success', {'order_id': 456})
publisher.publish_event('order', 'shipped', {'order_id': 456, 'tracking': 'TRK123'})

publisher.close()

specialized_consumers.py

import pika
import json

class EventConsumer:
    def __init__(self, name, patterns):
        self.name = name
        self.patterns = patterns
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters('localhost')
        )
        self.channel = self.connection.channel()

        # Exchange 선언
        self.channel.exchange_declare(
            exchange='ecommerce_events',
            exchange_type='topic',
            durable=True
        )

        # 전용 큐 생성
        result = self.channel.queue_declare(
            queue=f'{name}_queue',
            durable=True
        )

        # 패턴별 바인딩
        for pattern in patterns:
            self.channel.queue_bind(
                exchange='ecommerce_events',
                queue=f'{name}_queue',
                routing_key=pattern
            )

    def callback(self, ch, method, properties, body):
        event = json.loads(body.decode())
        print(f" [{self.name}] {method.routing_key}: {event['data']}")

        # 서비스별 처리 로직
        if 'user' in method.routing_key:
            self.handle_user_event(event)
        elif 'order' in method.routing_key:
            self.handle_order_event(event)

        ch.basic_ack(delivery_tag=method.delivery_tag)

    def handle_user_event(self, event):
        if event['action'] == 'signup':
            print("  → 환영 이메일 발송 준비")
        elif event['action'] == 'login':
            print("  → 로그인 통계 업데이트")

    def handle_order_event(self, event):
        if event['action'] == 'created':
            print("  → 주문 확인 이메일 발송")
        elif 'payment' in event['action']:
            print("  → 결제 처리 완료 알림")

    def start_consuming(self):
        self.channel.basic_qos(prefetch_count=1)
        self.channel.basic_consume(
            queue=f'{self.name}_queue',
            on_message_callback=self.callback
        )

        print(f' [*] {self.name} 서비스 시작: {self.patterns}')
        self.channel.start_consuming()

# 다양한 소비자 생성
if __name__ == "__main__":
    import sys

    consumers = {
        'user_service': ['user.*'],
        'order_service': ['order.*'],
        'notification_service': ['*.signup', '*.created', '*.payment.*'],
        'analytics_service': ['#']  # 모든 이벤트
    }

    service_name = sys.argv[1] if len(sys.argv) > 1 else 'user_service'
    patterns = consumers.get(service_name, ['user.*'])

    consumer = EventConsumer(service_name, patterns)
    consumer.start_consuming()

실행 예시

# 터미널 1: 알림 서비스 (회원가입, 주문생성, 결제 이벤트만)
python specialized_consumers.py notification_service

# 터미널 2: 분석 서비스 (모든 이벤트)
python specialized_consumers.py analytics_service

# 터미널 3: 이벤트 발송
python event_publisher.py

Fanout Exchange 패턴

Fanout Exchange는 라우팅 키를 무시하고 연결된 모든 큐에 메시지를 브로드캐스트합니다.

구현 예제: 실시간 알림 시스템

broadcast_publisher.py

import pika
import json
from datetime import datetime

def broadcast_notification(title, message, urgency='normal'):
    connection = pika.BlockingConnection(
        pika.ConnectionParameters('localhost')
    )
    channel = connection.channel()

    # Fanout exchange 선언
    channel.exchange_declare(
        exchange='notifications',
        exchange_type='fanout',
        durable=True
    )

    notification = {
        'title': title,
        'message': message,
        'urgency': urgency,
        'timestamp': datetime.now().isoformat()
    }

    # 라우팅 키는 무시됨
    channel.basic_publish(
        exchange='notifications',
        routing_key='',  # Fanout에서는 무시됨
        body=json.dumps(notification, ensure_ascii=False),
        properties=pika.BasicProperties(delivery_mode=2)
    )

    print(f" [x] 알림 브로드캐스트: {title}")
    connection.close()

# 사용 예시
broadcast_notification(
    "시스템 점검 안내",
    "오늘 밤 12시부터 2시간 동안 시스템 점검이 있습니다.",
    "high"
)

notification_consumers.py

import pika
import json

class NotificationHandler:
    def __init__(self, handler_type):
        self.handler_type = handler_type
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters('localhost')
        )
        self.channel = self.connection.channel()

        # Exchange 선언
        self.channel.exchange_declare(
            exchange='notifications',
            exchange_type='fanout',
            durable=True
        )

        # 각 핸들러별 전용 큐
        result = self.channel.queue_declare(
            queue=f'notifications_{handler_type}',
            durable=True
        )

        # Fanout이므로 라우팅 키 없이 바인딩
        self.channel.queue_bind(
            exchange='notifications',
            queue=f'notifications_{handler_type}'
        )

    def callback(self, ch, method, properties, body):
        notification = json.loads(body.decode())

        print(f" [{self.handler_type}] 알림 수신:")
        print(f"   제목: {notification['title']}")
        print(f"   내용: {notification['message']}")
        print(f"   긴급도: {notification['urgency']}")

        # 핸들러별 처리
        if self.handler_type == 'email':
            self.send_email(notification)
        elif self.handler_type == 'sms':
            self.send_sms(notification)
        elif self.handler_type == 'push':
            self.send_push_notification(notification)
        elif self.handler_type == 'database':
            self.save_to_database(notification)

        ch.basic_ack(delivery_tag=method.delivery_tag)

    def send_email(self, notification):
        print("   → 이메일 발송 처리")

    def send_sms(self, notification):
        if notification['urgency'] == 'high':
            print("   → SMS 발송 처리 (긴급)")

    def send_push_notification(self, notification):
        print("   → 푸시 알림 발송 처리")

    def save_to_database(self, notification):
        print("   → 데이터베이스 저장 처리")

    def start_consuming(self):
        self.channel.basic_qos(prefetch_count=1)
        self.channel.basic_consume(
            queue=f'notifications_{self.handler_type}',
            on_message_callback=self.callback
        )

        print(f' [*] {self.handler_type} 알림 핸들러 시작')
        self.channel.start_consuming()

# 사용 예시
if __name__ == "__main__":
    import sys

    handler_type = sys.argv[1] if len(sys.argv) > 1 else 'email'
    handler = NotificationHandler(handler_type)
    handler.start_consuming()

Headers Exchange 패턴

Headers Exchange는 라우팅 키 대신 메시지 헤더를 기반으로 라우팅합니다.

# Headers exchange 예제
channel.exchange_declare(
    exchange='headers_exchange',
    exchange_type='headers'
)

# 헤더 기반 바인딩
channel.queue_bind(
    exchange='headers_exchange',
    queue='pdf_queue',
    arguments={
        'x-match': 'all',      # 모든 헤더 매칭
        'format': 'pdf',
        'size': 'large'
    }
)

# 헤더와 함께 메시지 발송
channel.basic_publish(
    exchange='headers_exchange',
    routing_key='',  # 무시됨
    body='PDF 파일 처리 요청',
    properties=pika.BasicProperties(
        headers={
            'format': 'pdf',
            'size': 'large',
            'priority': 'high'
        }
    )
)

실전 메시징 패턴

1. Work Queue (작업 분산)

여러 소비자가 작업을 분산 처리하는 패턴입니다.

# 작업 분산 설정
channel.basic_qos(prefetch_count=1)  # 한 번에 하나씩만

# Round-robin 방식으로 작업 분산
for i in range(10):
    channel.basic_publish(
        exchange='',
        routing_key='task_queue',
        body=f'작업 {i}',
        properties=pika.BasicProperties(delivery_mode=2)
    )

2. Publish/Subscribe

동일한 메시지를 여러 소비자가 받는 패턴입니다.

# Fanout exchange로 구현
channel.exchange_declare(exchange='logs', exchange_type='fanout')

# 모든 구독자에게 브로드캐스트
channel.basic_publish(
    exchange='logs',
    routing_key='',
    body='모든 구독자에게 전송'
)

요약

이번 편에서는 RabbitMQ의 핵심인 Exchange의 4가지 타입을 살펴봤습니다:

  • Direct: 정확한 키 매칭 (로그 레벨별 처리)
  • Topic: 패턴 매칭 (이벤트 기반 아키텍처)
  • Fanout: 브로드캐스트 (실시간 알림)
  • Headers: 헤더 기반 라우팅 (메타데이터 활용)

3편에서는 프로덕션 환경을 위한 고급 기능들을 다룹니다:

  • 클러스터링과 고가용성
  • 모니터링과 성능 튜닝
  • 보안 설정과 운영 모범 사례

실제 프로젝트에서는 여러 Exchange 타입을 조합하여 복잡한 메시징 아키텍처를 구성합니다. 각 패턴의 특성을 이해하고 상황에 맞는 선택을 하는 것이 중요합니다.