hobokai's Blog
The Complete Guide to Microservices Architecture, Part 2: API Gateway, Service Mesh, and Communication Patterns
API Gateway Pattern
An API Gateway is the single entry point for all client requests: an intermediate layer that routes each request to the appropriate microservice.
Core Features
Client request
      ↓
┌─────────────────────────────────────┐
│             API Gateway             │
│ ┌─────────────────────────────────┐ │
│ │ • Authentication/Authorization  │ │
│ │ • Load Balancing                │ │
│ │ • Rate Limiting                 │ │
│ │ • Logging/Monitoring            │ │
│ │ • Response Caching              │ │
│ │ • API Versioning                │ │
│ └─────────────────────────────────┘ │
└─────────────────────────────────────┘
      ↓
┌──────────┬──────────┬──────────┐
│  Order   │Inventory │ Payment  │
│ Service  │ Service  │ Service  │
└──────────┴──────────┴──────────┘
Kong API Gateway Implementation
# docker-compose.yml
version: '3.8'
services:
  kong-database:
    image: postgres:13
    environment:
      POSTGRES_USER: kong
      POSTGRES_PASSWORD: kong
      POSTGRES_DB: kong
    volumes:
      - kong_data:/var/lib/postgresql/data
  kong-migration:
    image: kong:3.0
    command: kong migrations bootstrap
    depends_on:
      - kong-database
    environment:
      KONG_DATABASE: postgres
      KONG_PG_HOST: kong-database
      KONG_PG_PASSWORD: kong
  kong:
    image: kong:3.0
    depends_on:
      - kong-database
      - kong-migration
    ports:
      - "8000:8000" # Proxy port
      - "8001:8001" # Admin API port
    environment:
      KONG_DATABASE: postgres
      KONG_PG_HOST: kong-database
      KONG_PG_PASSWORD: kong
      KONG_PROXY_ACCESS_LOG: /dev/stdout
      KONG_ADMIN_ACCESS_LOG: /dev/stdout
      KONG_PROXY_ERROR_LOG: /dev/stderr
      KONG_ADMIN_ERROR_LOG: /dev/stderr
      KONG_ADMIN_LISTEN: 0.0.0.0:8001
  konga:
    image: pantsel/konga
    ports:
      - "1337:1337"
    environment:
      NODE_ENV: development
volumes:
  kong_data:
Kong Service Configuration
# kong_config.py - Kong API Gateway configuration
import requests


class KongConfigManager:
    def __init__(self, admin_url="http://localhost:8001"):
        self.admin_url = admin_url

    def create_service(self, service_name, service_url):
        """Register a service."""
        service_data = {
            "name": service_name,
            "url": service_url,
            "protocol": "http",
            "connect_timeout": 60000,  # timeouts are in milliseconds
            "write_timeout": 60000,
            "read_timeout": 60000
        }
        response = requests.post(
            f"{self.admin_url}/services",
            json=service_data
        )
        return response.json()

    def create_route(self, service_name, route_paths, methods=None):
        """Create a route for a service."""
        route_data = {
            "paths": route_paths,
            # Avoid a mutable default argument; fall back to GET and POST
            "methods": methods or ["GET", "POST"],
            "strip_path": False
        }
        response = requests.post(
            f"{self.admin_url}/services/{service_name}/routes",
            json=route_data
        )
        return response.json()

    def setup_jwt_auth(self, service_name):
        """Enable the JWT authentication plugin."""
        plugin_data = {
            "name": "jwt",
            "config": {
                "uri_param_names": ["jwt"],
                "cookie_names": ["jwt"],
                "claims_to_verify": ["exp"],
                "key_claim_name": "iss"
                # The signing algorithm (e.g. HS256) is set on each consumer's
                # JWT credential, not in the plugin config.
            }
        }
        response = requests.post(
            f"{self.admin_url}/services/{service_name}/plugins",
            json=plugin_data
        )
        return response.json()

    def setup_rate_limiting(self, service_name, requests_per_minute=100):
        """Enable the rate-limiting plugin."""
        plugin_data = {
            "name": "rate-limiting",
            "config": {
                "minute": requests_per_minute,
                "policy": "local",
                "fault_tolerant": True,
                "hide_client_headers": False
            }
        }
        response = requests.post(
            f"{self.admin_url}/services/{service_name}/plugins",
            json=plugin_data
        )
        return response.json()


# Apply the configuration
kong_manager = KongConfigManager()

# Register the microservices
services = [
    ("order-service", "http://order-service:8080"),
    ("inventory-service", "http://inventory-service:8080"),
    ("payment-service", "http://payment-service:8080"),
    ("user-service", "http://user-service:8080")
]

for service_name, service_url in services:
    # Create the service
    kong_manager.create_service(service_name, service_url)
    # Create the route, e.g. "order-service" -> "/order"
    kong_manager.create_route(service_name, [f"/{service_name.replace('-service', '')}"])
    # Enable JWT authentication
    kong_manager.setup_jwt_auth(service_name)
    # Enable rate limiting
    kong_manager.setup_rate_limiting(service_name)
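With `key_claim_name` set to `iss`, the JWT plugin looks up the signing credential by the token's `iss` claim, so a client has to put its credential key there. The following is a minimal standard-library sketch of building and checking such an HS256 token. The key `order-client-key` and secret `s3cret` are hypothetical; in a real setup they would come from a JWT credential created for a Kong consumer, and a maintained JWT library would normally replace this hand-rolled code.

```python
import base64
import hashlib
import hmac
import json
import time


def b64url(data: bytes) -> str:
    """Base64url-encode without padding, as the JWT format requires."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def sign(signing_input: str, secret: str) -> str:
    """HMAC-SHA256 signature over the header.payload string."""
    digest = hmac.new(secret.encode("utf-8"), signing_input.encode("ascii"), hashlib.sha256).digest()
    return b64url(digest)


def make_hs256_jwt(key: str, secret: str, ttl_seconds: int = 300) -> str:
    """Build an HS256 token whose `iss` claim carries the credential key."""
    header = {"alg": "HS256", "typ": "JWT"}
    payload = {"iss": key, "exp": int(time.time()) + ttl_seconds}
    signing_input = ".".join(
        b64url(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, payload)
    )
    return f"{signing_input}.{sign(signing_input, secret)}"


def verify_hs256_jwt(token: str, secret: str) -> dict:
    """Check signature and expiry; return the payload if both are valid."""
    signing_input, _, signature = token.rpartition(".")
    if not hmac.compare_digest(sign(signing_input, secret), signature):
        raise ValueError("bad signature")
    payload_b64 = signing_input.split(".")[1]
    padded = payload_b64 + "=" * (-len(payload_b64) % 4)
    payload = json.loads(base64.urlsafe_b64decode(padded))
    if payload["exp"] < time.time():
        raise ValueError("token expired")
    return payload


token = make_hs256_jwt("order-client-key", "s3cret")
print(verify_hs256_jwt(token, "s3cret")["iss"])
```

The client would then send the token in the `Authorization: Bearer <token>` header, or in the `jwt` query parameter or cookie that the plugin config above names.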
Spring Cloud Gateway Implementation
# application.yml
spring:
  cloud:
    gateway:
      routes:
        - id: order-service
          uri: http://order-service:8080
          predicates:
            - Path=/orders/**
          filters:
            - name: CircuitBreaker
              args:
                name: order-circuit-breaker
                fallbackUri: forward:/fallback/orders
            - name: Retry
              args:
                retries: 3
                statuses: BAD_GATEWAY,GATEWAY_TIMEOUT
        - id: inventory-service
          uri: http://inventory-service:8080
          predicates:
            - Path=/inventory/**
          filters:
            - name: RequestRateLimiter
              args:
                redis-rate-limiter.replenishRate: 10
                redis-rate-limiter.burstCapacity: 20
                key-resolver: "#{@userKeyResolver}"
        - id: payment-service
          uri: http://payment-service:8080
          predicates:
            - Path=/payments/**
          filters:
            - name: AddRequestHeader
              args:
                name: X-Request-Source
                value: API-Gateway
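// GatewayConfig.java
import java.util.Optional;

import org.springframework.cloud.gateway.filter.ratelimit.KeyResolver;
import org.springframework.cloud.gateway.filter.ratelimit.RedisRateLimiter;
import org.springframework.cloud.gateway.route.RouteLocator;
import org.springframework.cloud.gateway.route.builder.RouteLocatorBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
import org.springframework.security.config.annotation.web.reactive.EnableWebFluxSecurity;
import org.springframework.web.cors.CorsConfiguration;
import org.springframework.web.cors.reactive.CorsWebFilter;
import org.springframework.web.cors.reactive.UrlBasedCorsConfigurationSource;
import reactor.core.publisher.Mono;

@Configuration
@EnableWebFluxSecurity
public class GatewayConfig {

    @Bean
    public RouteLocator customRouteLocator(RouteLocatorBuilder builder) {
        return builder.routes()
            .route("order-service", r -> r
                .path("/orders/**")
                .filters(f -> f
                    .circuitBreaker(c -> c
                        .setName("order-circuit-breaker")
                        .setFallbackUri("forward:/fallback/orders"))
                    .retry(3))
                .uri("http://order-service:8080"))
            .route("inventory-service", r -> r
                .path("/inventory/**")
                .filters(f -> f
                    .requestRateLimiter(c -> c
                        .setRateLimiter(redisRateLimiter())
                        .setKeyResolver(userKeyResolver())))
                .uri("http://inventory-service:8080"))
            .build();
    }

    @Bean
    public RedisRateLimiter redisRateLimiter() {
        return new RedisRateLimiter(10, 20); // replenishRate, burstCapacity
    }

    @Bean
    public KeyResolver userKeyResolver() {
        // getFirst() returns a nullable String (not an Optional), so wrap it
        // before falling back to "anonymous".
        return exchange -> Mono.just(
            Optional.ofNullable(exchange.getRequest().getHeaders().getFirst("X-User-ID"))
                .orElse("anonymous"));
    }

    @Bean
    @Order(Ordered.HIGHEST_PRECEDENCE)
    public CorsWebFilter corsFilter() {
        CorsConfiguration config = new CorsConfiguration();
        config.setAllowCredentials(true);
        config.addAllowedOriginPattern("*");
        config.addAllowedHeader("*");
        config.addAllowedMethod("*");
        UrlBasedCorsConfigurationSource source = new UrlBasedCorsConfigurationSource();
        source.registerCorsConfiguration("/**", config);
        return new CorsWebFilter(source);
    }
}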
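The `RedisRateLimiter(10, 20)` settings follow a token-bucket model: `replenishRate` is how many tokens are added per second, and `burstCapacity` is the most the bucket can hold. The sketch below shows that idea in a single process. It is only an illustration with hypothetical names (`TokenBucket`, `allow`); the Spring implementation keeps its state in Redis so that all gateway instances share one bucket per key.

```python
import time


class TokenBucket:
    """In-memory token bucket, one instance per rate-limit key."""

    def __init__(self, replenish_rate: float, burst_capacity: int, clock=time.monotonic):
        self.replenish_rate = replenish_rate    # tokens added per second
        self.burst_capacity = burst_capacity    # maximum tokens the bucket holds
        self.clock = clock                      # injectable for testing
        self.tokens = float(burst_capacity)     # start with a full bucket
        self.last_refill = clock()

    def allow(self) -> bool:
        """Consume one token if available; return False when rate-limited."""
        now = self.clock()
        elapsed = now - self.last_refill
        # Refill according to elapsed time, never above the burst capacity.
        self.tokens = min(self.burst_capacity, self.tokens + elapsed * self.replenish_rate)
        self.last_refill = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False


# Simulated clock so the example does not need to sleep.
fake_time = [0.0]
bucket = TokenBucket(replenish_rate=10, burst_capacity=20, clock=lambda: fake_time[0])

burst = sum(bucket.allow() for _ in range(25))   # 20 allowed, 5 rejected
fake_time[0] += 0.5                              # half a second later: 5 new tokens
later = sum(bucket.allow() for _ in range(25))   # 5 allowed
print(burst, later)
```

With these numbers a client can burst up to 20 requests at once but sustains only 10 requests per second over time.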
Service Mesh Architecture
A service mesh is a dedicated infrastructure layer for communication between microservices; it abstracts away the complexity of network communication.
Istio Service Mesh Implementation
# istio-installation.yaml
apiVersion: install.istio.io/v1alpha1
kind: IstioOperator
metadata:
  name: control-plane
spec:
  values:
    global:
      meshID: mesh1
      multiCluster:
        clusterName: cluster1
      network: network1
  components:
    pilot:
      k8s:
        resources:
          requests:
            memory: 512Mi
            cpu: "0.5"
# Download Istio (pin the version so the directory name below matches)
curl -L https://istio.io/downloadIstio | ISTIO_VERSION=1.18.0 sh -
cd istio-1.18.0
export PATH=$PWD/bin:$PATH
# Install Istio
istioctl install --set values.defaultRevision=default
# Enable sidecar injection for the namespace
kubectl label namespace default istio-injection=enabled
# Install Kiali, Jaeger, and Grafana
kubectl apply -f samples/addons/
Service Mesh Configuration
# microservices-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: order-service
  labels:
    app: order-service
    version: v1
spec:
  replicas: 3
  selector:
    matchLabels:
      app: order-service
      version: v1
  template:
    metadata:
      labels:
        app: order-service
        version: v1
    spec:
      containers:
      - name: order-service
        image: order-service:latest
        ports:
        - containerPort: 8080
        env:
        - name: DATABASE_URL
          value: "postgresql://order-db:5432/orders"
---
apiVersion: v1
kind: Service
metadata:
  name: order-service
  labels:
    app: order-service
spec:
  ports:
  - port: 8080
    name: http
  selector:
    app: order-service
---
# Traffic routing
# Note: the v2 subset only receives traffic once a Deployment labeled
# version: v2 exists; only v1 is defined above.
apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
  name: order-service
spec:
  hosts:
  - order-service
  http:
  - match:
    - headers:
        x-canary:
          exact: "true"
    route:
    - destination:
        host: order-service
        subset: v2
      weight: 100
  - route:
    - destination:
        host: order-service
        subset: v1
      weight: 90
    - destination:
        host: order-service
        subset: v2
      weight: 10
---
# Destination rule
apiVersion: networking.istio.io/v1beta1
kind: DestinationRule
metadata:
  name: order-service
spec:
  host: order-service
  subsets:
  - name: v1
    labels:
      version: v1
  - name: v2
    labels:
      version: v2
  trafficPolicy:
    connectionPool:
      tcp:
        maxConnections: 10
      http:
        http1MaxPendingRequests: 10
        maxRequestsPerConnection: 2
    # Istio implements circuit breaking through outlier detection
    outlierDetection:
      consecutive5xxErrors: 3
      interval: 30s
      baseEjectionTime: 30s
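The routing rules above read top to bottom: a request carrying `x-canary: true` always goes to the v2 subset, and everything else is split 90/10 between v1 and v2. The following sketch mimics that decision in plain Python purely to make the semantics concrete. The function name `pick_subset` is hypothetical, and the real decision is made by the Envoy sidecars, not by application code.

```python
import random


def pick_subset(headers: dict, rng: random.Random) -> str:
    """Return the subset a request would be routed to under the rules above."""
    # First rule: requests flagged as canary always go to v2.
    if headers.get("x-canary") == "true":
        return "v2"
    # Default rule: weighted split, 90% to v1 and 10% to v2.
    return rng.choices(["v1", "v2"], weights=[90, 10], k=1)[0]


rng = random.Random(42)  # seeded so repeated runs behave the same
picks = [pick_subset({}, rng) for _ in range(10_000)]
v2_share = picks.count("v2") / len(picks)
print(pick_subset({"x-canary": "true"}, rng), round(v2_share, 2))
```

Over many requests the v2 share settles near 10%, which is what makes this kind of rule suitable for a gradual canary rollout.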
Service Mesh Security Configuration
# security-policies.yaml
# PeerAuthentication - require mTLS for all workloads in the namespace
apiVersion: security.istio.io/v1beta1
kind: PeerAuthentication
metadata:
  name: default
  namespace: default
spec:
  mtls:
    mode: STRICT
---
# AuthorizationPolicy - access control
apiVersion: security.istio.io/v1beta1
kind: AuthorizationPolicy
metadata:
  name: order-service-authz
  namespace: default
spec:
  selector:
    matchLabels:
      app: order-service
  rules:
  - from:
    - source:
        principals: ["cluster.local/ns/default/sa/payment-service"]
    - source:
        principals: ["cluster.local/ns/default/sa/user-service"]
    to:
    - operation:
        methods: ["POST", "GET"]
        paths: ["/orders/*"]
    when:
    - key: request.headers[authorization]
      values: ["Bearer *"]
Communication Patterns Between Services
1. Synchronous Communication
# Synchronous HTTP communication
import logging
import random
import time
from functools import wraps

import requests


def with_retry(func):
    """Retry on 5xx responses and network errors with exponential backoff.

    Note: this is a retry policy, not a full circuit breaker; it never
    "opens" to stop sending requests to a failing service.
    """
    @wraps(func)
    def wrapper(self, *args, **kwargs):
        for attempt in range(self.max_retries):
            last_attempt = attempt == self.max_retries - 1
            try:
                response = func(self, *args, **kwargs)
                if response.status_code < 500 or last_attempt:
                    return response
            except requests.exceptions.RequestException as e:
                if last_attempt:
                    raise
                logging.warning(f"Request failed, retrying... {e}")
            # Exponential backoff with jitter before the next attempt
            time.sleep((2 ** attempt) + random.uniform(0, 1))
    return wrapper


class SynchronousServiceClient:
    def __init__(self, base_url, timeout=30, max_retries=3):
        self.base_url = base_url
        self.timeout = timeout
        self.max_retries = max_retries
        self.session = requests.Session()

    @with_retry
    def get_user(self, user_id):
        """Fetch user information."""
        response = self.session.get(
            f"{self.base_url}/users/{user_id}",
            timeout=self.timeout
        )
        return response

    @with_retry
    def create_order(self, order_data):
        """Create an order."""
        response = self.session.post(
            f"{self.base_url}/orders",
            json=order_data,
            timeout=self.timeout
        )
        return response


# Usage example
class OrderService:
    def __init__(self):
        self.user_client = SynchronousServiceClient("http://user-service:8080")
        self.inventory_client = SynchronousServiceClient("http://inventory-service:8080")
        self.payment_client = SynchronousServiceClient("http://payment-service:8080")

    def process_order(self, order_request):
        # _create_order_record and _rollback_order are persistence helpers
        # that are not shown here.
        try:
            # 1. Validate the user
            user_response = self.user_client.get_user(order_request["user_id"])
            if user_response.status_code != 200:
                raise ValueError("Invalid user")
            # 2. Check inventory
            inventory_response = self.inventory_client.session.post(
                f"{self.inventory_client.base_url}/inventory/check",
                json={"items": order_request["items"]},
                timeout=self.inventory_client.timeout
            )
            if not inventory_response.json()["available"]:
                raise ValueError("Insufficient inventory")
            # 3. Create the order
            order = self._create_order_record(order_request)
            # 4. Process the payment
            payment_response = self.payment_client.session.post(
                f"{self.payment_client.base_url}/payments",
                json={
                    "order_id": order["id"],
                    "amount": order["total_amount"],
                    "payment_method": order_request["payment_method"]
                },
                timeout=self.payment_client.timeout
            )
            if payment_response.status_code != 201:
                self._rollback_order(order["id"])
                raise ValueError("Payment failed")
            return order
        except Exception as e:
            logging.error(f"Order processing failed: {e}")
            raise
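The decorator in the client above retries failed calls with backoff; a circuit breaker goes one step further and stops calling a failing service for a while. The following is a minimal sketch of that state machine (closed, open, half-open), assuming a simple consecutive-failure threshold. The names `CircuitBreaker` and `CircuitOpenError` and the default values are illustrative and not taken from any particular library.

```python
import time


class CircuitOpenError(Exception):
    """Raised when a call is rejected because the circuit is open."""


class CircuitBreaker:
    def __init__(self, failure_threshold=3, reset_timeout=30.0, clock=time.monotonic):
        self.failure_threshold = failure_threshold  # consecutive failures before opening
        self.reset_timeout = reset_timeout          # seconds to stay open
        self.clock = clock                          # injectable for testing
        self.failures = 0
        self.opened_at = None                       # None means the circuit is closed

    def call(self, func, *args, **kwargs):
        if self.opened_at is not None:
            if self.clock() - self.opened_at < self.reset_timeout:
                raise CircuitOpenError("circuit open; call rejected")
            # Timeout elapsed: half-open, so one trial call is let through.
        try:
            result = func(*args, **kwargs)
        except Exception:
            self.failures += 1
            half_open = self.opened_at is not None
            if half_open or self.failures >= self.failure_threshold:
                self.opened_at = self.clock()       # open (or re-open) the circuit
            raise
        # A success closes the circuit and resets the failure count.
        self.failures = 0
        self.opened_at = None
        return result


def flaky():
    raise RuntimeError("service down")


fake_time = [0.0]
breaker = CircuitBreaker(failure_threshold=2, reset_timeout=10.0, clock=lambda: fake_time[0])
for _ in range(2):
    try:
        breaker.call(flaky)
    except RuntimeError:
        pass

try:
    breaker.call(lambda: "ok")        # rejected without calling the function
except CircuitOpenError as exc:
    print(exc)

fake_time[0] = 11.0                   # after the reset timeout a trial call is allowed
print(breaker.call(lambda: "ok"))
```

In a client like the one above, each outbound request would be wrapped in `breaker.call(...)`, typically with one breaker per downstream service.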
2. Asynchronous Communication
# Asynchronous event-driven communication
import asyncio
import json
import logging
import time
import uuid
from concurrent.futures import ThreadPoolExecutor

import redis.asyncio as aioredis  # the standalone aioredis package was merged into redis-py
from kafka import KafkaProducer, KafkaConsumer


class AsyncEventBus:
    def __init__(self):
        self.kafka_producer = KafkaProducer(
            bootstrap_servers=['kafka:9092'],
            value_serializer=lambda v: json.dumps(v).encode('utf-8')
        )
        self.redis_client = None
        self.loop = None
        self.event_handlers = {}

    async def initialize(self):
        """Initialize the Redis connection and remember the event loop."""
        self.loop = asyncio.get_running_loop()
        self.redis_client = aioredis.from_url("redis://redis:6379")

    async def publish_event(self, event_type, event_data):
        """Publish an event."""
        event = {
            "event_type": event_type,
            "event_data": event_data,
            "timestamp": time.time(),
            "event_id": str(uuid.uuid4())
        }
        # Publish the event to Kafka
        self.kafka_producer.send(event_type, event)
        # Send an immediate notification through Redis (if needed)
        await self.redis_client.publish(f"event:{event_type}", json.dumps(event))

    def publish_event_threadsafe(self, event_type, event_data):
        """Schedule publish_event on the event loop from a worker thread."""
        return asyncio.run_coroutine_threadsafe(
            self.publish_event(event_type, event_data), self.loop
        )

    def register_handler(self, event_type, handler_func):
        """Register an event handler."""
        if event_type not in self.event_handlers:
            self.event_handlers[event_type] = []
        self.event_handlers[event_type].append(handler_func)

    async def start_consumer(self):
        """Start consuming events."""
        # Iterating a KafkaConsumer blocks, so run it off the event loop
        await asyncio.get_running_loop().run_in_executor(None, self._consume)

    def _consume(self):
        consumer = KafkaConsumer(
            *self.event_handlers.keys(),
            bootstrap_servers=['kafka:9092'],
            value_deserializer=lambda m: json.loads(m.decode('utf-8')),
            group_id='async-event-processor'
        )
        executor = ThreadPoolExecutor(max_workers=10)
        for message in consumer:
            for handler in self.event_handlers.get(message.topic, []):
                # Run each handler on a worker thread
                executor.submit(self._handle_event_async, handler, message.value)

    def _handle_event_async(self, handler, event_data):
        """Run a handler and log any failure."""
        try:
            handler(event_data)
        except Exception as e:
            logging.error(f"Event handling failed: {e}")


# Asynchronous order service
class AsyncOrderService:
    # _create_order_record and _update_order_status are persistence helpers
    # that are not shown here. Call `await self.event_bus.initialize()` once
    # before publishing events.
    def __init__(self):
        self.event_bus = AsyncEventBus()
        self.setup_event_handlers()

    def setup_event_handlers(self):
        """Register the event handlers."""
        self.event_bus.register_handler("order.created", self.handle_order_created)
        self.event_bus.register_handler("inventory.reserved", self.handle_inventory_reserved)
        self.event_bus.register_handler("payment.completed", self.handle_payment_completed)
        self.event_bus.register_handler("payment.failed", self.handle_payment_failed)

    async def create_order(self, order_request):
        """Create an order asynchronously."""
        # 1. Create the order record (immediately)
        order = await self._create_order_record(order_request)
        # 2. Publish the order-created event (starts asynchronous processing)
        await self.event_bus.publish_event("order.created", {
            "order_id": order["id"],
            "user_id": order["user_id"],
            "items": order["items"],
            "total_amount": order["total_amount"]
        })
        return {"order_id": order["id"], "status": "processing"}

    # The handlers below run on worker threads, where no event loop is
    # running, so they publish through publish_event_threadsafe rather than
    # asyncio.create_task.
    def handle_order_created(self, event_data):
        """Handle the order-created event."""
        # Publish an inventory reservation request
        self.event_bus.publish_event_threadsafe("inventory.reserve", {
            "order_id": event_data["order_id"],
            "items": event_data["items"]
        })

    def handle_inventory_reserved(self, event_data):
        """Handle the inventory-reserved event."""
        if event_data["success"]:
            # Publish a payment processing request
            self.event_bus.publish_event_threadsafe("payment.process", {
                "order_id": event_data["order_id"],
                "amount": event_data["total_amount"]
            })
        else:
            # Cancel the order
            self.event_bus.publish_event_threadsafe("order.cancelled", {
                "order_id": event_data["order_id"],
                "reason": "insufficient_inventory"
            })

    def handle_payment_completed(self, event_data):
        """Handle the payment-completed event."""
        # Update the order status
        self._update_order_status(event_data["order_id"], "completed")
        # Publish a notification event
        self.event_bus.publish_event_threadsafe("notification.send", {
            "order_id": event_data["order_id"],
            "type": "order_completed"
        })

    def handle_payment_failed(self, event_data):
        """Handle the payment-failed event."""
        # Request an inventory release
        self.event_bus.publish_event_threadsafe("inventory.release", {
            "order_id": event_data["order_id"]
        })
        # Cancel the order
        self._update_order_status(event_data["order_id"], "cancelled")
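The handlers above form a chain in which each event triggers the next one. To see that chain without Kafka or Redis, the sketch below replays part of it on a tiny in-memory bus. `InMemoryEventBus` and `run_order_flow` are hypothetical stand-ins for illustration only, and the inventory reply is simulated inside a handler rather than coming from a separate service.

```python
import asyncio
from collections import defaultdict


class InMemoryEventBus:
    """Minimal publish/subscribe bus that delivers events in-process."""

    def __init__(self):
        self.handlers = defaultdict(list)
        self.log = []  # event types, in the order they were published

    def register_handler(self, event_type, handler):
        self.handlers[event_type].append(handler)

    async def publish_event(self, event_type, event_data):
        self.log.append(event_type)
        for handler in self.handlers[event_type]:
            await handler(event_data)


async def run_order_flow(inventory_ok: bool):
    bus = InMemoryEventBus()

    async def on_order_created(event):
        # Stand-in for the inventory service replying to the reservation.
        await bus.publish_event("inventory.reserved", {**event, "success": inventory_ok})

    async def on_inventory_reserved(event):
        # Same branching as handle_inventory_reserved above.
        next_event = "payment.process" if event["success"] else "order.cancelled"
        await bus.publish_event(next_event, {"order_id": event["order_id"]})

    bus.register_handler("order.created", on_order_created)
    bus.register_handler("inventory.reserved", on_inventory_reserved)
    await bus.publish_event("order.created", {"order_id": "ORD-1", "items": []})
    return bus.log


print(asyncio.run(run_order_flow(inventory_ok=True)))
print(asyncio.run(run_order_flow(inventory_ok=False)))
```

The first run ends with `payment.process` and the second with `order.cancelled`, which is the branch taken when the reservation fails.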
Distributed Transaction Management
Saga Pattern Implementation
# Saga pattern - distributed transaction management
import asyncio
import time
import uuid
from dataclasses import dataclass
from enum import Enum
from typing import Any, Callable, List, Optional


class SagaStepStatus(Enum):
    PENDING = "pending"
    COMPLETED = "completed"
    FAILED = "failed"
    COMPENSATED = "compensated"


@dataclass
class SagaStep:
    step_id: str
    service_name: str
    action: Callable
    compensation: Callable
    status: SagaStepStatus = SagaStepStatus.PENDING
    result: Any = None
    error: Optional[str] = None


class SagaOrchestrator:
    def __init__(self):
        self.sagas = {}
        self.event_bus = AsyncEventBus()  # defined in the asynchronous example above

    def create_saga(self, saga_id: str, steps: List[SagaStep]):
        """Create a saga."""
        self.sagas[saga_id] = {
            "steps": steps,
            "current_step": 0,
            "status": "running",
            "created_at": time.time()
        }
        return saga_id

    async def execute_saga(self, saga_id: str):
        """Execute a saga."""
        saga = self.sagas[saga_id]
        steps = saga["steps"]
        try:
            # Run each step in order
            for i, step in enumerate(steps):
                saga["current_step"] = i
                try:
                    # Run the step
                    result = await step.action()
                    step.status = SagaStepStatus.COMPLETED
                    step.result = result
                    print(f"Saga {saga_id} - Step {step.step_id} completed")
                except Exception as e:
                    step.status = SagaStepStatus.FAILED
                    step.error = str(e)
                    print(f"Saga {saga_id} - Step {step.step_id} failed: {e}")
                    # On failure, run the compensating transactions
                    await self._execute_compensations(saga_id, i)
                    saga["status"] = "failed"
                    return False
            saga["status"] = "completed"
            return True
        except Exception as e:
            saga["status"] = "error"
            print(f"Saga {saga_id} execution error: {e}")
            return False

    async def _execute_compensations(self, saga_id: str, failed_step_index: int):
        """Run the compensating transactions."""
        saga = self.sagas[saga_id]
        steps = saga["steps"]
        # Compensate the completed steps before the failed one, in reverse order
        for i in range(failed_step_index - 1, -1, -1):
            step = steps[i]
            if step.status == SagaStepStatus.COMPLETED:
                try:
                    await step.compensation()
                    step.status = SagaStepStatus.COMPENSATED
                    print(f"Saga {saga_id} - Step {step.step_id} compensated")
                except Exception as e:
                    print(f"Saga {saga_id} - Compensation failed for step {step.step_id}: {e}")


# Order-processing saga
class OrderProcessingSaga:
    # The *ServiceClient classes are assumed to be async clients defined elsewhere.
    def __init__(self):
        self.orchestrator = SagaOrchestrator()
        self.inventory_service = InventoryServiceClient()
        self.payment_service = PaymentServiceClient()
        self.order_service = OrderServiceClient()
        self.notification_service = NotificationServiceClient()

    async def process_order(self, order_data):
        """Run the order-processing saga."""
        saga_id = f"order_saga_{uuid.uuid4()}"

        async def charge_payment():
            # Assumption: charge_payment returns a dict containing "payment_id".
            # It is stored so that the compensation can refund that payment.
            payment = await self.payment_service.charge_payment(
                order_data["payment_method"],
                order_data["total_amount"]
            )
            order_data["payment_id"] = payment["payment_id"]
            return payment

        # Define the saga steps
        steps = [
            SagaStep(
                step_id="create_order",
                service_name="order-service",
                action=lambda: self.order_service.create_order(order_data),
                compensation=lambda: self.order_service.cancel_order(order_data["order_id"])
            ),
            SagaStep(
                step_id="reserve_inventory",
                service_name="inventory-service",
                action=lambda: self.inventory_service.reserve_items(order_data["items"]),
                compensation=lambda: self.inventory_service.release_items(order_data["items"])
            ),
            SagaStep(
                step_id="process_payment",
                service_name="payment-service",
                action=charge_payment,
                compensation=lambda: self.payment_service.refund_payment(order_data["payment_id"])
            ),
            SagaStep(
                step_id="send_confirmation",
                service_name="notification-service",
                action=lambda: self.notification_service.send_order_confirmation(
                    order_data["user_id"],
                    order_data["order_id"]
                ),
                compensation=lambda: self.notification_service.send_cancellation_notice(
                    order_data["user_id"],
                    order_data["order_id"]
                )
            )
        ]
        # Create and run the saga
        self.orchestrator.create_saga(saga_id, steps)
        success = await self.orchestrator.execute_saga(saga_id)
        return {
            "saga_id": saga_id,
            "success": success,
            "order_id": order_data["order_id"]
        }


# Usage example
async def main():
    saga_processor = OrderProcessingSaga()
    order_data = {
        "order_id": "ORD-123456",
        "user_id": "USER-789",
        "items": [
            {"product_id": "PROD-001", "quantity": 2},
            {"product_id": "PROD-002", "quantity": 1}
        ],
        "total_amount": 150.00,
        "payment_method": "credit_card"
    }
    result = await saga_processor.process_order(order_data)
    print(f"Order processing result: {result}")

# asyncio.run(main())
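The saga above depends on service clients that are not shown, so it cannot be run on its own. The core behavior, undoing completed steps in reverse order when a later step fails, can be observed with a stripped-down runner and stub steps. Everything in this sketch (`run_saga`, `make_step`, the step names) is a simplified stand-in for illustration, not the orchestrator above.

```python
import asyncio


async def run_saga(steps):
    """Run (name, action, compensation) steps; undo completed ones on failure."""
    completed = []
    for name, action, compensation in steps:
        try:
            await action()
        except Exception:
            # Compensate in reverse order of completion.
            for _, undo in reversed(completed):
                await undo()
            return False
        completed.append((name, compensation))
    return True


def make_step(name, trace, fail=False):
    """Build a stub step that records what happened in `trace`."""
    async def action():
        if fail:
            raise RuntimeError(f"{name} failed")
        trace.append(f"do:{name}")

    async def compensation():
        trace.append(f"undo:{name}")

    return name, action, compensation


trace = []
ok = asyncio.run(run_saga([
    make_step("create_order", trace),
    make_step("reserve_inventory", trace),
    make_step("process_payment", trace, fail=True),
]))
print(ok)     # False
print(trace)  # ['do:create_order', 'do:reserve_inventory', 'undo:reserve_inventory', 'undo:create_order']
```

The failed payment step is not compensated itself, because it never completed; only the two steps before it are rolled back.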
Real-World Implementation Example
A Complete Microservice Communication Example
# main.py - microservice integration example
from fastapi import FastAPI, HTTPException, BackgroundTasks
from pydantic import BaseModel
from typing import List, Optional
import httpx
import asyncio
import uuid


# Data models
class OrderItem(BaseModel):
    product_id: str
    quantity: int
    price: float


class CreateOrderRequest(BaseModel):
    user_id: str
    items: List[OrderItem]
    payment_method: str


class OrderResponse(BaseModel):
    order_id: str
    status: str
    total_amount: float
    estimated_delivery: Optional[str] = None


# API Gateway implementation
class APIGateway:
    def __init__(self):
        self.app = FastAPI(title="E-commerce API Gateway")
        self.service_registry = {
            "user-service": "http://user-service:8080",
            "order-service": "http://order-service:8080",
            "inventory-service": "http://inventory-service:8080",
            "payment-service": "http://payment-service:8080",
            "notification-service": "http://notification-service:8080"
        }
        self.setup_routes()

    def setup_routes(self):
        @self.app.post("/orders", response_model=OrderResponse)
        async def create_order(
            order_request: CreateOrderRequest,
            background_tasks: BackgroundTasks
        ):
            return await self.orchestrate_order_creation(order_request, background_tasks)

        @self.app.get("/orders/{order_id}")
        async def get_order(order_id: str):
            return await self.proxy_request("order-service", f"/orders/{order_id}")

        @self.app.get("/inventory/{product_id}")
        async def get_inventory(product_id: str):
            return await self.proxy_request("inventory-service", f"/inventory/{product_id}")

    async def orchestrate_order_creation(self, order_request: CreateOrderRequest, background_tasks: BackgroundTasks):
        """Orchestrate order creation."""
        try:
            # 1. Validate the user (synchronous)
            user_valid = await self.validate_user(order_request.user_id)
            if not user_valid:
                raise HTTPException(status_code=400, detail="Invalid user")
            # 2. Check inventory (synchronous)
            inventory_available = await self.check_inventory(order_request.items)
            if not inventory_available:
                raise HTTPException(status_code=400, detail="Insufficient inventory")
            # 3. Create the order (synchronous)
            order_id = str(uuid.uuid4())
            total_amount = sum(item.price * item.quantity for item in order_request.items)
            # .dict() is Pydantic v1 style; on Pydantic v2 use .model_dump()
            order_response = await self.create_order_record({
                "order_id": order_id,
                "user_id": order_request.user_id,
                "items": [item.dict() for item in order_request.items],
                "total_amount": total_amount,
                "status": "processing"
            })
            # 4. Start asynchronous processing (background)
            background_tasks.add_task(
                self.process_order_async,
                order_id,
                order_request.dict(),
                total_amount
            )
            return OrderResponse(
                order_id=order_id,
                status="processing",
                total_amount=total_amount,
                estimated_delivery="3-5 business days"
            )
        except HTTPException:
            # Keep the 400 responses raised above instead of turning them into 500s
            raise
        except Exception as e:
            raise HTTPException(status_code=500, detail=str(e))

    async def process_order_async(self, order_id: str, order_data: dict, total_amount: float):
        """Process the order asynchronously."""
        try:
            # 1. Reserve inventory
            reserve_result = await self.reserve_inventory(order_data["items"])
            if not reserve_result["success"]:
                await self.cancel_order(order_id, "inventory_failed")
                return
            # 2. Process the payment
            payment_result = await self.process_payment({
                "order_id": order_id,
                "amount": total_amount,
                "payment_method": order_data["payment_method"]
            })
            if not payment_result["success"]:
                # Release the reserved inventory
                await self.release_inventory(order_data["items"])
                await self.cancel_order(order_id, "payment_failed")
                return
            # 3. Mark the order as completed
            await self.complete_order(order_id)
            # 4. Send notifications
            await self.send_notifications(order_id, order_data["user_id"])
        except Exception as e:
            print(f"Async order processing failed: {e}")
            await self.cancel_order(order_id, "processing_error")

    async def proxy_request(self, service_name: str, path: str, method: str = "GET", data: dict = None):
        """Proxy a request to a backend service."""
        service_url = self.service_registry[service_name]
        async with httpx.AsyncClient() as client:
            # client.request handles GET, POST, PUT, and DELETE in one call
            response = await client.request(method, f"{service_url}{path}", json=data)
            return response.json()

    async def validate_user(self, user_id: str) -> bool:
        """Validate the user."""
        try:
            result = await self.proxy_request("user-service", f"/users/{user_id}")
            return result.get("active", False)
        except Exception:
            return False

    async def check_inventory(self, items: List[OrderItem]) -> bool:
        """Check inventory."""
        try:
            result = await self.proxy_request(
                "inventory-service",
                "/inventory/check",
                "POST",
                {"items": [item.dict() for item in items]}
            )
            return result.get("available", False)
        except Exception:
            return False

    async def create_order_record(self, order_data: dict):
        """Create the order record."""
        return await self.proxy_request("order-service", "/orders", "POST", order_data)

    async def reserve_inventory(self, items: List[dict]):
        """Reserve inventory."""
        return await self.proxy_request(
            "inventory-service",
            "/inventory/reserve",
            "POST",
            {"items": items}
        )

    async def release_inventory(self, items: List[dict]):
        """Release reserved inventory (the endpoint path is an assumption)."""
        return await self.proxy_request(
            "inventory-service",
            "/inventory/release",
            "POST",
            {"items": items}
        )

    async def process_payment(self, payment_data: dict):
        """Process the payment."""
        return await self.proxy_request("payment-service", "/payments", "POST", payment_data)

    async def complete_order(self, order_id: str):
        """Mark the order as completed."""
        return await self.proxy_request(
            "order-service",
            f"/orders/{order_id}/complete",
            "PUT"
        )

    async def cancel_order(self, order_id: str, reason: str):
        """Cancel the order (the endpoint path is an assumption)."""
        return await self.proxy_request(
            "order-service",
            f"/orders/{order_id}/cancel",
            "PUT",
            {"reason": reason}
        )

    async def send_notifications(self, order_id: str, user_id: str):
        """Send notifications."""
        return await self.proxy_request(
            "notification-service",
            "/notifications",
            "POST",
            {
                "user_id": user_id,
                "order_id": order_id,
                "type": "order_completed"
            }
        )


# Run the application
if __name__ == "__main__":
    import uvicorn
    gateway = APIGateway()
    uvicorn.run(gateway.app, host="0.0.0.0", port=8000)
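Part 2 covered the core infrastructure components of microservices: the API Gateway, the service mesh, and communication patterns.
Part 3 looks at advanced topics from an operations perspective, including data management, monitoring, and deployment strategies:
- Database patterns (Database per Service, CQRS)
- Distributed monitoring and logging
- CI/CD pipelines and deployment strategies
- Performance optimization and scaling strategies
Microservices are not just a technology but a digital transformation of the whole organization. Gradual adoption and continuous improvement are the keys to success.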