Notice
Recent Posts
Recent Comments
Link
| 일 | 월 | 화 | 수 | 목 | 금 | 토 |
|---|---|---|---|---|---|---|
| 1 | 2 | 3 | 4 | 5 | 6 | |
| 7 | 8 | 9 | 10 | 11 | 12 | 13 |
| 14 | 15 | 16 | 17 | 18 | 19 | 20 |
| 21 | 22 | 23 | 24 | 25 | 26 | 27 |
| 28 | 29 | 30 | 31 |
Tags
- 분산 모니터링
- 컨테이너오케스트레이션
- 세션저장소
- 이벤트 스트리밍
- RabbitMQ Exchange
- 서비스 메시
- 서비스 설계
- 고가용성
- 메시지 브로커
- 마이크로서비스 운영
- 모노리스 분해
- docker
- 클러스터
- 보안
- 마이크로서비스
- 마이크로서비스 통신
- rabbitmq
- kubernetes
- 분산 시스템
- Kafka 클러스터
- ApacheBench
- Python
- 메시징 패턴
- 프로덕션 운영
- 모니터링
- 인메모리데이터베이스
- 클라우드
- CI/CD
- infrastructureascode
- devops
Archives
- Today
- Total
hobokai 님의 블로그
Kafka Producer/Consumer 고급 가이드 - 파티셔닝, 스트림 처리, 성능 최적화 본문
Apache Kafka 완벽 가이드 2편: Producer/Consumer 고급 기능과 Kafka Streams
목차
Producer 고급 기능
1편에서 배운 기본 Producer를 발전시켜 프로덕션 환경에 적합한 고급 기능들을 살펴보겠습니다.
커스텀 파티셔너
기본 파티셔닝 로직을 대신해 비즈니스 요구사항에 맞는 파티셔닝을 구현할 수 있습니다.
from kafka import KafkaProducer
from kafka.partitioner.base import Partitioner
import hashlib
class UserLocationPartitioner(Partitioner):
"""사용자 지역별 파티셔닝"""
def __init__(self):
# 지역별 파티션 매핑
self.region_partitions = {
'seoul': 0,
'busan': 1,
'daegu': 2,
'incheon': 3,
'gwangju': 4,
'daejeon': 5,
'ulsan': 6,
'others': 7
}
def partition(self, key, all_partitions, available_partitions):
"""
key 형식: "user_id:region" (예: "user123:seoul")
"""
if key is None:
# 키가 없으면 라운드 로빈
return available_partitions[0] if available_partitions else 0
try:
user_id, region = key.split(':')
partition = self.region_partitions.get(region.lower(),
self.region_partitions['others'])
# 사용 가능한 파티션 중에서 선택
if partition < len(all_partitions):
return partition
else:
# 해시 기반 대체 로직
hash_value = int(hashlib.md5(key.encode()).hexdigest(), 16)
return hash_value % len(all_partitions)
except ValueError:
# 키 형식이 잘못된 경우 해시 기반 파티셔닝
hash_value = int(hashlib.md5(key.encode()).hexdigest(), 16)
return hash_value % len(all_partitions)
# 커스텀 파티셔너 사용
producer = KafkaProducer(
bootstrap_servers=['localhost:9092'],
value_serializer=lambda v: json.dumps(v, ensure_ascii=False).encode('utf-8'),
key_serializer=lambda k: k.encode('utf-8'),
partitioner=UserLocationPartitioner(),
acks='all',
retries=3
)
# 지역별 메시지 전송
regions = ['seoul', 'busan', 'daegu', 'incheon']
for i, region in enumerate(regions):
key = f"user{i+1}:{region}"
value = {
'user_id': f'user{i+1}',
'region': region,
'action': '상품구매',
'amount': (i+1) * 10000
}
future = producer.send('regional-orders', key=key, value=value)
record = future.get()
print(f"지역 {region} → 파티션 {record.partition}")
배치 처리와 압축 최적화
import json
import time
from kafka import KafkaProducer
class OptimizedProducer:
def __init__(self):
self.producer = KafkaProducer(
bootstrap_servers=['localhost:9092'],
value_serializer=lambda v: json.dumps(v, ensure_ascii=False).encode('utf-8'),
key_serializer=lambda k: k.encode('utf-8') if k else None,
# 성능 최적화 설정
acks='all', # 모든 복제본 확인
retries=3, # 재시도 횟수
batch_size=32768, # 배치 크기 (32KB)
linger_ms=10, # 배치 대기 시간
compression_type='snappy', # 압축 (snappy, gzip, lz4)
buffer_memory=67108864, # 버퍼 메모리 (64MB)
# 안정성 설정
max_in_flight_requests_per_connection=1, # 순서 보장
enable_idempotence=True, # 중복 방지
request_timeout_ms=30000, # 요청 타임아웃
delivery_timeout_ms=120000 # 전달 타임아웃
)
def send_batch_events(self, events):
"""배치로 이벤트 전송"""
futures = []
for event in events:
future = self.producer.send(
topic=event['topic'],
key=event.get('key'),
value=event['data'],
timestamp_ms=int(time.time() * 1000)
)
futures.append((future, event))
# 배치 전송 강제 실행
self.producer.flush()
# 결과 확인
results = []
for future, event in futures:
try:
record = future.get(timeout=10)
results.append({
'success': True,
'topic': record.topic,
'partition': record.partition,
'offset': record.offset,
'event_id': event.get('id', 'unknown')
})
except Exception as e:
results.append({
'success': False,
'error': str(e),
'event_id': event.get('id', 'unknown')
})
return results
def close(self):
self.producer.close()
# 사용 예시
producer = OptimizedProducer()
# 대량 이벤트 준비
events = []
for i in range(1000):
events.append({
'id': f'event_{i}',
'topic': 'user-activities',
'key': f'user_{i % 100}', # 100명의 사용자 순환
'data': {
'user_id': f'user_{i % 100}',
'action': f'action_{i % 10}',
'timestamp': int(time.time() * 1000),
'session_id': f'session_{i // 10}'
}
})
print("📦 배치 전송 시작...")
start_time = time.time()
results = producer.send_batch_events(events)
end_time = time.time()
# 결과 분석
success_count = sum(1 for r in results if r['success'])
fail_count = len(results) - success_count
print(f"✅ 성공: {success_count}")
print(f"❌ 실패: {fail_count}")
print(f"⏱️ 소요 시간: {end_time - start_time:.2f}초")
print(f"📊 처리량: {len(events) / (end_time - start_time):.0f} msgs/sec")
producer.close()
트랜잭션 처리
from kafka import KafkaProducer
from kafka.errors import KafkaError
import json
class TransactionalProducer:
def __init__(self, transactional_id):
self.producer = KafkaProducer(
bootstrap_servers=['localhost:9092'],
value_serializer=lambda v: json.dumps(v, ensure_ascii=False).encode('utf-8'),
key_serializer=lambda k: k.encode('utf-8'),
# 트랜잭션 설정
transactional_id=transactional_id,
enable_idempotence=True,
acks='all',
retries=3
)
# 트랜잭션 초기화
self.producer.init_transactions()
def process_order_transaction(self, order_data):
"""주문 처리 트랜잭션"""
try:
# 트랜잭션 시작
self.producer.begin_transaction()
# 1. 주문 생성 이벤트
self.producer.send(
'order-events',
key=order_data['order_id'],
value={
'type': 'order_created',
'order_id': order_data['order_id'],
'user_id': order_data['user_id'],
'items': order_data['items'],
'total_amount': order_data['total_amount']
}
)
# 2. 재고 차감 이벤트
for item in order_data['items']:
self.producer.send(
'inventory-events',
key=item['product_id'],
value={
'type': 'stock_reserved',
'product_id': item['product_id'],
'quantity': item['quantity'],
'order_id': order_data['order_id']
}
)
# 3. 결제 요청 이벤트
self.producer.send(
'payment-events',
key=order_data['order_id'],
value={
'type': 'payment_requested',
'order_id': order_data['order_id'],
'amount': order_data['total_amount'],
'payment_method': order_data['payment_method']
}
)
# 트랜잭션 커밋
self.producer.commit_transaction()
print(f"✅ 주문 {order_data['order_id']} 트랜잭션 성공")
return True
except Exception as e:
# 트랜잭션 롤백
self.producer.abort_transaction()
print(f"❌ 주문 {order_data['order_id']} 트랜잭션 실패: {e}")
return False
def close(self):
self.producer.close()
# 사용 예시
tx_producer = TransactionalProducer('order-processor-1')
order = {
'order_id': 'order_12345',
'user_id': 'user_001',
'items': [
{'product_id': 'prod_001', 'quantity': 2, 'price': 15000},
{'product_id': 'prod_002', 'quantity': 1, 'price': 25000}
],
'total_amount': 55000,
'payment_method': 'card'
}
success = tx_producer.process_order_transaction(order)
tx_producer.close()
Consumer 고급 패턴
수동 오프셋 관리
from kafka import KafkaConsumer, TopicPartition
import json
import time
class ManualOffsetConsumer:
def __init__(self, topics, group_id):
self.consumer = KafkaConsumer(
*topics,
bootstrap_servers=['localhost:9092'],
group_id=group_id,
value_deserializer=lambda m: json.loads(m.decode('utf-8')),
key_deserializer=lambda k: k.decode('utf-8') if k else None,
enable_auto_commit=False, # 자동 커밋 비활성화
auto_offset_reset='earliest'
)
self.processed_count = 0
self.commit_interval = 100 # 100개마다 커밋
def process_message_safely(self, message):
"""안전한 메시지 처리"""
try:
# 비즈니스 로직 수행
data = message.value
print(f"처리 중: {data.get('user_id', 'unknown')} - {data.get('action', 'unknown')}")
# 시뮬레이션: 가끔 실패
if data.get('action') == 'error_test':
raise Exception("시뮬레이션된 오류")
# 처리 성공
time.sleep(0.1) # 처리 시간 시뮬레이션
return True
except Exception as e:
print(f"❌ 메시지 처리 실패: {e}")
return False
def consume_with_manual_commit(self):
"""수동 오프셋 관리로 메시지 소비"""
try:
for message in self.consumer:
success = self.process_message_safely(message)
if success:
self.processed_count += 1
# 주기적으로 오프셋 커밋
if self.processed_count % self.commit_interval == 0:
self.consumer.commit()
print(f"📍 오프셋 커밋: {self.processed_count}개 처리 완료")
else:
# 실패한 메시지 처리 전략
print("⚠️ 메시지 처리 실패 - Dead Letter Queue로 전송")
self.send_to_dlq(message)
# 실패해도 오프셋은 진행 (무한 재시도 방지)
self.consumer.commit()
except KeyboardInterrupt:
print("\n🛑 Consumer 중지 중...")
finally:
# 마지막 오프셋 커밋
self.consumer.commit()
self.consumer.close()
print(f"📊 총 {self.processed_count}개 메시지 처리 완료")
def send_to_dlq(self, failed_message):
"""실패한 메시지를 Dead Letter Queue로 전송"""
# 실제 구현에서는 별도 Producer로 DLQ에 전송
print(f"📨 DLQ 전송: {failed_message.topic}-{failed_message.partition}-{failed_message.offset}")
# 사용 예시
consumer = ManualOffsetConsumer(['user-activities'], 'manual-commit-group')
consumer.consume_with_manual_commit()
다중 스레드 Consumer 패턴
import threading
from queue import Queue
from kafka import KafkaConsumer
import json
import time
class MultiThreadConsumer:
def __init__(self, topics, group_id, num_workers=3):
self.topics = topics
self.group_id = group_id
self.num_workers = num_workers
self.message_queue = Queue(maxsize=1000)
self.workers = []
self.running = True
# Consumer 설정
self.consumer = KafkaConsumer(
*topics,
bootstrap_servers=['localhost:9092'],
group_id=group_id,
value_deserializer=lambda m: json.loads(m.decode('utf-8')),
auto_offset_reset='earliest',
enable_auto_commit=True,
max_poll_records=100 # 한 번에 가져올 레코드 수
)
def message_fetcher(self):
"""메시지를 가져와서 큐에 넣는 스레드"""
try:
for message in self.consumer:
if not self.running:
break
# 큐에 메시지 추가 (블로킹)
self.message_queue.put(message)
except Exception as e:
print(f"❌ Fetcher 오류: {e}")
finally:
self.consumer.close()
def message_worker(self, worker_id):
"""메시지를 처리하는 워커 스레드"""
processed = 0
while self.running:
try:
# 큐에서 메시지 가져오기 (타임아웃 설정)
message = self.message_queue.get(timeout=1)
# 메시지 처리
self.process_message(message, worker_id)
processed += 1
# 큐 작업 완료 표시
self.message_queue.task_done()
except Exception as e:
if self.running: # 정상 종료가 아닌 경우만 에러 출력
print(f"❌ Worker {worker_id} 오류: {e}")
print(f"🏁 Worker {worker_id} 종료 - {processed}개 처리")
def process_message(self, message, worker_id):
"""실제 메시지 처리 로직"""
data = message.value
# 처리 시간 시뮬레이션
processing_time = 0.1 + (hash(data.get('user_id', '')) % 3) * 0.1
time.sleep(processing_time)
print(f"🔄 Worker-{worker_id}: {data.get('user_id')} - {data.get('action')}")
def start(self):
"""Consumer 시작"""
print(f"🚀 Multi-thread Consumer 시작 (워커: {self.num_workers}개)")
# Fetcher 스레드 시작
fetcher = threading.Thread(target=self.message_fetcher)
fetcher.start()
# Worker 스레드들 시작
for i in range(self.num_workers):
worker = threading.Thread(target=self.message_worker, args=(i+1,))
worker.start()
self.workers.append(worker)
try:
# 메인 스레드에서 상태 모니터링
while True:
queue_size = self.message_queue.qsize()
print(f"📊 큐 크기: {queue_size}")
time.sleep(5)
except KeyboardInterrupt:
print("\n🛑 종료 신호 수신...")
self.stop()
def stop(self):
"""Consumer 중지"""
self.running = False
# 모든 워커 스레드 종료 대기
for worker in self.workers:
worker.join()
print("✅ 모든 워커 스레드 종료 완료")
# 사용 예시
if __name__ == "__main__":
multi_consumer = MultiThreadConsumer(
topics=['user-activities'],
group_id='multi-worker-group',
num_workers=5
)
multi_consumer.start()
Kafka Streams 입문
기본 스트림 처리
# Kafka Streams는 주로 Java/Scala에서 사용되므로,
# Python에서는 kafka-python으로 유사한 패턴 구현
from kafka import KafkaConsumer, KafkaProducer
from collections import defaultdict, deque
import json
import time
from datetime import datetime, timedelta
class SimpleStreamProcessor:
def __init__(self):
# Input consumer
self.consumer = KafkaConsumer(
'user-activities',
bootstrap_servers=['localhost:9092'],
group_id='stream-processor',
value_deserializer=lambda m: json.loads(m.decode('utf-8')),
auto_offset_reset='latest'
)
# Output producer
self.producer = KafkaProducer(
bootstrap_servers=['localhost:9092'],
value_serializer=lambda v: json.dumps(v, ensure_ascii=False).encode('utf-8')
)
# 상태 저장소 (실제로는 RocksDB 등 사용)
self.user_sessions = defaultdict(dict)
self.user_activity_counts = defaultdict(int)
self.window_data = defaultdict(lambda: deque(maxlen=100))
def process_stream(self):
"""스트림 처리 메인 루프"""
print("🌊 Stream Processing 시작...")
for message in self.consumer:
try:
self.process_user_activity(message.value)
except Exception as e:
print(f"❌ 스트림 처리 오류: {e}")
def process_user_activity(self, activity):
"""사용자 활동 스트림 처리"""
user_id = activity['user_id']
action = activity['action']
timestamp = activity.get('timestamp', int(time.time() * 1000))
# 1. 사용자 세션 업데이트
self.update_user_session(user_id, action, timestamp)
# 2. 활동 카운트 업데이트
self.update_activity_count(user_id, action)
# 3. 윈도우 기반 집계
self.update_window_aggregation(user_id, action, timestamp)
# 4. 실시간 알림 체크
self.check_real_time_alerts(user_id, action, timestamp)
def update_user_session(self, user_id, action, timestamp):
"""사용자 세션 추적"""
session = self.user_sessions[user_id]
if 'start_time' not in session or action == '로그인':
session['start_time'] = timestamp
session['last_activity'] = timestamp
session['action_count'] = 1
print(f"🔵 세션 시작: {user_id}")
else:
# 30분 이상 비활성 시 새 세션으로 간주
if timestamp - session['last_activity'] > 30 * 60 * 1000:
self.end_session(user_id, session)
session['start_time'] = timestamp
session['action_count'] = 1
else:
session['action_count'] += 1
session['last_activity'] = timestamp
# 로그아웃 시 세션 종료
if action == '로그아웃':
self.end_session(user_id, session)
def end_session(self, user_id, session):
"""세션 종료 처리"""
duration = session['last_activity'] - session['start_time']
session_summary = {
'user_id': user_id,
'session_duration_ms': duration,
'action_count': session['action_count'],
'start_time': session['start_time'],
'end_time': session['last_activity']
}
# 세션 요약을 output 토픽으로 전송
self.producer.send('user-sessions', value=session_summary)
print(f"🔴 세션 종료: {user_id} ({duration/1000:.1f}초)")
# 세션 데이터 초기화
self.user_sessions[user_id] = {}
def update_activity_count(self, user_id, action):
"""활동 카운트 업데이트"""
key = f"{user_id}:{action}"
self.user_activity_counts[key] += 1
# 10회마다 통계 전송
if self.user_activity_counts[key] % 10 == 0:
stats = {
'user_id': user_id,
'action': action,
'count': self.user_activity_counts[key],
'timestamp': int(time.time() * 1000)
}
self.producer.send('user-activity-stats', value=stats)
def update_window_aggregation(self, user_id, action, timestamp):
"""5분 윈도우 집계"""
window_key = timestamp // (5 * 60 * 1000) # 5분 윈도우
window_data = self.window_data[window_key]
window_data.append({
'user_id': user_id,
'action': action,
'timestamp': timestamp
})
# 윈도우가 완성되면 결과 전송
current_time = int(time.time() * 1000)
if current_time > (window_key + 1) * 5 * 60 * 1000:
self.emit_window_result(window_key, list(window_data))
def emit_window_result(self, window_key, activities):
"""윈도우 집계 결과 전송"""
if not activities:
return
# 집계 계산
action_counts = defaultdict(int)
unique_users = set()
for activity in activities:
action_counts[activity['action']] += 1
unique_users.add(activity['user_id'])
window_result = {
'window_start': window_key * 5 * 60 * 1000,
'window_end': (window_key + 1) * 5 * 60 * 1000,
'total_activities': len(activities),
'unique_users': len(unique_users),
'action_breakdown': dict(action_counts)
}
self.producer.send('window-aggregates', value=window_result)
print(f"📊 윈도우 집계: {len(activities)}개 활동, {len(unique_users)}명 사용자")
def check_real_time_alerts(self, user_id, action, timestamp):
"""실시간 알림 체크"""
# 예: 5분 내 동일 사용자의 로그인 시도가 5회 이상
if action == '로그인':
recent_logins = [
activity for activity in self.window_data[timestamp // (5 * 60 * 1000)]
if (activity['user_id'] == user_id and
activity['action'] == '로그인' and
timestamp - activity['timestamp'] < 5 * 60 * 1000)
]
if len(recent_logins) >= 5:
alert = {
'alert_type': 'suspicious_login',
'user_id': user_id,
'login_attempts': len(recent_logins),
'timestamp': timestamp
}
self.producer.send('security-alerts', value=alert)
print(f"🚨 보안 알림: {user_id} 연속 로그인 시도")
# 사용 예시
if __name__ == "__main__":
processor = SimpleStreamProcessor()
processor.process_stream()
실시간 분석 시스템 구축
대시보드용 실시간 데이터 준비
from kafka import KafkaConsumer, KafkaProducer
import json
import time
from collections import defaultdict, deque
from datetime import datetime
import threading
class RealTimeDashboard:
def __init__(self):
# 실시간 지표 저장
self.metrics = {
'current_users': set(),
'actions_per_minute': deque(maxlen=60), # 최근 60분
'top_products': defaultdict(int),
'revenue_per_minute': deque(maxlen=60),
'error_count': 0,
'last_updated': time.time()
}
# Consumer 설정
self.consumer = KafkaConsumer(
'user-activities',
'order-events',
'error-events',
bootstrap_servers=['localhost:9092'],
group_id='dashboard-processor',
value_deserializer=lambda m: json.loads(m.decode('utf-8')),
auto_offset_reset='latest'
)
# 대시보드 업데이트용 Producer
self.producer = KafkaProducer(
bootstrap_servers=['localhost:9092'],
value_serializer=lambda v: json.dumps(v, ensure_ascii=False).encode('utf-8')
)
# 메트릭 초기화
self.init_metrics()
# 주기적 업데이트 스레드 시작
self.update_thread = threading.Thread(target=self.periodic_update)
self.update_thread.daemon = True
self.update_thread.start()
def init_metrics(self):
"""메트릭 초기화"""
current_time = int(time.time())
# 분당 액션 수 초기화 (0으로)
for i in range(60):
self.metrics['actions_per_minute'].append({
'timestamp': current_time - (59 - i) * 60,
'count': 0
})
# 분당 매출 초기화
for i in range(60):
self.metrics['revenue_per_minute'].append({
'timestamp': current_time - (59 - i) * 60,
'amount': 0
})
def process_events(self):
"""이벤트 처리 메인 루프"""
print("📊 실시간 대시보드 프로세서 시작...")
for message in self.consumer:
topic = message.topic
data = message.value
try:
if topic == 'user-activities':
self.process_user_activity(data)
elif topic == 'order-events':
self.process_order_event(data)
elif topic == 'error-events':
self.process_error_event(data)
except Exception as e:
print(f"❌ 이벤트 처리 오류: {e}")
def process_user_activity(self, activity):
"""사용자 활동 처리"""
user_id = activity.get('user_id')
action = activity.get('action')
# 현재 활성 사용자 추가
if user_id:
self.metrics['current_users'].add(user_id)
# 분당 액션 수 업데이트
current_minute = int(time.time() // 60) * 60
if self.metrics['actions_per_minute'][-1]['timestamp'] == current_minute:
self.metrics['actions_per_minute'][-1]['count'] += 1
else:
# 새로운 분 시작
self.metrics['actions_per_minute'].append({
'timestamp': current_minute,
'count': 1
})
# 상품 조회 추적
if action == '상품조회' and 'product_id' in activity:
self.metrics['top_products'][activity['product_id']] += 1
def process_order_event(self, order):
"""주문 이벤트 처리"""
if order.get('type') == 'order_completed':
amount = order.get('total_amount', 0)
# 분당 매출 업데이트
current_minute = int(time.time() // 60) * 60
if self.metrics['revenue_per_minute'][-1]['timestamp'] == current_minute:
self.metrics['revenue_per_minute'][-1]['amount'] += amount
else:
# 새로운 분 시작
self.metrics['revenue_per_minute'].append({
'timestamp': current_minute,
'amount': amount
})
def process_error_event(self, error):
"""에러 이벤트 처리"""
self.metrics['error_count'] += 1
def periodic_update(self):
"""주기적 대시보드 업데이트"""
while True:
try:
time.sleep(10) # 10초마다 업데이트
self.publish_dashboard_update()
self.cleanup_old_data()
except Exception as e:
print(f"❌ 주기적 업데이트 오류: {e}")
def publish_dashboard_update(self):
"""대시보드 업데이트 발송"""
# 현재 지표 계산
current_time = int(time.time())
# 최근 1분간 액션 수
recent_actions = sum(
item['count'] for item in list(self.metrics['actions_per_minute'])[-1:]
)
# 최근 1분간 매출
recent_revenue = sum(
item['amount'] for item in list(self.metrics['revenue_per_minute'])[-1:]
)
# 인기 상품 TOP 5
top_products = sorted(
self.metrics['top_products'].items(),
key=lambda x: x[1],
reverse=True
)[:5]
# 대시보드 데이터 구성
dashboard_data = {
'timestamp': current_time,
'current_active_users': len(self.metrics['current_users']),
'actions_last_minute': recent_actions,
'revenue_last_minute': recent_revenue,
'total_errors': self.metrics['error_count'],
'top_products': top_products,
'actions_trend': list(self.metrics['actions_per_minute'])[-10:], # 최근 10분
'revenue_trend': list(self.metrics['revenue_per_minute'])[-10:] # 최근 10분
}
# 대시보드 토픽으로 전송
self.producer.send('dashboard-updates', value=dashboard_data)
# 콘솔 출력
print(f"📊 대시보드 업데이트: 활성사용자={len(self.metrics['current_users'])}, "
f"분당액션={recent_actions}, 분당매출={recent_revenue:,}원")
def cleanup_old_data(self):
"""오래된 데이터 정리"""
current_time = time.time()
# 10분 이상 비활성 사용자 제거
active_users = set()
for user in self.metrics['current_users']:
# 실제로는 사용자별 마지막 활동 시간을 추적해야 함
# 여기서는 간단히 5분마다 모든 사용자 목록 초기화
if int(current_time) % 300 == 0: # 5분마다
continue
active_users.add(user)
self.metrics['current_users'] = active_users
# 상품 조회 수 주기적 리셋 (1시간마다)
if int(current_time) % 3600 == 0: # 1시간마다
self.metrics['top_products'] = defaultdict(int)
# 사용 예시
if __name__ == "__main__":
dashboard = RealTimeDashboard()
dashboard.process_events()
성능 최적화 기법
Producer 성능 튜닝
# 고성능 Producer 설정 예시
high_performance_config = {
'bootstrap_servers': ['localhost:9092'],
'acks': 1, # 리더만 확인 (속도 우선)
'retries': 3,
'batch_size': 65536, # 64KB 배치
'linger_ms': 5, # 5ms 대기
'compression_type': 'lz4', # 빠른 압축
'buffer_memory': 134217728, # 128MB 버퍼
'max_in_flight_requests_per_connection': 5 # 동시 요청 증가
}
# 안정성 우선 Producer 설정 예시
reliability_config = {
'bootstrap_servers': ['localhost:9092'],
'acks': 'all', # 모든 복제본 확인
'retries': 10,
'enable_idempotence': True,
'max_in_flight_requests_per_connection': 1, # 순서 보장
'compression_type': 'snappy',
'batch_size': 16384,
'linger_ms': 10
}
Consumer 성능 튜닝
# 고성능 Consumer 설정
high_throughput_consumer_config = {
'bootstrap_servers': ['localhost:9092'],
'fetch_min_bytes': 50000, # 최소 50KB
'fetch_max_wait_ms': 500, # 최대 0.5초 대기
'max_partition_fetch_bytes': 10485760, # 10MB
'max_poll_records': 5000, # 한 번에 많은 레코드
'session_timeout_ms': 30000,
'heartbeat_interval_ms': 10000
}
이번 편에서는 Kafka의 고급 기능들을 살펴봤습니다:
- Producer 고급 기능: 커스텀 파티셔너, 배치 최적화, 트랜잭션
- Consumer 고급 패턴: 수동 오프셋 관리, 다중 스레드 처리
- Kafka Streams: 실시간 스트림 처리와 상태 관리
- 실시간 분석: 대시보드용 실시간 지표 생성
- 성능 최적화: 처리량과 안정성 균형
3편에서는 프로덕션 환경을 위한 고급 운영 기법을 다룹니다:
- 클러스터링과 고가용성 구성
- 모니터링과 성능 튜닝
- 보안 설정과 운영 자동화
- 장애 대응과 복구 전략
실제 프로덕션에서는 비즈니스 요구사항에 맞는 적절한 설정과 패턴을 선택하는 것이 중요합니다. 성능과 안정성 사이의 균형을 잘 맞춰보세요.
'Data Platform' 카테고리의 다른 글
| RabbitMQ 완벽 가이드 3편: 프로덕션 운영과 고급 기능 (3) | 2025.07.23 |
|---|---|
| RabbitMQ 완벽 가이드 2편: Exchange와 라우팅 패턴 (0) | 2025.07.23 |
| RabbitMQ 완벽 가이드 1편: 기초 개념과 설치 (0) | 2025.07.23 |
| Kafka 프로덕션 가이드 - 클러스터링, 모니터링, 보안, 운영 자동화 (1) | 2025.07.23 |
| Apache Kafka 기초 가이드 - 분산 스트리밍 플랫폼 개념과 설치 (2) | 2025.07.23 |