Complete Guide to Microservices Architecture, Part 2: API Gateway, Service Mesh, and Communication Patterns

hobokai 2025. 7. 23. 16:47

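Table of Contents

  1. API Gateway Pattern
  2. Service Mesh Architecture
  3. Inter-Service Communication Patterns
  4. Distributed Transaction Management
  5. Practical Implementation Example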

API Gateway Pattern

An API Gateway is the single entry point for all client requests: an intermediate layer that routes each request to the appropriate microservice.

Core Features

                Client request
                      ↓
    ┌─────────────────────────────────────┐
    │             API Gateway             │
    │ ┌─────────────────────────────────┐ │
    │ │ • Authentication/Authorization  │ │
    │ │ • Load Balancing                │ │
    │ │ • Rate Limiting                 │ │
    │ │ • Logging/Monitoring            │ │
    │ │ • Response Caching              │ │
    │ │ • API Versioning                │ │
    │ └─────────────────────────────────┘ │
    └─────────────────────────────────────┘
                      ↓
     ┌───────────┬───────────┬───────────┐
     │   Order   │ Inventory │  Payment  │
     │  Service  │  Service  │  Service  │
     └───────────┴───────────┴───────────┘
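
The routing step at the heart of the diagram can be sketched in a few lines. This is a minimal illustration, not how any particular gateway product works: the `ROUTES` table and the `resolve_upstream` function are hypothetical names, and the path prefixes are assumed to mirror the three services shown above.

```python
from typing import Optional

# Hypothetical routing table: path prefix -> upstream base URL.
ROUTES = {
    "/orders": "http://order-service:8080",
    "/inventory": "http://inventory-service:8080",
    "/payments": "http://payment-service:8080",
}


def resolve_upstream(path: str) -> Optional[str]:
    """Return the upstream URL for a request path, or None if no route matches."""
    # Try the longest prefix first so that more specific routes win.
    for prefix in sorted(ROUTES, key=len, reverse=True):
        # Match on whole path segments: "/orders" and "/orders/42", not "/ordersX".
        if path == prefix or path.startswith(prefix + "/"):
            return ROUTES[prefix] + path
    return None
```

A real gateway layers authentication, rate limiting, and the other features listed above around this lookup before forwarding the request.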

Kong API Gateway Implementation

# docker-compose.yml
version: '3.8'
services:
  kong-database:
    image: postgres:13
    environment:
      POSTGRES_USER: kong
      POSTGRES_PASSWORD: kong
      POSTGRES_DB: kong
    volumes:
      - kong_data:/var/lib/postgresql/data

  kong-migration:
    image: kong:3.0
    command: kong migrations bootstrap
    depends_on:
      - kong-database
    environment:
      KONG_DATABASE: postgres
      KONG_PG_HOST: kong-database
      KONG_PG_PASSWORD: kong

  kong:
    image: kong:3.0
    depends_on:
      - kong-database
      - kong-migration
    ports:
      - "8000:8000"  # Proxy port
      - "8001:8001"  # Admin API port
    environment:
      KONG_DATABASE: postgres
      KONG_PG_HOST: kong-database
      KONG_PG_PASSWORD: kong
      KONG_PROXY_ACCESS_LOG: /dev/stdout
      KONG_ADMIN_ACCESS_LOG: /dev/stdout
      KONG_PROXY_ERROR_LOG: /dev/stderr
      KONG_ADMIN_ERROR_LOG: /dev/stderr
      KONG_ADMIN_LISTEN: 0.0.0.0:8001

  konga:
    image: pantsel/konga
    ports:
      - "1337:1337"
    environment:
      NODE_ENV: development

volumes:
  kong_data:

Kong Service Configuration

# kong_config.py - Kong API Gateway configuration
import requests

class KongConfigManager:
    def __init__(self, admin_url="http://localhost:8001"):
        self.admin_url = admin_url

    def _post(self, path, payload):
        """POST to the Kong Admin API and fail loudly on an error response"""
        response = requests.post(f"{self.admin_url}{path}", json=payload, timeout=10)
        response.raise_for_status()
        return response.json()

    def create_service(self, service_name, service_url):
        """Register an upstream service"""
        service_data = {
            "name": service_name,
            "url": service_url,  # shorthand that sets protocol, host, port, and path
            "connect_timeout": 60000,  # milliseconds
            "write_timeout": 60000,
            "read_timeout": 60000
        }
        return self._post("/services", service_data)

    def create_route(self, service_name, route_paths, methods=None):
        """Create a route for a service"""
        route_data = {
            "paths": route_paths,
            # Avoid a mutable default argument; fall back to GET and POST
            "methods": methods or ["GET", "POST"],
            "strip_path": False
        }
        return self._post(f"/services/{service_name}/routes", route_data)

    def setup_jwt_auth(self, service_name):
        """Enable the JWT authentication plugin"""
        plugin_data = {
            "name": "jwt",
            "config": {
                "uri_param_names": ["jwt"],
                "cookie_names": ["jwt"],
                "claims_to_verify": ["exp"],
                "key_claim_name": "iss"
                # The signing algorithm (e.g. HS256) is set on each consumer's
                # JWT credential, not in the plugin config.
            }
        }
        return self._post(f"/services/{service_name}/plugins", plugin_data)

    def setup_rate_limiting(self, service_name, requests_per_minute=100):
        """Enable the rate-limiting plugin"""
        plugin_data = {
            "name": "rate-limiting",
            "config": {
                "minute": requests_per_minute,
                "policy": "local",
                "fault_tolerant": True,
                "hide_client_headers": False
            }
        }
        return self._post(f"/services/{service_name}/plugins", plugin_data)

# Apply the configuration
kong_manager = KongConfigManager()

# Register the microservices
services = [
    ("order-service", "http://order-service:8080"),
    ("inventory-service", "http://inventory-service:8080"),
    ("payment-service", "http://payment-service:8080"),
    ("user-service", "http://user-service:8080")
]

for service_name, service_url in services:
    # Create the service
    kong_manager.create_service(service_name, service_url)

    # Create the route
    kong_manager.create_route(service_name, [f"/{service_name.replace('-service', '')}"])

    # Enable JWT authentication
    kong_manager.setup_jwt_auth(service_name)

    # Configure rate limiting
    kong_manager.setup_rate_limiting(service_name)

Spring Cloud Gateway Implementation

# application.yml
spring:
  cloud:
    gateway:
      routes:
        - id: order-service
          uri: http://order-service:8080
          predicates:
            - Path=/orders/**
          filters:
            - name: CircuitBreaker
              args:
                name: order-circuit-breaker
                fallbackUri: forward:/fallback/orders
            - name: Retry
              args:
                retries: 3
                statuses: BAD_GATEWAY,GATEWAY_TIMEOUT

        - id: inventory-service
          uri: http://inventory-service:8080
          predicates:
            - Path=/inventory/**
          filters:
            - name: RequestRateLimiter
              args:
                redis-rate-limiter.replenishRate: 10
                redis-rate-limiter.burstCapacity: 20
                key-resolver: "#{@userKeyResolver}"

        - id: payment-service
          uri: http://payment-service:8080
          predicates:
            - Path=/payments/**
          filters:
            - name: AddRequestHeader
              args:
                name: X-Request-Source
                value: API-Gateway
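
// GatewayConfig.java
import org.springframework.cloud.gateway.filter.ratelimit.KeyResolver;
import org.springframework.cloud.gateway.filter.ratelimit.RedisRateLimiter;
import org.springframework.cloud.gateway.route.RouteLocator;
import org.springframework.cloud.gateway.route.builder.RouteLocatorBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
import org.springframework.security.config.annotation.web.reactive.EnableWebFluxSecurity;
import org.springframework.web.cors.CorsConfiguration;
import org.springframework.web.cors.reactive.CorsWebFilter;
import org.springframework.web.cors.reactive.UrlBasedCorsConfigurationSource;
import reactor.core.publisher.Mono;

@Configuration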
@EnableWebFluxSecurity
public class GatewayConfig {

    @Bean
    public RouteLocator customRouteLocator(RouteLocatorBuilder builder) {
        return builder.routes()
            .route("order-service", r -> r
                .path("/orders/**")
                .filters(f -> f
                    .circuitBreaker(c -> c
                        .setName("order-circuit-breaker")
                        .setFallbackUri("forward:/fallback/orders"))
                    .retry(3))
                .uri("http://order-service:8080"))

            .route("inventory-service", r -> r
                .path("/inventory/**")
                .filters(f -> f
                    .requestRateLimiter(c -> c
                        .setRateLimiter(redisRateLimiter())
                        .setKeyResolver(userKeyResolver())))
                .uri("http://inventory-service:8080"))
            .build();
    }

    @Bean
    public RedisRateLimiter redisRateLimiter() {
        return new RedisRateLimiter(10, 20); // replenishRate, burstCapacity
    }

    @Bean
    public KeyResolver userKeyResolver() {
        // getFirst() returns a nullable String, not an Optional
        return exchange -> {
            String userId = exchange.getRequest().getHeaders().getFirst("X-User-ID");
            return Mono.just(userId != null ? userId : "anonymous");
        };
    }

    @Bean
    @Order(Ordered.HIGHEST_PRECEDENCE)
    public CorsWebFilter corsFilter() {
        CorsConfiguration config = new CorsConfiguration();
        config.setAllowCredentials(true);
        config.addAllowedOriginPattern("*");
        config.addAllowedHeader("*");
        config.addAllowedMethod("*");

        UrlBasedCorsConfigurationSource source = new UrlBasedCorsConfigurationSource();
        source.registerCorsConfiguration("/**", config);

        return new CorsWebFilter(source);
    }
}
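
The `replenishRate` and `burstCapacity` settings above describe a token bucket: tokens refill at a steady rate up to a cap, and each request spends one. The following is a minimal in-memory sketch of that idea in Python, assuming a single process and a caller-supplied clock. The `TokenBucket` class is a hypothetical name for illustration and is not the Redis-backed implementation that Spring Cloud Gateway uses.

```python
class TokenBucket:
    """In-memory token bucket; `now` is a timestamp in seconds supplied by the caller."""

    def __init__(self, replenish_rate: float, burst_capacity: int, now: float = 0.0):
        self.replenish_rate = replenish_rate  # tokens added per second
        self.burst_capacity = burst_capacity  # maximum tokens the bucket can hold
        self.tokens = float(burst_capacity)   # start with a full bucket
        self.last_refill = now

    def allow(self, now: float) -> bool:
        """Refill based on elapsed time, then try to spend one token."""
        elapsed = max(0.0, now - self.last_refill)
        self.tokens = min(self.burst_capacity, self.tokens + elapsed * self.replenish_rate)
        self.last_refill = now
        if self.tokens >= 1.0:
            self.tokens -= 1.0
            return True
        return False


# Same numbers as the configuration above: 10 tokens per second, bursts of up to 20.
bucket = TokenBucket(replenish_rate=10, burst_capacity=20)
burst = [bucket.allow(now=0.0) for _ in range(25)]
```

With these values, a burst of 25 simultaneous requests lets 20 through and rejects 5; one second later, 10 more tokens are available.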

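Service Mesh Architecture

A service mesh is a dedicated infrastructure layer for communication between microservices. It abstracts away the complexity of network communication.

Istio Service Mesh Implementation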

# istio-installation.yaml
apiVersion: install.istio.io/v1alpha1
kind: IstioOperator
metadata:
  name: control-plane
spec:
  values:
    global:
      meshID: mesh1
      multiCluster:
        clusterName: cluster1
      network: network1
  components:
    pilot:
      k8s:
        resources:
          requests:
            memory: 512Mi
            cpu: "0.5"
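
# Download Istio (pin the version so the directory name below is predictable)
curl -L https://istio.io/downloadIstio | ISTIO_VERSION=1.18.0 sh -
export PATH=$PWD/istio-1.18.0/bin:$PATH

# Install Istio using the IstioOperator manifest above
istioctl install -f istio-installation.yaml -y

# Enable sidecar injection for the namespace
kubectl label namespace default istio-injection=enabled

# Install the Kiali, Jaeger, and Grafana add-ons
kubectl apply -f istio-1.18.0/samples/addons/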

Service Mesh Configuration

# microservices-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: order-service
  labels:
    app: order-service
    version: v1
spec:
  replicas: 3
  selector:
    matchLabels:
      app: order-service
      version: v1
  template:
    metadata:
      labels:
        app: order-service
        version: v1
    spec:
      containers:
      - name: order-service
        image: order-service:latest
        ports:
        - containerPort: 8080
        env:
        - name: DATABASE_URL
          value: "postgresql://order-db:5432/orders"
---
apiVersion: v1
kind: Service
metadata:
  name: order-service
  labels:
    app: order-service
spec:
  ports:
  - port: 8080
    name: http
  selector:
    app: order-service
---
# Traffic routing rules
apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
  name: order-service
spec:
  hosts:
  - order-service
  http:
  - match:
    - headers:
        x-canary:
          exact: "true"
    route:
    - destination:
        host: order-service
        subset: v2
      weight: 100
  - route:
    - destination:
        host: order-service
        subset: v1
      weight: 90
    - destination:
        host: order-service
        subset: v2
      weight: 10
---
# Destination rule
apiVersion: networking.istio.io/v1beta1
kind: DestinationRule
metadata:
  name: order-service
spec:
  host: order-service
  subsets:
  - name: v1
    labels:
      version: v1
  - name: v2
    labels:
      version: v2
  trafficPolicy:
    connectionPool:
      tcp:
        maxConnections: 10
      http:
        http1MaxPendingRequests: 10
        maxRequestsPerConnection: 2
    outlierDetection:
      consecutive5xxErrors: 3
      interval: 30s
      baseEjectionTime: 30s
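
To make the routing rules concrete, here is a rough Python sketch of the decision the VirtualService describes: requests with `x-canary: true` always go to `v2`, and everything else is split 90/10. The `pick_subset` function and `WEIGHTS` list are hypothetical names. Hashing a request ID is used here only to keep the sketch deterministic and testable; it is an assumption of this example, not a description of how the sidecar proxy selects a destination.

```python
import hashlib

# Weights mirror the 90/10 split in the VirtualService above.
WEIGHTS = [("v1", 90), ("v2", 10)]


def pick_subset(request_id: str, canary_header: bool = False) -> str:
    """Choose a subset for a request: header match first, then weighted split."""
    if canary_header:
        return "v2"  # the x-canary: "true" match sends 100% of traffic to v2
    # Map the request ID to a stable bucket in [0, 100).
    digest = hashlib.sha256(request_id.encode("utf-8")).hexdigest()
    bucket = int(digest, 16) % 100
    cumulative = 0
    for subset, weight in WEIGHTS:
        cumulative += weight
        if bucket < cumulative:
            return subset
    return WEIGHTS[-1][0]
```

Over many requests, roughly one in ten lands on `v2`, which is what makes the weighted route suitable for a gradual canary rollout.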

Service Mesh Security Configuration

# security-policies.yaml
# PeerAuthentication - mTLS configuration
apiVersion: security.istio.io/v1beta1
kind: PeerAuthentication
metadata:
  name: default
  namespace: default
spec:
  mtls:
    mode: STRICT
---
# AuthorizationPolicy - access control
apiVersion: security.istio.io/v1beta1
kind: AuthorizationPolicy
metadata:
  name: order-service-authz
  namespace: default
spec:
  selector:
    matchLabels:
      app: order-service
  rules:
  - from:
    - source:
        principals: ["cluster.local/ns/default/sa/payment-service"]
    - source:
        principals: ["cluster.local/ns/default/sa/user-service"]
    to:
    - operation:
        methods: ["POST", "GET"]
        paths: ["/orders/*"]
    when:
    - key: request.headers[authorization]
      values: ["Bearer *"]

Inter-Service Communication Patterns

1. Synchronous Communication

# Synchronous HTTP communication
import logging
import random
import time
from functools import wraps

import requests

def with_retry(func):
    """Retry with exponential backoff and jitter.

    This is a retry policy, not a true circuit breaker: it does not track
    failure state across calls or stop calling an unhealthy service.
    """
    @wraps(func)
    def wrapper(self, *args, **kwargs):
        for attempt in range(self.max_retries):
            last_attempt = attempt == self.max_retries - 1
            try:
                response = func(self, *args, **kwargs)
            except requests.exceptions.RequestException as e:
                if last_attempt:
                    raise
                logging.warning(f"Request failed, retrying... {e}")
            else:
                # Only 5xx responses are retried; return anything else as-is
                if response.status_code < 500 or last_attempt:
                    return response
            time.sleep((2 ** attempt) + random.uniform(0, 1))
    return wrapper

class SynchronousServiceClient:
    def __init__(self, base_url, timeout=30, max_retries=3):
        self.base_url = base_url
        self.timeout = timeout
        self.max_retries = max_retries
        self.session = requests.Session()

    @with_retry
    def get_user(self, user_id):
        """Fetch user information"""
        response = self.session.get(
            f"{self.base_url}/users/{user_id}",
            timeout=self.timeout
        )
        return response

    @with_retry
    def create_order(self, order_data):
        """Create an order"""
        response = self.session.post(
            f"{self.base_url}/orders",
            json=order_data,
            timeout=self.timeout
        )
        return response

# Usage example
class OrderService:
    def __init__(self):
        self.user_client = SynchronousServiceClient("http://user-service:8080")
        self.inventory_client = SynchronousServiceClient("http://inventory-service:8080")
        self.payment_client = SynchronousServiceClient("http://payment-service:8080")

    def process_order(self, order_request):
        # _create_order_record() and _rollback_order() are assumed to be defined elsewhere.
        try:
            # 1. Validate the user
            user_response = self.user_client.get_user(order_request["user_id"])
            if user_response.status_code != 200:
                raise ValueError("Invalid user")

            # 2. Check inventory
            inventory_response = self.inventory_client.session.post(
                f"{self.inventory_client.base_url}/inventory/check",
                json={"items": order_request["items"]},
                timeout=self.inventory_client.timeout
            )

            if not inventory_response.json()["available"]:
                raise ValueError("Insufficient inventory")

            # 3. Create the order
            order = self._create_order_record(order_request)

            # 4. Process the payment
            payment_response = self.payment_client.session.post(
                f"{self.payment_client.base_url}/payments",
                json={
                    "order_id": order["id"],
                    "amount": order["total_amount"],
                    "payment_method": order_request["payment_method"]
                },
                timeout=self.payment_client.timeout
            )

            if payment_response.status_code != 201:
                self._rollback_order(order["id"])
                raise ValueError("Payment failed")

            return order

        except Exception as e:
            logging.error(f"Order processing failed: {e}")
            raise
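
The client above retries failed calls with backoff. A circuit breaker goes one step further: after repeated failures it stops calling the downstream service for a while, then lets a single trial call through. The following is a minimal sketch of that state machine, assuming a single-threaded caller. `CircuitBreaker` and `CircuitOpenError` are hypothetical names, and the thresholds are illustrative.

```python
import time


class CircuitOpenError(Exception):
    """Raised when a call is rejected because the circuit is open."""


class CircuitBreaker:
    def __init__(self, failure_threshold=3, reset_timeout=30.0, clock=time.monotonic):
        self.failure_threshold = failure_threshold  # failures before opening
        self.reset_timeout = reset_timeout          # seconds to stay open
        self.clock = clock                          # injectable for testing
        self.failures = 0
        self.opened_at = None                       # None means the circuit is closed

    def call(self, func, *args, **kwargs):
        if self.opened_at is not None:
            if self.clock() - self.opened_at < self.reset_timeout:
                raise CircuitOpenError("circuit is open; call rejected")
            # Timeout elapsed: half-open state, allow one trial call through.
        try:
            result = func(*args, **kwargs)
        except Exception:
            self.failures += 1
            # A failed trial call, or too many failures, opens the circuit.
            if self.opened_at is not None or self.failures >= self.failure_threshold:
                self.opened_at = self.clock()
            raise
        # Any success closes the circuit and clears the failure count.
        self.failures = 0
        self.opened_at = None
        return result
```

A client could wrap each outbound request as `breaker.call(session.get, url, timeout=5)`, so that a failing dependency is given time to recover instead of being hit by every retry.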

2. Asynchronous Communication

# Asynchronous, event-driven communication
import asyncio
import json
import logging
import time
import uuid

from kafka import KafkaProducer, KafkaConsumer
# The standalone aioredis package was merged into redis-py as redis.asyncio
from redis import asyncio as aioredis

class AsyncEventBus:
    def __init__(self):
        self.kafka_producer = KafkaProducer(
            bootstrap_servers=['kafka:9092'],
            value_serializer=lambda v: json.dumps(v).encode('utf-8')
        )
        self.redis_client = None
        self.event_handlers = {}

    async def initialize(self):
        """Initialize the Redis connection"""
        self.redis_client = aioredis.from_url("redis://redis:6379")

    async def publish_event(self, event_type, event_data):
        """Publish an event"""
        event = {
            "event_type": event_type,
            "event_data": event_data,
            "timestamp": time.time(),
            "event_id": str(uuid.uuid4())
        }

        # Publish the event to Kafka
        self.kafka_producer.send(event_type, event)

        # Optional immediate notification through Redis (requires initialize())
        if self.redis_client is not None:
            await self.redis_client.publish(f"event:{event_type}", json.dumps(event))

    def register_handler(self, event_type, handler_func):
        """Register an event handler"""
        if event_type not in self.event_handlers:
            self.event_handlers[event_type] = []
        self.event_handlers[event_type].append(handler_func)

    async def start_consumer(self):
        """Start consuming events"""
        consumer = KafkaConsumer(
            *self.event_handlers.keys(),
            bootstrap_servers=['kafka:9092'],
            value_deserializer=lambda m: json.loads(m.decode('utf-8')),
            group_id='async-event-processor'
        )
        loop = asyncio.get_running_loop()

        def consume():
            # Iterating a KafkaConsumer blocks, so it runs in a worker thread
            for message in consumer:
                for handler in self.event_handlers.get(message.topic, []):
                    # Run handlers on the event loop so they can call
                    # asyncio.create_task(); pass the payload, not the envelope
                    loop.call_soon_threadsafe(
                        self._handle_event_async, handler, message.value["event_data"]
                    )

        await loop.run_in_executor(None, consume)

    def _handle_event_async(self, handler, event_data):
        """Run one handler and log failures instead of stopping the consumer"""
        try:
            handler(event_data)
        except Exception as e:
            logging.error(f"Event handling failed: {e}")
# Asynchronous order service
class AsyncOrderService:
    def __init__(self):
        self.event_bus = AsyncEventBus()
        self.setup_event_handlers()

    def setup_event_handlers(self):
        """Register the event handlers"""
        self.event_bus.register_handler("order.created", self.handle_order_created)
        self.event_bus.register_handler("inventory.reserved", self.handle_inventory_reserved)
        self.event_bus.register_handler("payment.completed", self.handle_payment_completed)
        self.event_bus.register_handler("payment.failed", self.handle_payment_failed)

    async def create_order(self, order_request):
        """Create an order asynchronously"""
        # 1. Create the order record (immediately)
        order = await self._create_order_record(order_request)

        # 2. Publish the order-created event (starts asynchronous processing)
        await self.event_bus.publish_event("order.created", {
            "order_id": order["id"],
            "user_id": order["user_id"],
            "items": order["items"],
            "total_amount": order["total_amount"]
        })

        return {"order_id": order["id"], "status": "processing"}

    def handle_order_created(self, event_data):
        """Handle the order-created event"""
        # Publish an inventory reservation request
        asyncio.create_task(
            self.event_bus.publish_event("inventory.reserve", {
                "order_id": event_data["order_id"],
                "items": event_data["items"]
            })
        )

    def handle_inventory_reserved(self, event_data):
        """Handle the inventory-reserved event"""
        if event_data["success"]:
            # Publish a payment processing request
            asyncio.create_task(
                self.event_bus.publish_event("payment.process", {
                    "order_id": event_data["order_id"],
                    "amount": event_data["total_amount"]
                })
            )
        else:
            # Cancel the order
            asyncio.create_task(
                self.event_bus.publish_event("order.cancelled", {
                    "order_id": event_data["order_id"],
                    "reason": "insufficient_inventory"
                })
            )

    def handle_payment_completed(self, event_data):
        """Handle the payment-completed event"""
        # Update the order status
        self._update_order_status(event_data["order_id"], "completed")

        # Publish a notification request
        asyncio.create_task(
            self.event_bus.publish_event("notification.send", {
                "order_id": event_data["order_id"],
                "type": "order_completed"
            })
        )

    def handle_payment_failed(self, event_data):
        """Handle the payment-failed event"""
        # Request release of the reserved inventory
        asyncio.create_task(
            self.event_bus.publish_event("inventory.release", {
                "order_id": event_data["order_id"]
            })
        )

        # Cancel the order
        self._update_order_status(event_data["order_id"], "cancelled")
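
Each published event carries an `event_id`, and a message broker may deliver the same event more than once (for example after a consumer restart), so handlers like the ones above benefit from being idempotent. The following is a minimal sketch of deduplication by `event_id`. The `IdempotentHandler` wrapper is a hypothetical name, and the in-memory set is an assumption for illustration; a real service would keep processed IDs in durable storage.

```python
class IdempotentHandler:
    """Wrap a handler so that each event_id is processed at most once."""

    def __init__(self, handler):
        self.handler = handler
        self.seen_event_ids = set()  # in-memory only; lost on restart

    def __call__(self, event):
        """Take the full event envelope; return True if the handler ran."""
        event_id = event["event_id"]
        if event_id in self.seen_event_ids:
            return False  # duplicate delivery: skip
        self.handler(event["event_data"])
        # Record the ID only after the handler succeeds, so a failed
        # attempt can be retried on redelivery.
        self.seen_event_ids.add(event_id)
        return True


processed = []
handle_once = IdempotentHandler(processed.append)
event = {
    "event_type": "payment.completed",
    "event_data": {"order_id": "ORD-1"},
    "event_id": "evt-001",
}
first = handle_once(event)
second = handle_once(event)  # simulated redelivery of the same event
```

Here the second delivery is ignored, so `processed` contains the payload only once.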

Distributed Transaction Management

Saga Pattern Implementation

# Saga pattern - distributed transaction management
import time
import uuid
from dataclasses import dataclass
from enum import Enum
from typing import Any, Callable, List, Optional

class SagaStepStatus(Enum):
    PENDING = "pending"
    COMPLETED = "completed"
    FAILED = "failed"
    COMPENSATED = "compensated"

@dataclass
class SagaStep:
    step_id: str
    service_name: str
    action: Callable
    compensation: Callable
    status: SagaStepStatus = SagaStepStatus.PENDING
    result: Any = None
    error: Optional[str] = None

class SagaOrchestrator:
    def __init__(self):
        self.sagas = {}
        self.event_bus = AsyncEventBus()

    def create_saga(self, saga_id: str, steps: List[SagaStep]):
        """Create a saga"""
        self.sagas[saga_id] = {
            "steps": steps,
            "current_step": 0,
            "status": "running",
            "created_at": time.time()
        }
        return saga_id

    async def execute_saga(self, saga_id: str):
        """Execute a saga"""
        saga = self.sagas[saga_id]
        steps = saga["steps"]

        try:
            # Run each step in order
            for i, step in enumerate(steps):
                saga["current_step"] = i

                try:
                    # Execute the step
                    result = await step.action()
                    step.status = SagaStepStatus.COMPLETED
                    step.result = result

                    print(f"Saga {saga_id} - Step {step.step_id} completed")

                except Exception as e:
                    step.status = SagaStepStatus.FAILED
                    step.error = str(e)

                    print(f"Saga {saga_id} - Step {step.step_id} failed: {e}")

                    # On failure, run the compensating transactions
                    await self._execute_compensations(saga_id, i)
                    saga["status"] = "failed"
                    return False

            saga["status"] = "completed"
            return True

        except Exception as e:
            saga["status"] = "error"
            print(f"Saga {saga_id} execution error: {e}")
            return False

    async def _execute_compensations(self, saga_id: str, failed_step_index: int):
        """Run the compensating transactions"""
        saga = self.sagas[saga_id]
        steps = saga["steps"]

        # Compensate the steps completed before the failed one, in reverse order
        for i in range(failed_step_index - 1, -1, -1):
            step = steps[i]

            if step.status == SagaStepStatus.COMPLETED:
                try:
                    await step.compensation()
                    step.status = SagaStepStatus.COMPENSATED
                    print(f"Saga {saga_id} - Step {step.step_id} compensated")

                except Exception as e:
                    print(f"Saga {saga_id} - Compensation failed for step {step.step_id}: {e}")

# Order-processing saga
class OrderProcessingSaga:
    def __init__(self):
        self.orchestrator = SagaOrchestrator()
        self.inventory_service = InventoryServiceClient()
        self.payment_service = PaymentServiceClient()
        self.order_service = OrderServiceClient()
        self.notification_service = NotificationServiceClient()

    async def process_order(self, order_data):
        """Run the order-processing saga"""
        saga_id = f"order_saga_{uuid.uuid4()}"

        # Define the saga steps
        steps = [
            SagaStep(
                step_id="create_order",
                service_name="order-service",
                action=lambda: self.order_service.create_order(order_data),
                compensation=lambda: self.order_service.cancel_order(order_data["order_id"])
            ),
            SagaStep(
                step_id="reserve_inventory",
                service_name="inventory-service",
                action=lambda: self.inventory_service.reserve_items(order_data["items"]),
                compensation=lambda: self.inventory_service.release_items(order_data["items"])
            ),
            SagaStep(
                step_id="process_payment",
                service_name="payment-service",
                action=lambda: self.payment_service.charge_payment(
                    order_data["payment_method"],
                    order_data["total_amount"]
                ),
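                # order_data has no "payment_id"; this assumes charge_payment() returns
                # a dict containing one, which the orchestrator stores on the step.
                compensation=lambda: self.payment_service.refund_payment(steps[2].result["payment_id"])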
            ),
            SagaStep(
                step_id="send_confirmation",
                service_name="notification-service",
                action=lambda: self.notification_service.send_order_confirmation(
                    order_data["user_id"],
                    order_data["order_id"]
                ),
                compensation=lambda: self.notification_service.send_cancellation_notice(
                    order_data["user_id"],
                    order_data["order_id"]
                )
            )
        ]

        # Create and run the saga
        self.orchestrator.create_saga(saga_id, steps)
        success = await self.orchestrator.execute_saga(saga_id)

        return {
            "saga_id": saga_id,
            "success": success,
            "order_id": order_data["order_id"]
        }

# Usage example
async def main():
    saga_processor = OrderProcessingSaga()

    order_data = {
        "order_id": "ORD-123456",
        "user_id": "USER-789",
        "items": [
            {"product_id": "PROD-001", "quantity": 2},
            {"product_id": "PROD-002", "quantity": 1}
        ],
        "total_amount": 150.00,
        "payment_method": "credit_card"
    }

    result = await saga_processor.process_order(order_data)
    print(f"Order processing result: {result}")

# asyncio.run(main())
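
The orchestrator above depends on service clients that are not shown, so it cannot be run as-is. The following is a stripped-down, self-contained sketch of the same idea: run steps in order and, when one fails, compensate the completed steps in reverse. `run_saga`, `ok`, `fail`, and `demo` are hypothetical names, and the steps are stand-ins rather than real service calls.

```python
import asyncio


async def run_saga(steps):
    """Run (name, action, compensation) steps in order; undo completed ones on failure."""
    completed = []
    log = []
    for name, action, compensation in steps:
        try:
            await action()
            log.append(f"{name}: completed")
            completed.append((name, compensation))
        except Exception as exc:
            log.append(f"{name}: failed ({exc})")
            # Compensate in reverse order of completion.
            for done_name, undo in reversed(completed):
                await undo()
                log.append(f"{done_name}: compensated")
            return False, log
    return True, log


async def ok():
    """Stand-in for a service call that succeeds."""
    return None


async def fail():
    """Stand-in for a service call that fails."""
    raise RuntimeError("card declined")


def demo():
    steps = [
        ("create_order", ok, ok),
        ("reserve_inventory", ok, ok),
        ("process_payment", fail, ok),
    ]
    return asyncio.run(run_saga(steps))
```

Calling `demo()` returns `False` together with a log showing that the inventory reservation is released before the order is cancelled, which is the reverse of the order in which the steps ran.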

Practical Implementation Example

A Complete Microservice Communication Example

# main.py - integrated microservices example
from fastapi import FastAPI, HTTPException, BackgroundTasks
from pydantic import BaseModel
from typing import List, Optional
import httpx
import asyncio
import uuid

# Data models
class OrderItem(BaseModel):
    product_id: str
    quantity: int
    price: float

class CreateOrderRequest(BaseModel):
    user_id: str
    items: List[OrderItem]
    payment_method: str

class OrderResponse(BaseModel):
    order_id: str
    status: str
    total_amount: float
    estimated_delivery: Optional[str] = None

# API Gateway implementation
class APIGateway:
    def __init__(self):
        self.app = FastAPI(title="E-commerce API Gateway")
        self.service_registry = {
            "user-service": "http://user-service:8080",
            "order-service": "http://order-service:8080",
            "inventory-service": "http://inventory-service:8080",
            "payment-service": "http://payment-service:8080",
            "notification-service": "http://notification-service:8080"
        }
        self.setup_routes()

    def setup_routes(self):
        @self.app.post("/orders", response_model=OrderResponse)
        async def create_order(
            order_request: CreateOrderRequest,
            background_tasks: BackgroundTasks
        ):
            return await self.orchestrate_order_creation(order_request, background_tasks)

        @self.app.get("/orders/{order_id}")
        async def get_order(order_id: str):
            return await self.proxy_request("order-service", f"/orders/{order_id}")

        @self.app.get("/inventory/{product_id}")
        async def get_inventory(product_id: str):
            return await self.proxy_request("inventory-service", f"/inventory/{product_id}")

    async def orchestrate_order_creation(self, order_request: CreateOrderRequest, background_tasks: BackgroundTasks):
        """Orchestrate order creation"""
        try:
            # 1. Validate the user (synchronous)
            user_valid = await self.validate_user(order_request.user_id)
            if not user_valid:
                raise HTTPException(status_code=400, detail="Invalid user")

            # 2. Check inventory (synchronous)
            inventory_available = await self.check_inventory(order_request.items)
            if not inventory_available:
                raise HTTPException(status_code=400, detail="Insufficient inventory")

            # 3. Create the order (synchronous)
            order_id = str(uuid.uuid4())
            total_amount = sum(item.price * item.quantity for item in order_request.items)

            order_response = await self.create_order_record({
                "order_id": order_id,
                "user_id": order_request.user_id,
                "items": [item.dict() for item in order_request.items],
                "total_amount": total_amount,
                "status": "processing"
            })

            # 4. Start asynchronous processing (in the background)
            background_tasks.add_task(
                self.process_order_async,
                order_id,
                order_request.dict(),
                total_amount
            )

            return OrderResponse(
                order_id=order_id,
                status="processing",
                total_amount=total_amount,
                estimated_delivery="3-5 business days"
            )

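        except HTTPException:
            # Preserve the 4xx responses raised above instead of turning them into 500s
            raise
        except Exception as e:
            raise HTTPException(status_code=500, detail=str(e))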

    async def process_order_async(self, order_id: str, order_data: dict, total_amount: float):
        """Process the order asynchronously"""
        try:
            # 1. Reserve inventory
            reserve_result = await self.reserve_inventory(order_data["items"])
            if not reserve_result["success"]:
                await self.cancel_order(order_id, "inventory_failed")
                return

            # 2. Process the payment
            payment_result = await self.process_payment({
                "order_id": order_id,
                "amount": total_amount,
                "payment_method": order_data["payment_method"]
            })

            if not payment_result["success"]:
                # Release the reserved inventory
                await self.release_inventory(order_data["items"])
                await self.cancel_order(order_id, "payment_failed")
                return

            # 3. Mark the order as completed
            await self.complete_order(order_id)

            # 4. Send notifications
            await self.send_notifications(order_id, order_data["user_id"])

        except Exception as e:
            print(f"Async order processing failed: {e}")
            await self.cancel_order(order_id, "processing_error")

    async def proxy_request(self, service_name: str, path: str, method: str = "GET", data: Optional[dict] = None):
        """Forward a request to a backend service"""
        service_url = self.service_registry[service_name]

        async with httpx.AsyncClient(timeout=10.0) as client:
            # client.request() handles every HTTP method; a JSON body is sent only when data is given
            response = await client.request(method, f"{service_url}{path}", json=data)
            # Raise on 4xx/5xx so callers do not parse error bodies as results
            response.raise_for_status()
            return response.json()

    async def validate_user(self, user_id: str) -> bool:
        """Validate the user"""
        try:
            result = await self.proxy_request("user-service", f"/users/{user_id}")
            return result.get("active", False)
        except Exception:
            return False

    async def check_inventory(self, items: List[OrderItem]) -> bool:
        """Check inventory availability"""
        try:
            result = await self.proxy_request(
                "inventory-service",
                "/inventory/check",
                "POST",
                {"items": [item.dict() for item in items]}
            )
            return result.get("available", False)
        except Exception:
            return False

    async def create_order_record(self, order_data: dict):
        """Create the order record"""
        return await self.proxy_request("order-service", "/orders", "POST", order_data)

    async def reserve_inventory(self, items: List[dict]):
        """Reserve inventory"""
        return await self.proxy_request(
            "inventory-service",
            "/inventory/reserve",
            "POST",
            {"items": items}
        )

    async def process_payment(self, payment_data: dict):
        """Process the payment"""
        return await self.proxy_request("payment-service", "/payments", "POST", payment_data)

    async def complete_order(self, order_id: str):
        """Mark the order as completed"""
        return await self.proxy_request(
            "order-service",
            f"/orders/{order_id}/complete",
            "PUT"
        )

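    async def send_notifications(self, order_id: str, user_id: str):
        """Send notifications"""
        return await self.proxy_request(
            "notification-service",
            "/notifications",
            "POST",
            {
                "user_id": user_id,
                "order_id": order_id,
                "type": "order_completed"
            }
        )

    # Helpers called by process_order_async(); the endpoint paths are assumptions.
    async def cancel_order(self, order_id: str, reason: str):
        """Cancel an order"""
        return await self.proxy_request(
            "order-service", f"/orders/{order_id}/cancel", "PUT", {"reason": reason}
        )

    async def release_inventory(self, items: List[dict]):
        """Release reserved inventory"""
        return await self.proxy_request(
            "inventory-service", "/inventory/release", "POST", {"items": items}
        )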

# Run the application
if __name__ == "__main__":
    import uvicorn

    gateway = APIGateway()
    uvicorn.run(gateway.app, host="0.0.0.0", port=8000)

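Part 2 covered the core infrastructure building blocks of microservices: the API Gateway, the service mesh, and communication patterns.

Part 3 will look at advanced operational topics such as data management, monitoring, and deployment strategy:

  • Database patterns (Database per Service, CQRS)
  • Distributed monitoring and logging
  • CI/CD pipelines and deployment strategies
  • Performance optimization and scaling strategies

Microservices are not just a technology but an organization-wide digital transformation. Incremental adoption and continuous improvement are the keys to success.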