Microservices Architecture Complete Guide, Part 3: Data Management, Monitoring, and Deployment Strategies
Data Management Patterns
Data management is the most complex and critical area of a microservices system. Each service should own an independent database, yet data consistency across services still has to be maintained.
1. Database per Service Pattern
# Separate database per service
services:
  user-service:
    database:
      type: PostgreSQL
      schema: user_management
      tables:
        - users
        - user_profiles
        - user_preferences
  order-service:
    database:
      type: PostgreSQL
      schema: order_management
      tables:
        - orders
        - order_items
        - order_history
  inventory-service:
    database:
      type: MongoDB
      collections:
        - products
        - stock_levels
        - price_history
  payment-service:
    database:
      type: PostgreSQL
      schema: payment_management
      tables:
        - payments
        - transactions
        - refunds
# Implementing the database separation
import os

from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker

class DatabaseManager:
    def __init__(self, service_name):
        self.service_name = service_name
        self.engines = {}
        self.sessions = {}
        self.setup_databases()

    def setup_databases(self):
        """Configure the databases for this service"""
        db_configs = {
            'user-service': {
                'url': os.getenv('USER_DB_URL', 'postgresql://user:pass@user-db:5432/users'),
                'schema': 'user_management'
            },
            'order-service': {
                'url': os.getenv('ORDER_DB_URL', 'postgresql://user:pass@order-db:5432/orders'),
                'schema': 'order_management'
            },
            'inventory-service': {
                'url': os.getenv('INVENTORY_DB_URL', 'mongodb://inventory-db:27017/inventory'),
                'type': 'mongodb'
            }
        }
        config = db_configs.get(self.service_name)
        if config:
            if config.get('type') == 'mongodb':
                self.setup_mongodb(config['url'])
            else:
                self.setup_postgresql(config['url'], config.get('schema'))

    def setup_postgresql(self, db_url, schema_name):
        """Set up PostgreSQL"""
        engine = create_engine(
            db_url,
            pool_size=20,
            max_overflow=30,
            pool_pre_ping=True,
            pool_recycle=3600
        )
        # Create the schema if it does not exist (text() is required by SQLAlchemy 2.x)
        with engine.connect() as conn:
            conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {schema_name}"))
            conn.commit()
        self.engines['postgresql'] = engine
        Session = sessionmaker(bind=engine)
        self.sessions['postgresql'] = Session

    def setup_mongodb(self, db_url):
        """Set up MongoDB"""
        from pymongo import MongoClient
        client = MongoClient(db_url)
        db = client.get_database()
        self.engines['mongodb'] = client
        self.sessions['mongodb'] = db

    def get_session(self, db_type='postgresql'):
        """Return a database session"""
        if db_type == 'mongodb':
            return self.sessions['mongodb']
        else:
            return self.sessions[db_type]()
# Per-service data models
from datetime import datetime

class UserService:
    def __init__(self):
        self.db_manager = DatabaseManager('user-service')
        self.session = self.db_manager.get_session()
        # Kafka-backed publisher; a sketch of it follows below
        self.event_publisher = EventPublisher()

    def create_user(self, user_data):
        """Create a user"""
        try:
            user = User(**user_data)
            self.session.add(user)
            self.session.commit()
            # Publish the user-created event
            self.publish_user_created_event(user.id, user_data)
            return user.id
        except Exception:
            self.session.rollback()
            raise

    def get_user(self, user_id):
        """Fetch a single user"""
        return self.session.query(User).filter(User.id == user_id).first()

    def publish_user_created_event(self, user_id, user_data):
        """Publish the user-created event"""
        event_data = {
            'event_type': 'user.created',
            'user_id': user_id,
            'email': user_data['email'],
            'timestamp': datetime.now().isoformat()
        }
        # Publish the event to Kafka
        self.event_publisher.publish('user.created', event_data)
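The `event_publisher` used above is never defined in the post itself. Below is a minimal sketch of what it might look like, assuming the `kafka-python` client and a broker reachable at `kafka:9092` (both assumptions, not part of the original):

# Hypothetical Kafka-backed publisher for the events above (kafka-python assumed).
import json
from kafka import KafkaProducer

class EventPublisher:
    def __init__(self, bootstrap_servers='kafka:9092'):
        # Serialize event dicts as UTF-8 JSON
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            value_serializer=lambda v: json.dumps(v, default=str).encode('utf-8'),
            acks='all',  # wait for all in-sync replicas to avoid losing events
        )

    def publish(self, topic, event_data):
        # flush() keeps the example simple; batch sends in production code
        self.producer.send(topic, event_data)
        self.producer.flush()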
2. CQRS (Command Query Responsibility Segregation) Pattern
# Implementing the CQRS pattern
import uuid
from abc import ABC, abstractmethod
from datetime import datetime
from typing import Any, List

class OrderNotFoundError(Exception):
    pass

class Command(ABC):
    """Base class for commands"""
    pass

class Query(ABC):
    """Base class for queries"""
    pass

class CommandHandler(ABC):
    @abstractmethod
    async def handle(self, command: Command) -> Any:
        pass

class QueryHandler(ABC):
    @abstractmethod
    async def handle(self, query: Query) -> Any:
        pass

# CQRS for the order service
class CreateOrderCommand(Command):
    def __init__(self, user_id: str, items: List[dict], total_amount: float):
        self.user_id = user_id
        self.items = items
        self.total_amount = total_amount

class GetOrderQuery(Query):
    def __init__(self, order_id: str):
        self.order_id = order_id

class GetUserOrdersQuery(Query):
    def __init__(self, user_id: str, limit: int = 10):
        self.user_id = user_id
        self.limit = limit

class CreateOrderCommandHandler(CommandHandler):
    def __init__(self, write_db, event_store):
        self.write_db = write_db  # command-side database (write-optimized)
        self.event_store = event_store

    async def handle(self, command: CreateOrderCommand) -> str:
        """Handle the create-order command"""
        order_id = str(uuid.uuid4())
        # Persist the order in the write DB
        order_data = {
            'id': order_id,
            'user_id': command.user_id,
            'items': command.items,
            'total_amount': command.total_amount,
            'status': 'created',
            'created_at': datetime.now()
        }
        await self.write_db.orders.insert_one(order_data)
        # Append the event to the event store
        event = {
            'aggregate_id': order_id,
            'event_type': 'OrderCreated',
            'event_data': order_data,
            'version': 1,
            'timestamp': datetime.now()
        }
        await self.event_store.append_event(order_id, event)
        return order_id

class GetOrderQueryHandler(QueryHandler):
    def __init__(self, read_db):
        self.read_db = read_db  # query-side database (read-optimized)

    async def handle(self, query: GetOrderQuery) -> dict:
        """Handle the get-order query"""
        # Read the denormalized view from the read DB
        order = await self.read_db.order_views.find_one(
            {'order_id': query.order_id}
        )
        if not order:
            raise OrderNotFoundError(f"Order {query.order_id} not found")
        return order

class GetUserOrdersQueryHandler(QueryHandler):
    def __init__(self, read_db):
        self.read_db = read_db

    async def handle(self, query: GetUserOrdersQuery) -> List[dict]:
        """List a user's orders"""
        orders = await self.read_db.user_order_views.find(
            {'user_id': query.user_id}
        ).limit(query.limit).to_list(length=query.limit)
        return orders
# CQRS mediator pattern
class HandlerNotFoundError(Exception):
    pass

class CQRSMediator:
    def __init__(self):
        self.command_handlers = {}
        self.query_handlers = {}

    def register_command_handler(self, command_type: type, handler: CommandHandler):
        """Register a command handler"""
        self.command_handlers[command_type] = handler

    def register_query_handler(self, query_type: type, handler: QueryHandler):
        """Register a query handler"""
        self.query_handlers[query_type] = handler

    async def send_command(self, command: Command):
        """Dispatch a command"""
        command_type = type(command)
        handler = self.command_handlers.get(command_type)
        if not handler:
            raise HandlerNotFoundError(f"No handler found for {command_type}")
        return await handler.handle(command)

    async def send_query(self, query: Query):
        """Dispatch a query"""
        query_type = type(query)
        handler = self.query_handlers.get(query_type)
        if not handler:
            raise HandlerNotFoundError(f"No handler found for {query_type}")
        return await handler.handle(query)

# Usage example
async def setup_cqrs_mediator():
    write_db = await get_write_database()
    read_db = await get_read_database()
    # A concrete EventStore sketch appears in the Event Sourcing section below
    event_store = EventStore(write_db)
    mediator = CQRSMediator()
    # Register the handlers
    mediator.register_command_handler(
        CreateOrderCommand,
        CreateOrderCommandHandler(write_db, event_store)
    )
    mediator.register_query_handler(
        GetOrderQuery,
        GetOrderQueryHandler(read_db)
    )
    mediator.register_query_handler(
        GetUserOrdersQuery,
        GetUserOrdersQueryHandler(read_db)
    )
    return mediator
# Using the mediator from API endpoints
from fastapi import FastAPI
from pydantic import BaseModel
from typing import List

class CreateOrderRequest(BaseModel):
    # Request model referenced but not defined in the original post
    user_id: str
    items: List[dict]
    total_amount: float

app = FastAPI()
mediator = None

@app.on_event("startup")
async def startup_event():
    global mediator
    mediator = await setup_cqrs_mediator()

@app.post("/orders")
async def create_order(order_request: CreateOrderRequest):
    command = CreateOrderCommand(
        user_id=order_request.user_id,
        items=order_request.items,
        total_amount=order_request.total_amount
    )
    order_id = await mediator.send_command(command)
    return {"order_id": order_id, "status": "created"}

@app.get("/orders/{order_id}")
async def get_order(order_id: str):
    query = GetOrderQuery(order_id)
    order = await mediator.send_query(query)
    return order
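One piece the handlers above take for granted is how `order_views` in the read DB gets populated. A common approach is a projection that consumes the domain events and upserts denormalized view documents. The sketch below assumes Motor-style async MongoDB collections and the event dictionaries produced by `CreateOrderCommandHandler`; the projection itself is not from the original post:

# Hypothetical read-model projection: applies events to the denormalized view.
class OrderViewProjection:
    def __init__(self, read_db):
        self.read_db = read_db

    async def on_event(self, event: dict):
        if event['event_type'] == 'OrderCreated':
            data = event['event_data']
            # Upsert so that event replays stay idempotent
            await self.read_db.order_views.update_one(
                {'order_id': event['aggregate_id']},
                {'$set': {
                    'order_id': event['aggregate_id'],
                    'user_id': data['user_id'],
                    'items': data['items'],
                    'total_amount': data['total_amount'],
                    'status': data['status'],
                }},
                upsert=True,
            )
        elif event['event_type'] == 'OrderStatusUpdated':
            await self.read_db.order_views.update_one(
                {'order_id': event['aggregate_id']},
                {'$set': {'status': event['event_data']['new_status']}},
            )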
3. Event Sourcing Pattern
# Implementing Event Sourcing
import uuid
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Dict, List, Optional

@dataclass
class Event:
    aggregate_id: str
    event_type: str
    event_data: Dict[str, Any]
    version: int
    timestamp: datetime
    event_id: Optional[str] = None

    def __post_init__(self):
        if not self.event_id:
            self.event_id = str(uuid.uuid4())

class EventStore:
    def __init__(self, database):
        self.database = database

    async def append_event(self, aggregate_id: str, event: Event):
        """Append an event to the store"""
        event_doc = {
            'aggregate_id': aggregate_id,
            'event_id': event.event_id,
            'event_type': event.event_type,
            'event_data': event.event_data,
            'version': event.version,
            'timestamp': event.timestamp
        }
        await self.database.events.insert_one(event_doc)

    @staticmethod
    def _to_event(doc) -> Event:
        """Rebuild an Event from a stored document"""
        return Event(
            aggregate_id=doc['aggregate_id'],
            event_type=doc['event_type'],
            event_data=doc['event_data'],
            version=doc['version'],
            timestamp=doc['timestamp'],
            event_id=doc['event_id']
        )

    async def get_events(self, aggregate_id: str) -> List[Event]:
        """Load every event for an aggregate, oldest first"""
        cursor = self.database.events.find(
            {'aggregate_id': aggregate_id}
        ).sort('version', 1)
        return [self._to_event(doc) async for doc in cursor]

    async def get_events_from_version(self, aggregate_id: str, from_version: int) -> List[Event]:
        """Load only the events recorded after a given version"""
        cursor = self.database.events.find({
            'aggregate_id': aggregate_id,
            'version': {'$gt': from_version}
        }).sort('version', 1)
        return [self._to_event(doc) async for doc in cursor]
# Aggregate root
class OrderAggregate:
    def __init__(self, order_id: str):
        self.order_id = order_id
        self.version = 0
        self.user_id = None
        self.items = []
        self.total_amount = 0.0
        self.status = None
        self.created_at = None
        self.updated_at = None
        self.uncommitted_events = []

    def create_order(self, user_id: str, items: List[dict], total_amount: float):
        """Create the order"""
        if self.status is not None:
            raise ValueError("Order already exists")
        event = Event(
            aggregate_id=self.order_id,
            event_type='OrderCreated',
            event_data={
                'user_id': user_id,
                'items': items,
                'total_amount': total_amount
            },
            version=self.version + 1,
            timestamp=datetime.now()
        )
        self.apply_event(event)
        self.uncommitted_events.append(event)

    def update_status(self, new_status: str):
        """Update the order status"""
        if self.status == new_status:
            return
        event = Event(
            aggregate_id=self.order_id,
            event_type='OrderStatusUpdated',
            event_data={
                'old_status': self.status,
                'new_status': new_status
            },
            version=self.version + 1,
            timestamp=datetime.now()
        )
        self.apply_event(event)
        self.uncommitted_events.append(event)

    def cancel_order(self, reason: str):
        """Cancel the order"""
        if self.status == 'cancelled':
            return
        event = Event(
            aggregate_id=self.order_id,
            event_type='OrderCancelled',
            event_data={
                'reason': reason,
                'cancelled_at': datetime.now().isoformat()
            },
            version=self.version + 1,
            timestamp=datetime.now()
        )
        self.apply_event(event)
        self.uncommitted_events.append(event)

    def apply_event(self, event: Event):
        """Apply an event to the aggregate state"""
        if event.event_type == 'OrderCreated':
            self.user_id = event.event_data['user_id']
            self.items = event.event_data['items']
            self.total_amount = event.event_data['total_amount']
            self.status = 'created'
            self.created_at = event.timestamp
        elif event.event_type == 'OrderStatusUpdated':
            self.status = event.event_data['new_status']
            self.updated_at = event.timestamp
        elif event.event_type == 'OrderCancelled':
            self.status = 'cancelled'
            self.updated_at = event.timestamp
        self.version = event.version

    def load_from_events(self, events: List[Event]):
        """Rebuild the aggregate from its events"""
        for event in events:
            self.apply_event(event)

    def get_uncommitted_events(self) -> List[Event]:
        """Return events that have not been committed yet"""
        return self.uncommitted_events.copy()

    def mark_events_as_committed(self):
        """Mark all pending events as committed"""
        self.uncommitted_events.clear()
# Repository pattern
class OrderRepository:
    def __init__(self, event_store: EventStore):
        self.event_store = event_store

    async def get_by_id(self, order_id: str) -> OrderAggregate:
        """Load an order aggregate by ID"""
        events = await self.event_store.get_events(order_id)
        if not events:
            raise OrderNotFoundError(f"Order {order_id} not found")
        order = OrderAggregate(order_id)
        order.load_from_events(events)
        return order

    async def save(self, order: OrderAggregate):
        """Persist an order aggregate"""
        uncommitted_events = order.get_uncommitted_events()
        for event in uncommitted_events:
            await self.event_store.append_event(order.order_id, event)
        order.mark_events_as_committed()
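`get_events_from_version` above only pays off once snapshots come into play: instead of replaying every event, the repository restores the latest snapshot and applies only the events recorded after it. A minimal sketch of that idea, assuming a `snapshots` collection alongside `events` (the collection name, snapshot format, and interval are assumptions):

# Hypothetical snapshot support: restore state, then replay only newer events.
class SnapshottingOrderRepository(OrderRepository):
    SNAPSHOT_EVERY = 50  # take a snapshot every N events (tuning assumption)

    async def get_by_id(self, order_id: str) -> OrderAggregate:
        snapshot = await self.event_store.database.snapshots.find_one(
            {'aggregate_id': order_id}, sort=[('version', -1)]
        )
        order = OrderAggregate(order_id)
        from_version = 0
        if snapshot:
            # Restore the plain attributes captured at snapshot time
            for key, value in snapshot['state'].items():
                setattr(order, key, value)
            order.version = snapshot['version']
            from_version = snapshot['version']
        events = await self.event_store.get_events_from_version(order_id, from_version)
        order.load_from_events(events)
        return order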
Distributed Monitoring and Logging
Prometheus + Grafana Monitoring
# monitoring-stack.yml
version: '3.8'
services:
  prometheus:
    image: prom/prometheus:latest
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
      - ./alerts.yml:/etc/prometheus/alerts.yml
    command:
      - '--config.file=/etc/prometheus/prometheus.yml'
      - '--storage.tsdb.path=/prometheus'
      - '--web.console.libraries=/etc/prometheus/console_libraries'
      - '--web.console.templates=/etc/prometheus/consoles'
      - '--web.enable-lifecycle'
      - '--web.enable-admin-api'
  grafana:
    image: grafana/grafana:latest
    ports:
      - "3000:3000"
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin123
    volumes:
      - grafana-data:/var/lib/grafana
      - ./grafana/dashboards:/etc/grafana/provisioning/dashboards
      - ./grafana/datasources:/etc/grafana/provisioning/datasources
  jaeger:
    image: jaegertracing/all-in-one:latest
    ports:
      - "16686:16686"
      - "14268:14268"
    environment:
      - COLLECTOR_OTLP_ENABLED=true
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.17.0
    environment:
      - discovery.type=single-node
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    ports:
      - "9200:9200"
  kibana:
    image: docker.elastic.co/kibana/kibana:7.17.0
    ports:
      - "5601:5601"
    environment:
      - ELASTICSEARCH_HOSTS=http://elasticsearch:9200
volumes:
  grafana-data:
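The compose file mounts `prometheus.yml` and `alerts.yml` but the post never shows their contents. A minimal sketch of what they might contain, with the service names and metrics port assumed to match the examples later in this guide:

# prometheus.yml (sketch) -- scrape the /metrics endpoints of each service
global:
  scrape_interval: 15s
  evaluation_interval: 15s
rule_files:
  - /etc/prometheus/alerts.yml
scrape_configs:
  - job_name: 'order-service'
    static_configs:
      - targets: ['order-service:8001']   # start_http_server(8001) below
  - job_name: 'user-service'
    static_configs:
      - targets: ['user-service:8001']

# alerts.yml (sketch) -- one example alerting rule
groups:
  - name: service-alerts
    rules:
      - alert: HighErrorRate
        expr: rate(requests_total{status=~"5.."}[5m]) > 1
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "High 5xx rate on {{ $labels.service }}"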
Implementing Distributed Tracing
# tracing.py - distributed tracing with OpenTelemetry
import functools

from opentelemetry import trace
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.instrumentation.requests import RequestsInstrumentor
from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor

class TracingManager:
    def __init__(self, service_name: str, jaeger_endpoint: str = "http://jaeger:14268/api/traces"):
        self.service_name = service_name
        self.jaeger_endpoint = jaeger_endpoint
        self.setup_tracing()

    def setup_tracing(self):
        """Configure distributed tracing"""
        # Set up the tracer provider
        trace.set_tracer_provider(TracerProvider())
        # Send spans to the Jaeger collector
        jaeger_exporter = JaegerExporter(collector_endpoint=self.jaeger_endpoint)
        # Batch spans before exporting them
        span_processor = BatchSpanProcessor(jaeger_exporter)
        trace.get_tracer_provider().add_span_processor(span_processor)
        # Auto-instrument outbound HTTP calls and SQLAlchemy queries
        RequestsInstrumentor().instrument()
        SQLAlchemyInstrumentor().instrument()
        return trace.get_tracer(__name__)

    @staticmethod
    def trace_method(operation_name: str):
        """Decorator that wraps a method call in a span.

        Declared as a staticmethod so it can be applied at class-definition
        time; the original instance-method version could not be used as
        @TracingManager.trace_method(...)."""
        def decorator(func):
            @functools.wraps(func)
            def wrapper(*args, **kwargs):
                tracer = trace.get_tracer(__name__)
                with tracer.start_as_current_span(operation_name) as span:
                    span.set_attribute("operation.name", operation_name)
                    try:
                        result = func(*args, **kwargs)
                        span.set_attribute("operation.result", "success")
                        return result
                    except Exception as e:
                        span.set_attribute("operation.result", "error")
                        span.set_attribute("error.message", str(e))
                        span.record_exception(e)
                        raise
            return wrapper
        return decorator
# Tracing inside a service
import requests

class OrderServiceTracing:
    def __init__(self):
        self.tracing = TracingManager("order-service")
        self.tracer = trace.get_tracer(__name__)

    @TracingManager.trace_method("create_order")
    def create_order(self, order_data):
        """Trace order creation step by step"""
        with self.tracer.start_as_current_span("validate_user") as span:
            span.set_attribute("user.id", order_data["user_id"])
            user_valid = self.validate_user(order_data["user_id"])
            if not user_valid:
                span.set_attribute("validation.result", "failed")
                raise ValueError("Invalid user")
            span.set_attribute("validation.result", "success")
        with self.tracer.start_as_current_span("check_inventory"):
            inventory_available = self.check_inventory(order_data["items"])
            if not inventory_available:
                raise ValueError("Insufficient inventory")
        with self.tracer.start_as_current_span("save_order"):
            order_id = self.save_order(order_data)
        return order_id

    def validate_user(self, user_id):
        """Validate the user via the user service (remote call)"""
        with self.tracer.start_as_current_span("call_user_service") as span:
            span.set_attribute("http.method", "GET")
            span.set_attribute("http.url", f"http://user-service:8080/users/{user_id}")
            # The actual HTTP call is auto-instrumented by RequestsInstrumentor
            response = requests.get(f"http://user-service:8080/users/{user_id}")
            span.set_attribute("http.status_code", response.status_code)
            return response.status_code == 200
# Custom metrics collection
from prometheus_client import Counter, Histogram, Gauge, start_http_server

class MetricsCollector:
    def __init__(self, service_name: str):
        self.service_name = service_name
        # Counter metrics
        self.request_count = Counter(
            'requests_total',
            'Total number of requests',
            ['service', 'method', 'endpoint', 'status']
        )
        # Histogram metrics
        self.request_duration = Histogram(
            'request_duration_seconds',
            'Request duration in seconds',
            ['service', 'method', 'endpoint']
        )
        # Gauge metrics
        self.active_connections = Gauge(
            'active_connections',
            'Number of active connections',
            ['service']
        )
        # Business metrics
        self.orders_created = Counter(
            'orders_created_total',
            'Total number of orders created',
            ['service']
        )
        self.order_amount = Histogram(
            'order_amount',
            'Order amount distribution',
            ['service']
        )

    def record_request(self, method: str, endpoint: str, status: int, duration: float):
        """Record request metrics"""
        self.request_count.labels(
            service=self.service_name,
            method=method,
            endpoint=endpoint,
            status=status
        ).inc()
        self.request_duration.labels(
            service=self.service_name,
            method=method,
            endpoint=endpoint
        ).observe(duration)

    def record_order_created(self, amount: float):
        """Record order-creation metrics"""
        self.orders_created.labels(service=self.service_name).inc()
        self.order_amount.labels(service=self.service_name).observe(amount)

    def set_active_connections(self, count: int):
        """Set the number of active connections"""
        self.active_connections.labels(service=self.service_name).set(count)
# Wiring the metrics into a Flask application
import time
from flask import Flask, request, g

def create_monitored_app(service_name: str):
    app = Flask(__name__)
    metrics = MetricsCollector(service_name)
    tracing = TracingManager(service_name)

    @app.before_request
    def before_request():
        g.start_time = time.time()

    @app.after_request
    def after_request(response):
        duration = time.time() - g.start_time
        metrics.record_request(
            method=request.method,
            endpoint=request.endpoint or 'unknown',
            status=response.status_code,
            duration=duration
        )
        return response

    # Metrics endpoint
    @app.route('/metrics')
    def metrics_endpoint():
        from prometheus_client import generate_latest
        return generate_latest()

    # Health-check endpoint
    @app.route('/health')
    def health_check():
        return {"status": "healthy", "service": service_name}

    return app, metrics, tracing

# Usage example
app, metrics, tracing = create_monitored_app("order-service")

@app.route('/orders', methods=['POST'])
@tracing.trace_method("create_order_endpoint")
def create_order():
    order_data = request.json
    try:
        # create_order_logic is assumed to be defined elsewhere in the service
        order_id = create_order_logic(order_data)
        metrics.record_order_created(order_data['total_amount'])
        return {"order_id": order_id, "status": "created"}, 201
    except Exception as e:
        return {"error": str(e)}, 400

if __name__ == '__main__':
    # Start the Prometheus metrics server
    start_http_server(8001)
    # Run the Flask application
    app.run(host='0.0.0.0', port=8000)
Building a CI/CD Pipeline
Jenkins Pipeline Configuration
// Jenkinsfile
pipeline {
    agent any

    environment {
        DOCKER_REGISTRY = 'your-registry.com'
        KUBECONFIG = credentials('kubeconfig')
        SONAR_TOKEN = credentials('sonar-token')
    }

    stages {
        stage('Checkout') {
            steps {
                checkout scm
            }
        }

        stage('Build & Test') {
            parallel {
                stage('User Service') {
                    steps {
                        dir('user-service') {
                            sh '''
                                python -m venv venv
                                . venv/bin/activate
                                pip install -r requirements.txt
                                python -m pytest tests/ --cov=src --cov-report=xml
                            '''
                        }
                    }
                    post {
                        always {
                            publishCoverage adapters: [
                                coberturaAdapter('user-service/coverage.xml')
                            ]
                        }
                    }
                }
                stage('Order Service') {
                    steps {
                        dir('order-service') {
                            sh '''
                                python -m venv venv
                                . venv/bin/activate
                                pip install -r requirements.txt
                                python -m pytest tests/ --cov=src --cov-report=xml
                            '''
                        }
                    }
                }
                stage('Inventory Service') {
                    steps {
                        dir('inventory-service') {
                            sh '''
                                npm ci
                                npm run test:coverage
                            '''
                        }
                    }
                }
            }
        }

        stage('Code Quality') {
            steps {
                script {
                    def services = ['user-service', 'order-service', 'inventory-service']
                    services.each { service ->
                        dir(service) {
                            sh """
                                sonar-scanner \
                                    -Dsonar.projectKey=${service} \
                                    -Dsonar.sources=src \
                                    -Dsonar.host.url=http://sonarqube:9000 \
                                    -Dsonar.login=${SONAR_TOKEN}
                            """
                        }
                    }
                }
            }
        }

        stage('Build Docker Images') {
            steps {
                script {
                    def services = [
                        'user-service': 'user-service/Dockerfile',
                        'order-service': 'order-service/Dockerfile',
                        'inventory-service': 'inventory-service/Dockerfile',
                        'payment-service': 'payment-service/Dockerfile'
                    ]
                    services.each { serviceName, dockerfile ->
                        def image = docker.build(
                            "${DOCKER_REGISTRY}/${serviceName}:${BUILD_NUMBER}",
                            "-f ${dockerfile} ."
                        )
                        // Security scan
                        sh "trivy image ${DOCKER_REGISTRY}/${serviceName}:${BUILD_NUMBER}"
                        // Push the image
                        docker.withRegistry("https://${DOCKER_REGISTRY}") {
                            image.push()
                            image.push("latest")
                        }
                    }
                }
            }
        }

        stage('Deploy to Staging') {
            steps {
                script {
                    sh '''
                        helm upgrade --install microservices-staging ./helm-chart \
                            --namespace staging \
                            --set image.tag=${BUILD_NUMBER} \
                            --set environment=staging \
                            --wait --timeout=10m
                    '''
                }
            }
        }

        stage('Integration Tests') {
            steps {
                dir('integration-tests') {
                    sh '''
                        python -m venv venv
                        . venv/bin/activate
                        pip install -r requirements.txt
                        python -m pytest tests/ --html=report.html
                    '''
                }
            }
            post {
                always {
                    publishHTML([
                        allowMissing: false,
                        alwaysLinkToLastBuild: true,
                        keepAll: true,
                        reportDir: 'integration-tests',
                        reportFiles: 'report.html',
                        reportName: 'Integration Test Report'
                    ])
                }
            }
        }

        stage('Performance Tests') {
            steps {
                sh '''
                    k6 run --out influxdb=http://influxdb:8086/k6 \
                        performance-tests/load-test.js
                '''
            }
        }

        stage('Deploy to Production') {
            when {
                branch 'main'
            }
            steps {
                input message: 'Deploy to production?', ok: 'Deploy'
                script {
                    // Blue-green deployment
                    sh '''
                        # Deploy to the green environment
                        helm upgrade --install microservices-green ./helm-chart \
                            --namespace production \
                            --set image.tag=${BUILD_NUMBER} \
                            --set environment=production \
                            --set deployment.color=green \
                            --wait --timeout=15m

                        # Health check
                        kubectl wait --for=condition=ready pod \
                            -l app.kubernetes.io/instance=microservices-green \
                            -n production --timeout=300s

                        # Switch traffic over
                        kubectl patch service microservices-svc \
                            -n production \
                            -p '{"spec":{"selector":{"deployment":"green"}}}'

                        # Tear down the blue environment (optional)
                        # helm uninstall microservices-blue -n production
                    '''
                }
            }
        }
    }

    post {
        always {
            // Collect test results
            junit testResults: '**/test-results.xml', allowEmptyResults: true
            // Archive build artifacts
            archiveArtifacts artifacts: '**/target/*.jar,**/dist/*', fingerprint: true
            // Slack notification
            slackSend(
                channel: '#deployments',
                color: currentBuild.result == 'SUCCESS' ? 'good' : 'danger',
                message: "Pipeline ${currentBuild.fullDisplayName} ${currentBuild.result}"
            )
        }
        failure {
            emailext(
                subject: "Pipeline Failed: ${env.JOB_NAME} - ${env.BUILD_NUMBER}",
                body: "The pipeline has failed. Please check the build logs.",
                to: "${env.CHANGE_AUTHOR_EMAIL}"
            )
        }
    }
}
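The pipeline drives everything through `helm upgrade --set ...`, but the chart's values file is never shown. A sketch of the `values.yaml` keys the `--set` flags above imply; the key names are inferred from the flags, not taken from an actual chart:

# helm-chart/values.yaml (sketch) -- keys matching the --set flags used above
image:
  repository: your-registry.com
  tag: latest            # overridden with --set image.tag=${BUILD_NUMBER}
environment: staging     # staging | production
deployment:
  color: blue            # blue | green, matched by the Service selector
  strategy: rolling      # rolling | canary
  canary:
    weight: 10           # percentage of traffic routed to the canary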
GitLab CI/CD Configuration
# .gitlab-ci.yml
stages:
  - test
  - build
  - security-scan
  - deploy-staging
  - integration-test
  - deploy-production

variables:
  DOCKER_REGISTRY: registry.gitlab.com
  KUBERNETES_NAMESPACE_STAGING: microservices-staging
  KUBERNETES_NAMESPACE_PRODUCTION: microservices-production

# Test stage
test:user-service:
  stage: test
  image: python:3.9
  before_script:
    - cd user-service
    - pip install -r requirements.txt
  script:
    - python -m pytest tests/ --cov=src --cov-report=xml --junitxml=test-results.xml
  coverage: '/TOTAL.*\s+(\d+%)$/'
  artifacts:
    reports:
      junit: user-service/test-results.xml
      coverage_report:
        coverage_format: cobertura
        path: user-service/coverage.xml

test:order-service:
  stage: test
  image: python:3.9
  before_script:
    - cd order-service
    - pip install -r requirements.txt
  script:
    - python -m pytest tests/ --cov=src --cov-report=xml --junitxml=test-results.xml
  artifacts:
    reports:
      junit: order-service/test-results.xml

# Build stage
build:
  stage: build
  image: docker:20.10.16
  services:
    - docker:20.10.16-dind
  before_script:
    - docker login -u $CI_REGISTRY_USER -p $CI_REGISTRY_PASSWORD $CI_REGISTRY
  script:
    - |
      for service in user-service order-service inventory-service payment-service; do
        docker build -t $CI_REGISTRY_IMAGE/$service:$CI_COMMIT_SHA -f $service/Dockerfile .
        docker push $CI_REGISTRY_IMAGE/$service:$CI_COMMIT_SHA
        docker tag $CI_REGISTRY_IMAGE/$service:$CI_COMMIT_SHA $CI_REGISTRY_IMAGE/$service:latest
        docker push $CI_REGISTRY_IMAGE/$service:latest
      done

# Security scan
security-scan:
  stage: security-scan
  image: aquasec/trivy:latest
  script:
    - |
      for service in user-service order-service inventory-service payment-service; do
        trivy image --exit-code 0 --severity HIGH,CRITICAL \
          --format template --template "@contrib/gitlab.tpl" \
          --output gl-security-$service.json \
          $CI_REGISTRY_IMAGE/$service:$CI_COMMIT_SHA
      done
  artifacts:
    reports:
      container_scanning: gl-security-*.json

# Staging deployment
deploy-staging:
  stage: deploy-staging
  image: dtzar/helm-kubectl:latest  # the job needs helm as well as kubectl
  before_script:
    - echo "$KUBE_CONFIG" | base64 -d > kubeconfig
    - export KUBECONFIG=kubeconfig
  script:
    - |
      helm upgrade --install microservices-staging ./helm-chart \
        --namespace $KUBERNETES_NAMESPACE_STAGING \
        --create-namespace \
        --set image.tag=$CI_COMMIT_SHA \
        --set environment=staging \
        --wait --timeout=10m
  environment:
    name: staging
    url: https://staging.microservices.example.com

# Integration tests
integration-test:
  stage: integration-test
  image: python:3.9
  needs: ["deploy-staging"]
  before_script:
    - cd integration-tests
    - pip install -r requirements.txt
  script:
    - python -m pytest tests/ --html=report.html --self-contained-html
  artifacts:
    when: always
    paths:
      - integration-tests/report.html
    expire_in: 1 week

# Production deployment
deploy-production:
  stage: deploy-production
  image: dtzar/helm-kubectl:latest
  before_script:
    - echo "$KUBE_CONFIG" | base64 -d > kubeconfig
    - export KUBECONFIG=kubeconfig
  script:
    - |
      # Canary deployment
      helm upgrade --install microservices-canary ./helm-chart \
        --namespace $KUBERNETES_NAMESPACE_PRODUCTION \
        --set image.tag=$CI_COMMIT_SHA \
        --set environment=production \
        --set deployment.strategy=canary \
        --set deployment.canary.weight=10 \
        --wait --timeout=15m

      # Wait five minutes, then ramp traffic up gradually
      sleep 300
      for weight in 25 50 75 100; do
        helm upgrade microservices-canary ./helm-chart \
          --namespace $KUBERNETES_NAMESPACE_PRODUCTION \
          --reuse-values \
          --set deployment.canary.weight=$weight \
          --wait --timeout=5m
        sleep 180
      done
  when: manual
  only:
    - main
  environment:
    name: production
    url: https://microservices.example.com
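The ramp-up loop above promotes the canary on a timer alone. A safer variant checks the canary's error rate against Prometheus before each promotion step. A sketch of such a gate script, reusing the `requests_total` metric from the monitoring section (the threshold, window, and label values are illustrative assumptions):

# canary_gate.py (sketch) -- abort promotion if the canary's error rate is high.
import sys
import requests

PROM_URL = "http://prometheus:9090/api/v1/query"
QUERY = (
    'sum(rate(requests_total{service="order-service",status=~"5.."}[5m]))'
    ' / sum(rate(requests_total{service="order-service"}[5m]))'
)

def canary_error_rate() -> float:
    # Prometheus instant-query API; result value is [timestamp, "value"]
    result = requests.get(PROM_URL, params={"query": QUERY}, timeout=10).json()
    samples = result["data"]["result"]
    return float(samples[0]["value"][1]) if samples else 0.0

if __name__ == "__main__":
    rate = canary_error_rate()
    print(f"canary 5xx ratio: {rate:.4f}")
    sys.exit(1 if rate > 0.01 else 0)  # fail the CI job above 1% errors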
Performance Optimization and Scaling Strategies
Configuring Autoscaling
# hpa.yaml - Horizontal Pod Autoscaler
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: order-service-hpa
  namespace: production
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: order-service
  minReplicas: 3
  maxReplicas: 20
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70
    - type: Resource
      resource:
        name: memory
        target:
          type: Utilization
          averageUtilization: 80
    - type: Pods
      pods:
        metric:
          name: http_requests_per_second
        target:
          type: AverageValue
          averageValue: "100"
  behavior:
    scaleUp:
      stabilizationWindowSeconds: 300
      policies:
        - type: Percent
          value: 100
          periodSeconds: 15
    scaleDown:
      stabilizationWindowSeconds: 300
      policies:
        - type: Percent
          value: 10
          periodSeconds: 60
---
# vpa.yaml - Vertical Pod Autoscaler
apiVersion: autoscaling.k8s.io/v1
kind: VerticalPodAutoscaler
metadata:
  name: order-service-vpa
spec:
  targetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: order-service
  updatePolicy:
    updateMode: "Auto"
  resourcePolicy:
    containerPolicies:
      - containerName: order-service
        maxAllowed:
          cpu: 2
          memory: 4Gi
        minAllowed:
          cpu: 100m
          memory: 128Mi
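The `http_requests_per_second` pods metric above is not built into Kubernetes; it has to be served through the custom metrics API, typically by prometheus-adapter. A sketch of an adapter rule that could derive it from the `requests_total` counter used earlier (the rule syntax is prometheus-adapter's; the exact label names are assumptions):

# prometheus-adapter rule (sketch): expose requests_total as http_requests_per_second
rules:
  - seriesQuery: 'requests_total{namespace!="",pod!=""}'
    resources:
      overrides:
        namespace: {resource: "namespace"}
        pod: {resource: "pod"}
    name:
      matches: "^requests_total$"
      as: "http_requests_per_second"
    metricsQuery: 'sum(rate(<<.Series>>{<<.LabelMatchers>>}[2m])) by (<<.GroupBy>>)'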
Implementing Performance Optimizations
# performance_optimization.py
import asyncio
import json
import time
from concurrent.futures import ThreadPoolExecutor
from functools import lru_cache

import aiohttp
import aioredis  # aioredis 2.x; newer projects use redis.asyncio instead

class PerformanceOptimizer:
    def __init__(self):
        self.redis_client = None
        self.thread_pool = ThreadPoolExecutor(max_workers=10)
        self.cache = {}
        self.circuit_breakers = {}

    async def initialize(self):
        """Initialize the performance helpers"""
        # from_url returns a client object directly (it is not a coroutine)
        self.redis_client = aioredis.from_url("redis://redis:6379")

    # 1. Caching strategy
    @lru_cache(maxsize=1000)
    def get_user_cache(self, user_id: str):
        """Local in-process cache for user data"""
        return self._fetch_user_from_db(user_id)

    async def get_user_with_redis_cache(self, user_id: str):
        """User lookup backed by a Redis cache"""
        cache_key = f"user:{user_id}"
        # Check Redis first
        cached_user = await self.redis_client.get(cache_key)
        if cached_user:
            return json.loads(cached_user)
        # Cache miss: read from the database
        user = await self._fetch_user_from_db_async(user_id)
        # Store in Redis with a 1-hour TTL
        await self.redis_client.setex(
            cache_key,
            3600,
            json.dumps(user, default=str)
        )
        return user

    # 4. Asynchronous processing (kept on the optimizer, since the usage
    # example below calls it as a method)
    async def process_order_async(self, order_data):
        """Process an order, running the independent steps in parallel"""
        tasks = [
            self.validate_user_async(order_data['user_id']),
            self.check_inventory_async(order_data['items']),
            self.calculate_shipping_async(order_data['address'])
        ]
        # Run validation, inventory, and shipping checks concurrently
        user_valid, inventory_ok, shipping_cost = await asyncio.gather(*tasks)
        if user_valid and inventory_ok:
            # Create the order
            order_id = await self.create_order_async(order_data, shipping_cost)
            # Kick off background post-processing
            asyncio.create_task(self.post_order_processing(order_id))
            return order_id
        else:
            raise ValueError("Order validation failed")

# 2. Connection pooling
class DatabaseConnectionPool:
    def __init__(self, max_connections=20):
        self.pool = asyncio.Queue(maxsize=max_connections)
        self.max_connections = max_connections

    @classmethod
    async def create(cls, max_connections=20):
        """Async factory: __init__ cannot await, so the pool is filled here"""
        self = cls(max_connections)
        for _ in range(self.max_connections):
            connection = await self._create_connection()
            await self.pool.put(connection)
        return self

    async def get_connection(self):
        """Borrow a connection from the pool"""
        return await self.pool.get()

    async def return_connection(self, connection):
        """Return a connection to the pool"""
        await self.pool.put(connection)

    async def _create_connection(self):
        # Create the actual database connection here
        pass

# 3. Batch processing
class BatchProcessor:
    def __init__(self, batch_size=100, flush_interval=5):
        self.batch_size = batch_size
        self.flush_interval = flush_interval
        self.batch = []
        self.last_flush = time.time()

    async def add_item(self, item):
        """Add an item to the batch"""
        self.batch.append(item)
        # Flush when either the size or the time threshold is reached
        if (len(self.batch) >= self.batch_size or
                time.time() - self.last_flush >= self.flush_interval):
            await self.flush()

    async def flush(self):
        """Process the accumulated batch"""
        if not self.batch:
            return
        try:
            await self._process_batch(self.batch.copy())
            self.batch.clear()
            self.last_flush = time.time()
        except Exception as e:
            print(f"Batch processing error: {e}")

    async def _process_batch(self, items):
        """The actual batch-processing logic"""
        # Batch insert/update against the database
        pass

# 5. Load-balanced routing
class LoadBalancedServiceClient:
    def __init__(self, service_urls):
        self.service_urls = service_urls
        self.current_index = 0
        self.health_status = {url: True for url in service_urls}

    async def call_service(self, endpoint, data=None):
        """Call a service behind client-side load balancing"""
        attempts = 0
        max_attempts = len(self.service_urls)
        while attempts < max_attempts:
            service_url = self._get_next_healthy_service()
            if not service_url:
                raise Exception("No healthy services available")
            try:
                async with aiohttp.ClientSession() as session:
                    async with session.post(
                        f"{service_url}{endpoint}",
                        json=data,
                        timeout=aiohttp.ClientTimeout(total=10)
                    ) as response:
                        if response.status == 200:
                            self.health_status[service_url] = True
                            return await response.json()
                        else:
                            raise aiohttp.ClientError(f"HTTP {response.status}")
            except Exception:
                self.health_status[service_url] = False
                attempts += 1
                continue
        raise Exception("All service calls failed")

    def _get_next_healthy_service(self):
        """Return the next healthy service URL"""
        healthy_services = [
            url for url, healthy in self.health_status.items()
            if healthy
        ]
        if not healthy_services:
            return None
        # Round-robin load balancing
        service = healthy_services[self.current_index % len(healthy_services)]
        self.current_index += 1
        return service

# Usage example
async def optimized_order_service():
    optimizer = PerformanceOptimizer()
    await optimizer.initialize()
    # Optimized order processing
    order_data = {
        'user_id': 'user123',
        'items': [{'product_id': 'prod1', 'quantity': 2}],
        'address': 'Gangnam-gu, Seoul'
    }
    order_id = await optimizer.process_order_async(order_data)
    return order_id
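`PerformanceOptimizer` carries a `circuit_breakers` dict that the original never fills in, and the incident playbook below tells operators to "enable circuit breakers". A minimal async circuit breaker sketch to make that concrete; the thresholds and cool-down are illustrative, not from the original:

# Hypothetical async circuit breaker: trips after repeated failures,
# rejects calls while open, and allows a trial call after a cool-down.
import time

class CircuitBreaker:
    def __init__(self, failure_threshold=5, reset_timeout=30.0):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.failures = 0
        self.opened_at = None  # None means the circuit is closed

    async def call(self, coro_func, *args, **kwargs):
        if self.opened_at is not None:
            if time.monotonic() - self.opened_at < self.reset_timeout:
                raise RuntimeError("circuit open; request rejected")
            self.opened_at = None  # half-open: allow one trial call
        try:
            result = await coro_func(*args, **kwargs)
        except Exception:
            self.failures += 1
            if self.failures >= self.failure_threshold:
                self.opened_at = time.monotonic()
            raise
        self.failures = 0
        return result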
Operational Best Practices
1. Incident Response Playbook
# incident_response.py
import logging
from dataclasses import dataclass
from enum import Enum
from typing import Dict, List

class IncidentSeverity(Enum):
    CRITICAL = "critical"  # complete service outage
    HIGH = "high"          # major feature failure
    MEDIUM = "medium"      # partial feature impact
    LOW = "low"            # minor issue

@dataclass
class IncidentResponse:
    incident_id: str
    severity: IncidentSeverity
    description: str
    affected_services: List[str]
    response_steps: List[str]
    escalation_contacts: List[str]

class IncidentManager:
    def __init__(self):
        self.incident_playbooks = {
            "service_down": IncidentResponse(
                incident_id="INC-001",
                severity=IncidentSeverity.CRITICAL,
                description="Service completely unavailable",
                affected_services=["order-service"],
                response_steps=[
                    "1. Check service health endpoints",
                    "2. Verify database connectivity",
                    "3. Check resource utilization",
                    "4. Restart service if necessary",
                    "5. Scale up replicas if needed",
                    "6. Failover to backup region if required"
                ],
                escalation_contacts=["devops-team", "engineering-lead"]
            ),
            "high_latency": IncidentResponse(
                incident_id="INC-002",
                severity=IncidentSeverity.HIGH,
                description="Service response time > 5 seconds",
                affected_services=["order-service", "payment-service"],
                response_steps=[
                    "1. Check database query performance",
                    "2. Verify cache hit rates",
                    "3. Check resource utilization",
                    "4. Scale up if CPU/Memory usage > 80%",
                    "5. Enable circuit breakers",
                    "6. Implement graceful degradation"
                ],
                escalation_contacts=["performance-team"]
            ),
            "database_connection_pool_exhausted": IncidentResponse(
                incident_id="INC-003",
                severity=IncidentSeverity.HIGH,
                description="Database connection pool exhausted",
                affected_services=["order-service", "user-service"],
                response_steps=[
                    "1. Check active database connections",
                    "2. Identify long-running queries",
                    "3. Kill blocking queries if necessary",
                    "4. Increase connection pool size",
                    "5. Implement connection retry logic",
                    "6. Add database read replicas"
                ],
                escalation_contacts=["database-team", "backend-team"]
            )
        }

    async def handle_incident(self, incident_type: str, context: Dict):
        """Run the incident response"""
        playbook = self.incident_playbooks.get(incident_type)
        if not playbook:
            logging.error(f"No playbook found for incident: {incident_type}")
            return
        logging.critical(f"Incident detected: {playbook.description}")
        logging.critical(f"Affected services: {playbook.affected_services}")
        # Execute the automated response steps
        for step in playbook.response_steps:
            logging.info(f"Executing: {step}")
            await self.execute_response_step(step, context)
        # Send the escalation alerts
        await self.send_escalation_alerts(playbook, context)

    async def execute_response_step(self, step: str, context: Dict):
        """Execute one response step"""
        if "restart service" in step.lower():
            await self.restart_service(context.get('service_name'))
        elif "scale up" in step.lower():
            await self.scale_service(context.get('service_name'), context.get('replicas', 5))
        elif "enable circuit breakers" in step.lower():
            await self.enable_circuit_breakers(context.get('service_name'))
        # ... other response steps that can be automated
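Something still has to invoke `handle_incident`; in practice that trigger is often an Alertmanager webhook. A sketch of that glue, assuming the alert carries `playbook` and `service` labels (both label names are assumptions for this example):

# Hypothetical glue: receive Alertmanager webhooks and run the matching playbook.
from fastapi import FastAPI, Request

app = FastAPI()
incident_manager = IncidentManager()

@app.post("/alerts")
async def alertmanager_webhook(request: Request):
    payload = await request.json()
    # Alertmanager posts a list of alerts under the "alerts" key
    for alert in payload.get("alerts", []):
        labels = alert.get("labels", {})
        incident_type = labels.get("playbook", "service_down")
        await incident_manager.handle_incident(
            incident_type,
            {"service_name": labels.get("service")},
        )
    return {"status": "accepted"}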
# 2. Capacity planning and resource management
class CapacityPlanner:
    def __init__(self):
        self.historical_data = {}
        self.growth_rates = {}

    def analyze_capacity_requirements(self, service_name: str, forecast_days: int = 30):
        """Analyze capacity requirements"""
        current_metrics = self.get_current_metrics(service_name)
        historical_trends = self.analyze_trends(service_name, days=90)
        # Estimate the growth rate
        growth_rate = self.calculate_growth_rate(historical_trends)
        # Forecast future capacity needs
        forecasted_capacity = self.forecast_capacity(
            current_metrics,
            growth_rate,
            forecast_days
        )
        return {
            'current_capacity': current_metrics,
            'forecasted_capacity': forecasted_capacity,
            'recommended_scaling': self.calculate_recommended_scaling(forecasted_capacity),
            'cost_projection': self.calculate_cost_projection(forecasted_capacity)
        }

    def calculate_recommended_scaling(self, forecasted_capacity):
        """Recommended scaling strategy"""
        recommendations = {}
        for metric, value in forecasted_capacity.items():
            if metric == 'cpu_utilization' and value > 70:
                recommendations['cpu'] = {
                    'action': 'scale_up',
                    'target_replicas': int(value / 50),  # aim for 50% CPU
                    'timeline': 'immediate'
                }
            if metric == 'memory_utilization' and value > 80:
                recommendations['memory'] = {
                    'action': 'upgrade_resources',
                    'target_memory': f"{int(value / 60)}Gi",  # aim for 60% memory
                    'timeline': 'next_maintenance_window'
                }
            if metric == 'requests_per_second' and value > 1000:
                recommendations['throughput'] = {
                    'action': 'horizontal_scaling',
                    'target_replicas': int(value / 500),  # 500 RPS per replica
                    'timeline': 'gradual_rollout'
                }
        return recommendations
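The `forecast_capacity` helper is referenced above but never defined. The usual approach is compound growth, future = current * (1 + r)^days; a minimal sketch under that assumption:

# Hypothetical forecast helper: compound daily growth applied to each metric.
def forecast_capacity(current_metrics: dict, daily_growth_rate: float,
                      forecast_days: int) -> dict:
    """future_value = current_value * (1 + r)^days, computed per metric."""
    factor = (1 + daily_growth_rate) ** forecast_days
    return {metric: value * factor for metric, value in current_metrics.items()}

# Example: 0.5% daily growth over 30 days is roughly a 16% increase
print(forecast_capacity({'requests_per_second': 800}, 0.005, 30))
# {'requests_per_second': 929.1...}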
# 3. Cost optimization
class CostOptimizer:
    def __init__(self):
        self.cost_thresholds = {
            'compute': 1000,  # USD per month
            'storage': 200,   # USD per month
            'network': 100    # USD per month
        }

    def analyze_cost_optimization(self):
        """Analyze cost-optimization opportunities"""
        current_costs = self.get_current_costs()
        optimization_opportunities = []
        # 1. Identify unused resources
        unused_resources = self.identify_unused_resources()
        if unused_resources:
            optimization_opportunities.append({
                'type': 'unused_resources',
                'description': 'Remove unused resources',
                'potential_savings': self.calculate_savings(unused_resources),
                'resources': unused_resources
            })
        # 2. Identify over-provisioning
        over_provisioned = self.identify_over_provisioning()
        if over_provisioned:
            optimization_opportunities.append({
                'type': 'over_provisioning',
                'description': 'Right-size over-provisioned resources',
                'potential_savings': self.calculate_right_sizing_savings(over_provisioned),
                'recommendations': over_provisioned
            })
        # 3. Reserved-instance opportunities
        reservation_opportunities = self.identify_reservation_opportunities()
        if reservation_opportunities:
            optimization_opportunities.append({
                'type': 'reserved_instances',
                'description': 'Use reserved instances for stable workloads',
                'potential_savings': self.calculate_reservation_savings(reservation_opportunities),
                'recommendations': reservation_opportunities
            })
        return {
            'current_monthly_cost': current_costs,
            'optimization_opportunities': optimization_opportunities,
            'total_potential_savings': sum(
                opp['potential_savings'] for opp in optimization_opportunities
            )
        }
# 4. Security best practices
from datetime import datetime

class SecurityManager:
    def __init__(self):
        self.security_policies = {}
        self.compliance_checks = []

    def implement_security_best_practices(self):
        """Security best-practices checklist"""
        return {
            'network_security': {
                'service_mesh_mtls': 'Enable mTLS between all services',
                'network_policies': 'Implement Kubernetes NetworkPolicies',
                'ingress_security': 'Use a Web Application Firewall (WAF)'
            },
            'authentication_authorization': {
                'oauth2_oidc': 'Implement OAuth2/OIDC for user authentication',
                'rbac': 'Use Role-Based Access Control (RBAC)',
                'service_accounts': 'Use service accounts for inter-service communication'
            },
            'data_security': {
                'encryption_at_rest': 'Encrypt all data at rest',
                'encryption_in_transit': 'Use TLS 1.3 for all communications',
                'key_management': 'Use a dedicated key management service'
            },
            'runtime_security': {
                'container_scanning': 'Scan container images for vulnerabilities',
                'runtime_monitoring': 'Monitor for suspicious runtime behavior',
                'secrets_management': 'Use Kubernetes Secrets or HashiCorp Vault'
            }
        }

    def run_security_audit(self):
        """Run a security audit"""
        audit_results = []
        # Scan container images for vulnerabilities
        audit_results.append(self.scan_container_vulnerabilities())
        # Check network security
        audit_results.append(self.check_network_security())
        # Review access permissions
        audit_results.append(self.review_access_permissions())
        # Review secrets management
        audit_results.append(self.review_secrets_management())
        return {
            'audit_date': datetime.now().isoformat(),
            'results': audit_results,
            'compliance_score': self.calculate_compliance_score(audit_results)
        }
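The checklist above says "Implement Kubernetes NetworkPolicies" without showing one. A sketch that restricts ingress to the order service; the namespace, label values, and port are assumptions matching this guide's examples:

# NetworkPolicy sketch: only the API gateway may reach order-service on 8000.
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: order-service-ingress
  namespace: production
spec:
  podSelector:
    matchLabels:
      app: order-service
  policyTypes:
    - Ingress
  ingress:
    - from:
        - podSelector:
            matchLabels:
              app: api-gateway
      ports:
        - protocol: TCP
          port: 8000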
# Consolidated operations dashboard
class OperationalDashboard:
    def __init__(self):
        self.incident_manager = IncidentManager()
        self.capacity_planner = CapacityPlanner()
        self.cost_optimizer = CostOptimizer()
        self.security_manager = SecurityManager()

    async def generate_operational_report(self):
        """Generate the consolidated operations report"""
        return {
            'timestamp': datetime.now().isoformat(),
            'service_health': await self.get_service_health_summary(),
            'capacity_analysis': self.capacity_planner.analyze_capacity_requirements('all'),
            'cost_optimization': self.cost_optimizer.analyze_cost_optimization(),
            'security_status': self.security_manager.run_security_audit(),
            'recent_incidents': await self.get_recent_incidents(),
            'recommendations': await self.generate_recommendations()
        }

    async def generate_recommendations(self):
        """Generate operational improvement recommendations"""
        recommendations = []
        # Performance improvements
        recommendations.extend(await self.get_performance_recommendations())
        # Cost optimization
        recommendations.extend(self.get_cost_optimization_recommendations())
        # Security hardening
        recommendations.extend(self.get_security_recommendations())
        # Operational efficiency
        recommendations.extend(self.get_operational_efficiency_recommendations())
        return sorted(recommendations, key=lambda x: x['priority'], reverse=True)
Part 3 completes the full microservices architecture guide!
Key topics covered in Part 3:
- 🗄️ Data management: Database per Service, CQRS, Event Sourcing
- 📊 Monitoring: distributed tracing, metrics collection, log aggregation
- 🚀 CI/CD: Jenkins/GitLab pipelines, automated deployment
- ⚡ Performance optimization: caching, connection pooling, load balancing
- 🛡️ Operational best practices: incident response, capacity planning, security management
The microservices journey is complete! 🎉
You now have the knowledge to design, build, and operate a microservices architecture end to end. In a real production environment, it is important to adopt these practices incrementally, in line with your organization's maturity and business requirements.
Successful microservices adoption rests not only on technical excellence but also on organizational culture, processes, and continuous learning and improvement.