hobokai 님의 블로그

Redis 완전 마스터 가이드: 캐싱부터 실시간 애플리케이션까지 고성능 데이터 솔루션 본문

Backend

Redis 완전 마스터 가이드: 캐싱부터 실시간 애플리케이션까지 고성능 데이터 솔루션

hobokai 2025. 7. 23. 08:52

목차

  1. Redis란 무엇인가?
  2. Redis 설치 및 기본 설정
  3. 데이터 타입과 기본 명령어
  4. 고급 데이터 구조
  5. 캐싱 전략과 패턴
  6. Redis 영속성
  7. 보안과 인증
  8. 성능 최적화
  9. Redis 클러스터
  10. 고가용성과 센티널
  11. 모니터링과 디버깅
  12. Redis 모듈과 확장
  13. 실전 사용 사례
  14. 프로그래밍 언어별 연동
  15. 운영 베스트 프랙티스
  16. 결론

Redis란 무엇인가?

Redis(Remote Dictionary Server)는 2009년 Salvatore Sanfilippo가 개발한 오픈소스 인메모리 데이터 구조 저장소입니다. 데이터베이스, 캐시, 메시지 브로커로 사용되며, 뛰어난 성능과 다양한 데이터 타입을 지원합니다.

Redis의 핵심 특징

  • 고성능: 모든 데이터를 메모리에 저장하여 초고속 읽기/쓰기
  • 🎯 다양한 데이터 타입: String, Hash, List, Set, Sorted Set 등
  • 🔄 원자적 연산: 모든 명령어가 원자적으로 실행
  • 📦 영속성: RDB, AOF를 통한 데이터 영속화
  • 🌐 복제와 클러스터: 마스터-슬레이브 복제, 클러스터 지원
  • 🚀 Pub/Sub: 실시간 메시징 시스템
  • 🔧 Lua 스크립팅: 복잡한 로직의 원자적 실행

Redis vs 다른 솔루션

특성 Redis Memcached MySQL MongoDB
데이터 위치 메모리 메모리 디스크 디스크
데이터 타입 다양함 Key-Value만 관계형 문서형
영속성 선택적 없음 있음 있음
복제 지원 없음 지원 지원
성능 매우 높음 높음 보통 보통
메모리 사용 높음 높음 낮음 보통

Redis 사용 사례

  • 🏪 캐싱: 웹 애플리케이션 캐시, API 응답 캐시
  • 🔐 세션 저장소: 분산 환경에서 세션 관리
  • 📊 실시간 분석: 조회수, 좋아요 수 등 실시간 카운터
  • 🎮 게임: 리더보드, 실시간 게임 상태
  • 💬 채팅: 실시간 메시징, 알림 시스템
  • 🛒 장바구니: 전자상거래 장바구니 임시 저장
  • 🔒 분산 락: 동시성 제어를 위한 락 메커니즘

Redis 설치 및 기본 설정

Docker를 이용한 설치

# Redis 컨테이너 실행
docker run -d \
  --name redis \
  -p 6379:6379 \
  redis:7-alpine \
  redis-server --appendonly yes

# Redis CLI 접속
docker exec -it redis redis-cli

# 설정 파일과 함께 실행
docker run -d \
  --name redis \
  -p 6379:6379 \
  -v $(pwd)/redis.conf:/usr/local/etc/redis/redis.conf \
  redis:7-alpine \
  redis-server /usr/local/etc/redis/redis.conf

직접 설치

Linux (Ubuntu/Debian)

# 공식 저장소 추가
curl -fsSL https://packages.redis.io/gpg | sudo gpg --dearmor -o /usr/share/keyrings/redis-archive-keyring.gpg
echo "deb [signed-by=/usr/share/keyrings/redis-archive-keyring.gpg] https://packages.redis.io/deb $(lsb_release -cs) main" | sudo tee /etc/apt/sources.list.d/redis.list

# Redis 설치
sudo apt-get update
sudo apt-get install redis

# 서비스 시작
sudo systemctl start redis-server
sudo systemctl enable redis-server

macOS

# Homebrew 사용
brew install redis

# 서비스 시작
brew services start redis

# 또는 포어그라운드 실행
redis-server

Windows

# WSL2 사용 (권장)
# 또는 Redis for Windows (Microsoft 포크)
# https://github.com/microsoftarchive/redis

# Chocolatey 사용
choco install redis-64

기본 설정 파일

# redis.conf
# 네트워크 설정
bind 127.0.0.1 0.0.0.0
port 6379
protected-mode yes

# 메모리 설정
maxmemory 2gb
maxmemory-policy allkeys-lru

# 영속성 설정
save 900 1      # 900초 동안 1개 이상 변경 시 저장
save 300 10     # 300초 동안 10개 이상 변경 시 저장
save 60 10000   # 60초 동안 10000개 이상 변경 시 저장

# AOF 설정
appendonly yes
appendfilename "appendonly.aof"
appendfsync everysec

# 로그 설정
loglevel notice
logfile /var/log/redis/redis-server.log

# 보안 설정
requirepass your_strong_password

# 클라이언트 연결 설정
timeout 300
tcp-keepalive 300
tcp-backlog 511
maxclients 10000

기본 접속 테스트

# Redis CLI 접속
redis-cli

# 인증이 필요한 경우
redis-cli -a your_password

# 원격 서버 접속
redis-cli -h hostname -p 6379 -a password

# 기본 테스트
127.0.0.1:6379> ping
PONG

127.0.0.1:6379> set test "Hello Redis"
OK

127.0.0.1:6379> get test
"Hello Redis"

127.0.0.1:6379> info server
# 서버 정보 출력

데이터 타입과 기본 명령어

String (문자열)

Redis의 가장 기본적인 데이터 타입으로, 최대 512MB까지 저장 가능합니다.

# 기본 문자열 연산
SET key value                 # 값 설정
GET key                       # 값 조회
MSET key1 val1 key2 val2     # 여러 값 한번에 설정
MGET key1 key2               # 여러 값 한번에 조회
DEL key                      # 키 삭제
EXISTS key                   # 키 존재 확인

# 만료 시간 설정
SET key value EX 3600        # 3600초 후 만료
SETEX key 3600 value         # 위와 동일
TTL key                      # 남은 시간 확인
EXPIRE key 3600              # 기존 키에 만료 시간 설정

# 숫자 연산
SET counter 0
INCR counter                 # 1 증가
INCRBY counter 5             # 5 증가
DECR counter                 # 1 감소
DECRBY counter 3             # 3 감소

# 문자열 조작
SET name "Redis"
APPEND name " Database"      # 문자열 추가
STRLEN name                  # 문자열 길이
GETRANGE name 0 4           # 부분 문자열 조회

Hash (해시)

필드-값 쌍의 컬렉션으로, 객체를 저장하기에 적합합니다.

# 해시 기본 연산
HSET user:1000 name "John" email "john@example.com" age 30
HGET user:1000 name          # "John"
HMGET user:1000 name email   # ["John", "john@example.com"]
HGETALL user:1000           # 모든 필드-값 쌍 조회

# 해시 조작
HDEL user:1000 age          # 필드 삭제
HEXISTS user:1000 name      # 필드 존재 확인
HKEYS user:1000             # 모든 필드명 조회
HVALS user:1000             # 모든 값 조회
HLEN user:1000              # 필드 개수

# 숫자 필드 연산
HINCRBY user:1000 age 1     # age 필드 1 증가
HINCRBYFLOAT user:1000 score 1.5  # 실수 증가

List (리스트)

순서가 있는 문자열 컬렉션으로, 큐나 스택으로 사용할 수 있습니다.

# 리스트 기본 연산
LPUSH mylist "world"        # 왼쪽에 추가
LPUSH mylist "hello"        # ["hello", "world"]
RPUSH mylist "!"            # 오른쪽에 추가 ["hello", "world", "!"]

LRANGE mylist 0 -1          # 전체 리스트 조회
LLEN mylist                 # 리스트 길이

# 큐 (FIFO) 구현
RPUSH queue "task1"
RPUSH queue "task2"
LPOP queue                  # "task1" 반환

# 스택 (LIFO) 구현
LPUSH stack "item1"
LPUSH stack "item2"
LPOP stack                  # "item2" 반환

# 블로킹 연산 (큐 대기)
BLPOP queue 30              # 30초까지 대기하며 pop
BRPOP queue 30              # 30초까지 대기하며 pop

# 리스트 조작
LSET mylist 0 "hi"          # 인덱스 0 값 변경
LINSERT mylist BEFORE "world" "beautiful"  # "world" 앞에 삽입
LTRIM mylist 0 2            # 인덱스 0-2만 유지

Set (집합)

중복되지 않는 문자열의 순서 없는 컬렉션입니다.

# 집합 기본 연산
SADD myset "hello"          # 멤버 추가
SADD myset "world"
SADD myset "hello"          # 중복은 무시됨

SMEMBERS myset              # 모든 멤버 조회
SCARD myset                 # 멤버 개수
SISMEMBER myset "hello"     # 멤버 존재 확인

# 집합 연산
SADD set1 "a" "b" "c"
SADD set2 "b" "c" "d"

SUNION set1 set2            # 합집합
SINTER set1 set2            # 교집합
SDIFF set1 set2             # 차집합

# 랜덤 연산
SRANDMEMBER myset           # 랜덤 멤버 조회
SPOP myset                  # 랜덤 멤버 제거 후 반환

Sorted Set (정렬된 집합)

스코어를 가진 멤버들의 정렬된 컬렉션입니다.

# 정렬된 집합 기본 연산
ZADD leaderboard 100 "alice"    # 스코어 100으로 alice 추가
ZADD leaderboard 150 "bob"
ZADD leaderboard 120 "charlie"

ZRANGE leaderboard 0 -1         # 스코어 순으로 조회
ZREVRANGE leaderboard 0 -1      # 스코어 역순으로 조회
ZRANGE leaderboard 0 -1 WITHSCORES  # 스코어와 함께 조회

# 순위와 스코어 조회
ZRANK leaderboard "alice"       # alice의 순위 (0부터)
ZREVRANK leaderboard "alice"    # 역순 순위
ZSCORE leaderboard "alice"      # alice의 스코어

# 범위 조회
ZRANGEBYSCORE leaderboard 100 150      # 스코어 100-150 범위
ZCOUNT leaderboard 100 150             # 범위 내 멤버 수

# 스코어 연산
ZINCRBY leaderboard 10 "alice"  # alice 스코어에 10 추가

기본 키 관리 명령어

# 키 관리
KEYS pattern                # 패턴에 맞는 키 조회 (운영 시 주의!)
SCAN cursor [MATCH pattern] # 안전한 키 스캔
EXISTS key                  # 키 존재 확인
TYPE key                    # 키의 데이터 타입 확인
RENAME oldkey newkey        # 키 이름 변경

# 만료 시간 관리
EXPIRE key seconds          # 만료 시간 설정
TTL key                     # 남은 시간 확인 (-1: 영구, -2: 없음)
PERSIST key                 # 만료 시간 제거

# 데이터베이스 관리
SELECT 0                    # 데이터베이스 선택 (0-15)
FLUSHDB                     # 현재 DB 모든 키 삭제
FLUSHALL                    # 모든 DB 삭제

고급 데이터 구조

Streams

Redis 5.0에서 도입된 로그형 데이터 구조로, 이벤트 스트리밍에 최적화되어 있습니다.

# 스트림 기본 연산
XADD mystream * sensor-id 1234 temperature 19.8 humidity 67.2
# 자동 생성된 ID: 1609459200000-0

XADD mystream 1609459200001-0 sensor-id 1235 temperature 20.1
# 명시적 ID 지정

# 스트림 읽기
XRANGE mystream - +             # 전체 범위 읽기
XRANGE mystream 1609459200000-0 1609459200001-0  # 특정 범위

# 실시간 읽기
XREAD COUNT 2 STREAMS mystream 0-0  # 처음부터 2개 읽기
XREAD BLOCK 5000 STREAMS mystream $  # 새 메시지 대기 (5초)

# 컨슈머 그룹
XGROUP CREATE mystream mygroup 0-0   # 컨슈머 그룹 생성
XREADGROUP GROUP mygroup alice COUNT 1 STREAMS mystream >

HyperLogLog

확률적 자료구조로, 대용량 데이터의 고유 원소 개수를 추정합니다.

# HyperLogLog 기본 연산
PFADD hll a b c d e f g      # 원소 추가
PFCOUNT hll                  # 고유 원소 개수 추정
PFADD hll h i j
PFCOUNT hll

# 여러 HLL 병합
PFADD hll1 a b c
PFADD hll2 d e f
PFMERGE hll3 hll1 hll2      # hll1과 hll2를 병합하여 hll3 생성
PFCOUNT hll3

Bitmaps

비트 단위 연산을 지원하는 문자열의 특수한 형태입니다.

# 비트맵 기본 연산
SETBIT user:1000:login 0 1   # 0번째 비트를 1로 설정
SETBIT user:1000:login 1 1   # 1번째 비트를 1로 설정
GETBIT user:1000:login 0     # 0번째 비트 조회

# 비트 연산
BITCOUNT user:1000:login     # 1로 설정된 비트 개수
BITPOS user:1000:login 1     # 첫 번째 1 비트 위치

# 일별 로그인 추적 예시
SETBIT daily_login:2023-12-01 1000 1  # 사용자 1000 로그인
SETBIT daily_login:2023-12-01 1001 1  # 사용자 1001 로그인
BITCOUNT daily_login:2023-12-01        # 일일 로그인 사용자 수

Geospatial

지리적 위치 데이터를 저장하고 조회할 수 있습니다.

# 지리적 위치 추가
GEOADD places 126.9780 37.5665 "Seoul"
GEOADD places 139.6917 35.6895 "Tokyo"
GEOADD places 121.5654 25.0330 "Taipei"

# 거리 계산
GEODIST places Seoul Tokyo km    # 서울-도쿄 간 거리 (km)

# 반경 내 위치 검색
GEORADIUS places 127 37 100 km WITHDIST WITHCOORD
# (127, 37) 기준 100km 내 위치들을 거리, 좌표와 함께 조회

# 멤버 기준 반경 검색
GEORADIUSBYMEMBER places Seoul 500 km WITHDIST

캐싱 전략과 패턴

캐시 패턴

1. Cache-Aside (Lazy Loading)

import redis
import json

redis_client = redis.Redis(host='localhost', port=6379, db=0)

def get_user(user_id):
    # 1. 캐시에서 먼저 조회
    cache_key = f"user:{user_id}"
    cached_user = redis_client.get(cache_key)

    if cached_user:
        return json.loads(cached_user)

    # 2. 캐시에 없으면 DB에서 조회
    user = fetch_user_from_db(user_id)

    # 3. 캐시에 저장 (1시간 TTL)
    redis_client.setex(
        cache_key, 
        3600, 
        json.dumps(user)
    )

    return user

def update_user(user_id, user_data):
    # 1. DB 업데이트
    update_user_in_db(user_id, user_data)

    # 2. 캐시 무효화
    cache_key = f"user:{user_id}"
    redis_client.delete(cache_key)

2. Write-Through

def update_user_write_through(user_id, user_data):
    # 1. DB 업데이트
    update_user_in_db(user_id, user_data)

    # 2. 캐시도 동시에 업데이트
    cache_key = f"user:{user_id}"
    redis_client.setex(
        cache_key, 
        3600, 
        json.dumps(user_data)
    )

3. Write-Behind (Write-Back)

import asyncio
from collections import defaultdict

# 쓰기 버퍼
write_buffer = defaultdict(dict)

async def update_user_write_behind(user_id, user_data):
    # 1. 캐시 즉시 업데이트
    cache_key = f"user:{user_id}"
    redis_client.setex(
        cache_key, 
        3600, 
        json.dumps(user_data)
    )

    # 2. 쓰기 버퍼에 추가
    write_buffer[user_id].update(user_data)

    # 3. 비동기로 DB에 반영 (배치 처리)
    asyncio.create_task(flush_to_db_later(user_id))

async def flush_to_db_later(user_id):
    await asyncio.sleep(5)  # 5초 후 DB에 반영
    if user_id in write_buffer:
        data = write_buffer.pop(user_id)
        update_user_in_db(user_id, data)

캐시 만료 정책

Redis 메모리 정책 설정

# redis.conf 또는 런타임 설정
CONFIG SET maxmemory-policy allkeys-lru

# 정책 옵션들:
# noeviction: 메모리 부족 시 에러 반환
# allkeys-lru: 모든 키 중 LRU로 제거
# volatile-lru: TTL이 설정된 키 중 LRU로 제거
# allkeys-random: 모든 키 중 랜덤 제거
# volatile-random: TTL이 설정된 키 중 랜덤 제거
# volatile-ttl: TTL이 가장 짧은 키 제거

TTL 기반 캐시 관리

import time

class CacheManager:
    def __init__(self, redis_client):
        self.redis = redis_client

    def set_with_ttl(self, key, value, ttl_seconds):
        """TTL과 함께 캐시 설정"""
        self.redis.setex(key, ttl_seconds, value)

    def refresh_ttl(self, key, ttl_seconds):
        """TTL 갱신"""
        if self.redis.exists(key):
            self.redis.expire(key, ttl_seconds)

    def get_with_refresh(self, key, ttl_seconds):
        """조회 시 TTL 갱신"""
        value = self.redis.get(key)
        if value:
            self.refresh_ttl(key, ttl_seconds)
        return value

분산 캐시 전략

캐시 워밍

def warm_cache():
    """서버 시작 시 자주 사용되는 데이터 미리 로드"""
    popular_users = get_popular_users()

    pipeline = redis_client.pipeline()
    for user in popular_users:
        cache_key = f"user:{user['id']}"
        pipeline.setex(
            cache_key, 
            3600, 
            json.dumps(user)
        )

    pipeline.execute()
    print("Cache warmed up successfully")

캐시 무효화 전략

class CacheInvalidator:
    def __init__(self, redis_client):
        self.redis = redis_client

    def invalidate_user_cache(self, user_id):
        """사용자 관련 모든 캐시 무효화"""
        patterns = [
            f"user:{user_id}",
            f"user:{user_id}:profile",
            f"user:{user_id}:posts:*",
            f"followers:{user_id}",
        ]

        for pattern in patterns:
            if '*' in pattern:
                # 패턴 매칭 키들 삭제
                keys = self.redis.keys(pattern)
                if keys:
                    self.redis.delete(*keys)
            else:
                self.redis.delete(pattern)

    def invalidate_by_tags(self, tags):
        """태그 기반 캐시 무효화"""
        for tag in tags:
            tagged_keys = self.redis.smembers(f"tag:{tag}")
            if tagged_keys:
                self.redis.delete(*tagged_keys)
                self.redis.delete(f"tag:{tag}")

Redis 영속성

RDB (Redis Database)

특정 시점의 메모리 스냅샷을 디스크에 저장하는 방식입니다.

# redis.conf 설정
save 900 1      # 900초 동안 1개 이상 변경
save 300 10     # 300초 동안 10개 이상 변경
save 60 10000   # 60초 동안 10000개 이상 변경

stop-writes-on-bgsave-error yes  # 백그라운드 저장 실패 시 쓰기 중단
rdbcompression yes               # RDB 파일 압축
rdbchecksum yes                  # 체크섬 검증
dbfilename dump.rdb              # RDB 파일명
dir ./                           # 저장 디렉토리

RDB 수동 백업

# 즉시 백업 생성 (블로킹)
SAVE

# 백그라운드 백업 생성 (논블로킹)
BGSAVE

# 마지막 저장 시간 확인
LASTSAVE

# 백업 파일 확인
CONFIG GET dir
CONFIG GET dbfilename

AOF (Append Only File)

모든 쓰기 명령어를 로그로 기록하는 방식입니다.

# redis.conf 설정
appendonly yes                    # AOF 활성화
appendfilename "appendonly.aof"   # AOF 파일명

# 동기화 정책
appendfsync always     # 모든 명령어마다 동기화 (안전하지만 느림)
appendfsync everysec   # 1초마다 동기화 (권장)
appendfsync no         # OS에 맡김 (빠르지만 위험)

# AOF 재작성 설정
auto-aof-rewrite-percentage 100   # 100% 증가 시 재작성
auto-aof-rewrite-min-size 64mb    # 최소 64MB부터 재작성

AOF 관리 명령어

# AOF 재작성 (백그라운드)
BGREWRITEAOF

# AOF 로드 상태 확인
CONFIG GET appendonly

# AOF 파일 크기 확인
INFO persistence

하이브리드 영속성

# RDB + AOF 혼합 사용
save 900 1
appendonly yes
aof-use-rdb-preamble yes  # AOF 파일에 RDB 형식으로 스냅샷 저장

백업 및 복구 전략

#!/bin/bash
# Redis 백업 스크립트

REDIS_CLI="redis-cli"
BACKUP_DIR="/backup/redis"
DATE=$(date +%Y%m%d_%H%M%S)

# 백업 디렉토리 생성
mkdir -p $BACKUP_DIR

# RDB 백업
$REDIS_CLI BGSAVE
sleep 5  # 백그라운드 저장 완료 대기

# 백업 파일 복사
cp /var/lib/redis/dump.rdb $BACKUP_DIR/dump_$DATE.rdb
cp /var/lib/redis/appendonly.aof $BACKUP_DIR/appendonly_$DATE.aof

# 압축
tar -czf $BACKUP_DIR/redis_backup_$DATE.tar.gz \
    $BACKUP_DIR/dump_$DATE.rdb \
    $BACKUP_DIR/appendonly_$DATE.aof

# 오래된 백업 삭제 (7일 이상)
find $BACKUP_DIR -name "redis_backup_*.tar.gz" -mtime +7 -delete

echo "Redis backup completed: redis_backup_$DATE.tar.gz"

보안과 인증

기본 인증 설정

# redis.conf
requirepass your_strong_password_here

# 런타임 설정
CONFIG SET requirepass your_password

# 클라이언트 인증
AUTH your_password

# 연결 시 인증
redis-cli -a your_password

ACL (Access Control Lists) - Redis 6.0+

# 사용자 생성
ACL SETUSER alice on >password123 ~cached:* +get +set +del

# 사용자 권한 설명:
# on: 사용자 활성화
# >password123: 비밀번호 설정
# ~cached:*: cached:로 시작하는 키만 접근 가능
# +get +set +del: get, set, del 명령어만 허용

# 더 복잡한 권한 설정
ACL SETUSER bob on >bobpass ~app:* ~temp:* +@read +@write -flushall -flushdb

# 권한 설명:
# ~app:* ~temp:*: app:과 temp: 패턴 키 접근
# +@read: 모든 읽기 명령어 허용
# +@write: 모든 쓰기 명령어 허용  
# -flushall -flushdb: 위험한 명령어 금지

# 사용자 관리
ACL LIST                    # 모든 사용자 조회
ACL GETUSER alice          # 특정 사용자 정보
ACL DELUSER alice          # 사용자 삭제
ACL WHOAMI                 # 현재 사용자 확인

네트워크 보안

# redis.conf
# 바인딩 주소 제한
bind 127.0.0.1 10.0.0.100  # 특정 IP만 허용
# bind 0.0.0.0             # 모든 IP 허용 (위험)

# 보호 모드
protected-mode yes         # 기본 인증 없이 외부 접속 차단

# 포트 변경
port 6380                 # 기본 포트 6379 대신 다른 포트 사용

# 위험한 명령어 비활성화
rename-command FLUSHDB ""
rename-command FLUSHALL ""
rename-command KEYS ""
rename-command CONFIG configcmd  # 이름 변경

TLS/SSL 암호화

# redis.conf (Redis 6.0+)
port 0                    # 일반 포트 비활성화
tls-port 6380            # TLS 포트 활성화

tls-cert-file /path/to/redis.crt
tls-key-file /path/to/redis.key
tls-ca-cert-file /path/to/ca.crt

tls-protocols "TLSv1.2 TLSv1.3"
tls-ciphers "HIGH:!aNULL:!MD5"

방화벽 설정

# iptables 규칙
# Redis 포트를 특정 IP에서만 접근 허용
iptables -A INPUT -p tcp --dport 6379 -s 10.0.0.0/24 -j ACCEPT
iptables -A INPUT -p tcp --dport 6379 -j DROP

# ufw 사용 (Ubuntu)
ufw allow from 10.0.0.0/24 to any port 6379
ufw deny 6379

성능 최적화

메모리 최적화

메모리 사용량 분석

# 메모리 정보 확인
INFO memory

# 키별 메모리 사용량 (Redis 4.0+)
MEMORY USAGE key_name

# 샘플링을 통한 메모리 분석
redis-cli --bigkeys

# 더 자세한 분석
redis-cli --memkeys --memkeys-samples 1000

메모리 효율적인 데이터 구조

# 해시 vs 문자열 비교
# 사용자 데이터 저장 - 비효율적
redis_client.set("user:1000:name", "John")
redis_client.set("user:1000:email", "john@example.com") 
redis_client.set("user:1000:age", "30")

# 사용자 데이터 저장 - 효율적
redis_client.hset("user:1000", mapping={
    "name": "John",
    "email": "john@example.com", 
    "age": "30"
})

# 작은 해시 최적화 설정
# redis.conf
hash-max-ziplist-entries 512
hash-max-ziplist-value 64

압축 설정

# redis.conf
# 문자열 압축 (큰 값에 대해)
rdbcompression yes

# 해시/리스트/셋 최적화
hash-max-ziplist-entries 512
hash-max-ziplist-value 64
list-max-ziplist-size -2
set-max-intset-entries 512
zset-max-ziplist-entries 128
zset-max-ziplist-value 64

성능 모니터링

Redis 성능 메트릭

# 실시간 통계
INFO stats

# 지연시간 모니터링
redis-cli --latency
redis-cli --latency-history
redis-cli --latency-dist

# 느린 쿼리 로깅
CONFIG SET slowlog-log-slower-than 10000  # 10ms 이상
CONFIG SET slowlog-max-len 128
SLOWLOG GET 10  # 최근 10개 느린 쿼리

성능 테스트

# Redis 벤치마크
redis-benchmark -h localhost -p 6379 -n 100000 -c 50

# 특정 명령어 테스트
redis-benchmark -h localhost -p 6379 -t set,get -n 100000 -q

# 파이프라인 테스트
redis-benchmark -h localhost -p 6379 -n 100000 -P 16

파이프라인과 배치 처리

import redis
import time

redis_client = redis.Redis(host='localhost', port=6379)

# 비효율적 - 각 명령어마다 네트워크 라운드트립
start_time = time.time()
for i in range(1000):
    redis_client.set(f"key:{i}", f"value:{i}")
print(f"Without pipeline: {time.time() - start_time:.2f}s")

# 효율적 - 파이프라인 사용
start_time = time.time()
pipeline = redis_client.pipeline()
for i in range(1000):
    pipeline.set(f"key:{i}", f"value:{i}")
pipeline.execute()
print(f"With pipeline: {time.time() - start_time:.2f}s")

# 배치 처리 예시
def batch_process_users(user_updates, batch_size=100):
    pipeline = redis_client.pipeline()
    count = 0

    for user_id, data in user_updates.items():
        pipeline.hset(f"user:{user_id}", mapping=data)
        count += 1

        if count >= batch_size:
            pipeline.execute()
            pipeline = redis_client.pipeline()
            count = 0

    # 남은 명령어 실행
    if count > 0:
        pipeline.execute()

연결 풀 최적화

import redis
from redis.connection import ConnectionPool

# 연결 풀 설정
pool = ConnectionPool(
    host='localhost',
    port=6379,
    db=0,
    max_connections=20,        # 최대 연결 수
    retry_on_timeout=True,     # 타임아웃 시 재시도
    socket_timeout=5,          # 소켓 타임아웃
    socket_connect_timeout=5,  # 연결 타임아웃
    health_check_interval=30   # 연결 상태 체크 간격
)

redis_client = redis.Redis(connection_pool=pool)

# 연결 풀 모니터링
def monitor_connection_pool():
    pool_info = {
        'created_connections': pool.created_connections,
        'available_connections': len(pool._available_connections),
        'in_use_connections': len(pool._in_use_connections)
    }
    return pool_info

Redis 클러스터

클러스터 아키텍처

Redis 클러스터는 데이터를 여러 노드에 자동으로 분산하여 고가용성과 확장성을 제공합니다.

클러스터 구성 (최소 3 마스터 + 3 슬레이브):
┌─────────────┐  ┌─────────────┐  ┌─────────────┐
│ Master A    │  │ Master B    │  │ Master C    │
│ Slots:      │  │ Slots:      │  │ Slots:      │
│ 0-5460      │  │ 5461-10922  │  │ 10923-16383 │
└─────────────┘  └─────────────┘  └─────────────┘
       │                │                │
┌─────────────┐  ┌─────────────┐  ┌─────────────┐
│ Slave A     │  │ Slave B     │  │ Slave C     │
└─────────────┘  └─────────────┘  └─────────────┘

클러스터 설정

노드별 설정 파일

# redis-7000.conf (첫 번째 노드)
port 7000
cluster-enabled yes
cluster-config-file nodes-7000.conf
cluster-node-timeout 15000
appendonly yes
appendfilename appendonly-7000.aof

# 다른 노드들도 포트번호만 변경하여 설정
# 7001, 7002, 7003, 7004, 7005

클러스터 초기화

# 각 노드 시작
redis-server redis-7000.conf
redis-server redis-7001.conf
redis-server redis-7002.conf
redis-server redis-7003.conf
redis-server redis-7004.conf
redis-server redis-7005.conf

# 클러스터 생성 (Redis 5.0+)
redis-cli --cluster create \
  127.0.0.1:7000 127.0.0.1:7001 127.0.0.1:7002 \
  127.0.0.1:7003 127.0.0.1:7004 127.0.0.1:7005 \
  --cluster-replicas 1

# 클러스터 상태 확인
redis-cli -c -p 7000 cluster nodes
redis-cli -c -p 7000 cluster info

클러스터 슬롯 관리

# 슬롯 정보 확인
redis-cli -c -p 7000 cluster slots

# 특정 키가 어느 슬롯에 속하는지 확인
redis-cli -c -p 7000 cluster keyslot mykey

# 슬롯 재분배 (노드 추가 시)
redis-cli --cluster reshard 127.0.0.1:7000

노드 추가/제거

# 새 마스터 노드 추가
redis-cli --cluster add-node 127.0.0.1:7006 127.0.0.1:7000

# 새 슬레이브 노드 추가
redis-cli --cluster add-node 127.0.0.1:7007 127.0.0.1:7000 --cluster-slave

# 노드 제거 (빈 노드만 가능)
redis-cli --cluster del-node 127.0.0.1:7000 <node-id>

# 슬롯이 있는 노드 제거 (슬롯 재분배 후)
redis-cli --cluster reshard 127.0.0.1:7000 --cluster-from <node-id> --cluster-to <target-node-id> --cluster-slots <slot-count>

클러스터 클라이언트 사용

import redis.sentinel
from rediscluster import RedisCluster

# Redis 클러스터 클라이언트
startup_nodes = [
    {"host": "127.0.0.1", "port": "7000"},
    {"host": "127.0.0.1", "port": "7001"},
    {"host": "127.0.0.1", "port": "7002"}
]

redis_cluster = RedisCluster(
    startup_nodes=startup_nodes,
    decode_responses=True,
    skip_full_coverage_check=True,
    health_check_interval=30
)

# 클러스터에서 데이터 사용
redis_cluster.set("user:1000", "john")
redis_cluster.set("user:1001", "jane")

# 해시 태그 사용 (같은 슬롯에 저장)
redis_cluster.set("{user:1000}:profile", "profile_data")
redis_cluster.set("{user:1000}:settings", "settings_data")

# 클러스터 정보 확인
cluster_info = redis_cluster.cluster_info()
cluster_nodes = redis_cluster.cluster_nodes()

고가용성과 센티널

Redis Sentinel

Redis Sentinel은 마스터-슬레이브 구성에서 자동 장애조치를 제공합니다.

Sentinel 구성:
         ┌─────────────┐
         │  Sentinel 1 │
         └─────────────┘
              │
    ┌─────────────┐    ┌─────────────┐
    │   Master    │────│   Slave 1   │
    │   (Write)   │    │   (Read)    │
    └─────────────┘    └─────────────┘
              │               │
         ┌─────────────┐    ┌─────────────┐
         │  Sentinel 2 │    │   Slave 2   │
         └─────────────┘    └─────────────┘
              │               │
         ┌─────────────┐    ┌─────────────┐
         │  Sentinel 3 │    │  Sentinel 3 │
         └─────────────┘    └─────────────┘

Sentinel 설정

Sentinel 설정 파일

# sentinel-26379.conf
port 26379
sentinel monitor mymaster 127.0.0.1 6379 2
sentinel auth-pass mymaster your_password
sentinel down-after-milliseconds mymaster 30000
sentinel parallel-syncs mymaster 1
sentinel failover-timeout mymaster 180000

# 추가 설정
sentinel announce-ip 127.0.0.1
sentinel announce-port 26379

# 알림 설정
sentinel notification-script mymaster /path/to/notify.sh
sentinel client-reconfig-script mymaster /path/to/reconfig.sh

Sentinel 클러스터 시작

# 3개의 Sentinel 인스턴스 시작
redis-sentinel sentinel-26379.conf
redis-sentinel sentinel-26380.conf  
redis-sentinel sentinel-26381.conf

# Sentinel 상태 확인
redis-cli -p 26379 sentinel masters
redis-cli -p 26379 sentinel slaves mymaster
redis-cli -p 26379 sentinel sentinels mymaster

Sentinel 클라이언트 사용

import redis.sentinel

# Sentinel 연결 설정
sentinels = [
    ('127.0.0.1', 26379),
    ('127.0.0.1', 26380), 
    ('127.0.0.1', 26381)
]

sentinel = redis.sentinel.Sentinel(
    sentinels,
    socket_timeout=0.1,
    password='your_password'
)

# 마스터/슬레이브 연결 획득
master = sentinel.master_for(
    'mymaster', 
    socket_timeout=0.1,
    password='your_redis_password'
)

slave = sentinel.slave_for(
    'mymaster',
    socket_timeout=0.1, 
    password='your_redis_password'
)

# 읽기/쓰기 분리
def write_data(key, value):
    master.set(key, value)

def read_data(key):
    try:
        return slave.get(key)
    except:
        # 슬레이브 실패 시 마스터에서 읽기
        return master.get(key)

자동 장애조치 모니터링

import redis.sentinel
import time
import logging

class SentinelMonitor:
    def __init__(self, sentinels, service_name):
        self.sentinel = redis.sentinel.Sentinel(sentinels)
        self.service_name = service_name
        self.current_master = None

    def monitor_failover(self):
        """장애조치 모니터링"""
        while True:
            try:
                master_info = self.sentinel.discover_master(self.service_name)

                if self.current_master != master_info:
                    if self.current_master:
                        logging.warning(f"Master changed from {self.current_master} to {master_info}")
                        self.handle_failover(master_info)

                    self.current_master = master_info

                time.sleep(5)

            except Exception as e:
                logging.error(f"Sentinel monitoring error: {e}")
                time.sleep(10)

    def handle_failover(self, new_master):
        """장애조치 처리"""
        logging.info(f"Handling failover to new master: {new_master}")
        # 애플리케이션별 장애조치 로직 구현
        self.reconnect_to_new_master(new_master)

    def reconnect_to_new_master(self, master_info):
        """새 마스터로 재연결"""
        # 연결 풀 재생성 등의 작업 수행
        pass

모니터링과 디버깅

Redis 모니터링 도구

기본 모니터링 명령어

# 실시간 명령어 모니터링
redis-cli monitor

# 클라이언트 연결 정보
redis-cli client list
redis-cli client info

# 서버 정보
redis-cli info all
redis-cli info server
redis-cli info memory  
redis-cli info stats
redis-cli info replication

# 느린 쿼리 로그
redis-cli slowlog get 10
redis-cli slowlog len
redis-cli slowlog reset

메모리 분석

# 메모리 사용량이 큰 키 찾기
redis-cli --bigkeys

# 샘플 기반 메모리 분석
redis-cli --memkeys --memkeys-samples 1000

# 특정 키의 메모리 사용량
redis-cli memory usage user:1000

# 메모리 파편화 정보
redis-cli info memory | grep fragmentation

Prometheus와 Grafana 모니터링

# docker-compose.yml
version: '3.8'
services:
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    command: redis-server --appendonly yes

  redis-exporter:
    image: oliver006/redis_exporter:latest
    ports:
      - "9121:9121"
    environment:
      REDIS_ADDR: "redis:6379"
    depends_on:
      - redis

  prometheus:
    image: prom/prometheus:latest
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
    depends_on:
      - redis-exporter

  grafana:
    image: grafana/grafana:latest
    ports:
      - "3000:3000"
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin
    depends_on:
      - prometheus
# prometheus.yml
global:
  scrape_interval: 15s

scrape_configs:
  - job_name: 'redis'
    static_configs:
      - targets: ['redis-exporter:9121']

애플리케이션 레벨 모니터링

import redis
import time
import logging
from functools import wraps

class RedisMonitor:
    def __init__(self, redis_client):
        self.redis = redis_client
        self.stats = {
            'operations': 0,
            'errors': 0,
            'total_time': 0,
            'slow_queries': 0
        }

    def monitor_operation(self, operation_name):
        """Redis 연산 모니터링 데코레이터"""
        def decorator(func):
            @wraps(func)
            def wrapper(*args, **kwargs):
                start_time = time.time()
                try:
                    result = func(*args, **kwargs)
                    self.stats['operations'] += 1

                    duration = time.time() - start_time
                    self.stats['total_time'] += duration

                    # 느린 쿼리 감지 (100ms 이상)
                    if duration > 0.1:
                        self.stats['slow_queries'] += 1
                        logging.warning(f"Slow Redis operation: {operation_name} took {duration:.3f}s")

                    return result
                except Exception as e:
                    self.stats['errors'] += 1
                    logging.error(f"Redis operation failed: {operation_name} - {e}")
                    raise
            return wrapper
        return decorator

    def get_stats(self):
        """모니터링 통계 반환"""
        if self.stats['operations'] > 0:
            avg_time = self.stats['total_time'] / self.stats['operations']
            error_rate = self.stats['errors'] / self.stats['operations']
        else:
            avg_time = 0
            error_rate = 0

        return {
            **self.stats,
            'avg_time': avg_time,
            'error_rate': error_rate
        }

# 사용 예시
monitor = RedisMonitor(redis_client)

@monitor.monitor_operation('get_user')
def get_user(user_id):
    return redis_client.hgetall(f"user:{user_id}")

@monitor.monitor_operation('set_cache')
def set_cache(key, value, ttl=3600):
    return redis_client.setex(key, ttl, value)

알람 시스템

import smtplib
import requests
from email.mime.text import MIMEText

class RedisAlerting:
    def __init__(self, redis_client, thresholds=None):
        self.redis = redis_client
        self.thresholds = thresholds or {
            'memory_usage_percent': 80,
            'connected_clients': 1000,
            'keyspace_misses_ratio': 0.1,
            'replication_lag': 10
        }

    def check_health(self):
        """Redis 건강 상태 체크"""
        info = self.redis.info()
        alerts = []

        # 메모리 사용률 체크
        memory_usage_percent = (info['used_memory'] / info['maxmemory']) * 100
        if memory_usage_percent > self.thresholds['memory_usage_percent']:
            alerts.append(f"High memory usage: {memory_usage_percent:.1f}%")

        # 연결된 클라이언트 수 체크
        if info['connected_clients'] > self.thresholds['connected_clients']:
            alerts.append(f"High client connections: {info['connected_clients']}")

        # 키스페이스 미스 비율 체크
        hits = info.get('keyspace_hits', 0)
        misses = info.get('keyspace_misses', 0)
        if hits + misses > 0:
            miss_ratio = misses / (hits + misses)
            if miss_ratio > self.thresholds['keyspace_misses_ratio']:
                alerts.append(f"High keyspace miss ratio: {miss_ratio:.2f}")

        return alerts

    def send_slack_alert(self, message, webhook_url):
        """Slack 알림 전송"""
        payload = {
            'text': f"🚨 Redis Alert: {message}",
            'username': 'Redis Monitor',
            'icon_emoji': ':warning:'
        }

        response = requests.post(webhook_url, json=payload)
        return response.status_code == 200

    def send_email_alert(self, message, smtp_config):
        """이메일 알림 전송"""
        msg = MIMEText(f"Redis Alert: {message}")
        msg['Subject'] = 'Redis Alert'
        msg['From'] = smtp_config['from']
        msg['To'] = smtp_config['to']

        with smtplib.SMTP(smtp_config['server'], smtp_config['port']) as server:
            server.starttls()
            server.login(smtp_config['username'], smtp_config['password'])
            server.send_message(msg)

Redis 모듈과 확장

RedisJSON

JSON 데이터를 네이티브하게 저장하고 조작할 수 있는 모듈입니다.

# RedisJSON 설치 (Docker)
docker run -d --name redis-json -p 6379:6379 redislabs/redisjson:latest

# JSON 데이터 저장
JSON.SET user:1000 $ '{"name":"John","age":30,"email":"john@example.com"}'

# JSON 경로 조회
JSON.GET user:1000 $.name        # "John"
JSON.GET user:1000 $.age         # 30

# JSON 데이터 수정
JSON.SET user:1000 $.age 31
JSON.SET user:1000 $.city "New York"

# 배열 조작
JSON.SET products $ '{"items":["apple","banana"]}'
JSON.ARRAPPEND products $.items '"orange"'
JSON.ARRLEN products $.items     # 3

RedisSearch

전문 검색 기능을 제공하는 모듈입니다.

# 인덱스 생성
FT.CREATE products-idx ON HASH PREFIX 1 product: SCHEMA 
    name TEXT SORTABLE 
    price NUMERIC SORTABLE 
    category TAG 
    description TEXT

# 데이터 입력
HSET product:1 name "iPhone 14" price 999 category "smartphone" description "Latest iPhone model"
HSET product:2 name "Samsung Galaxy" price 899 category "smartphone" description "Android flagship phone"

# 검색 쿼리
FT.SEARCH products-idx "smartphone"                    # 전체 텍스트 검색
FT.SEARCH products-idx "@category:{smartphone}"        # 태그 검색
FT.SEARCH products-idx "@price:[800 1000]"            # 숫자 범위 검색
FT.SEARCH products-idx "iPhone @price:[500 1500]"     # 복합 조건

RedisTimeSeries

시계열 데이터를 효율적으로 저장하고 분석할 수 있는 모듈입니다.

# 시계열 생성
TS.CREATE temperature:sensor1 LABELS sensor_id 1 location "room1"

# 데이터 추가
TS.ADD temperature:sensor1 1640995200 23.5    # 타임스탬프와 값
TS.ADD temperature:sensor1 1640995260 24.1
TS.ADD temperature:sensor1 1640995320 23.8

# 범위 쿼리
TS.RANGE temperature:sensor1 1640995200 1640995320

# 집계 쿼리
TS.RANGE temperature:sensor1 - + AGGREGATION avg 3600000  # 1시간 평균

# 다중 시계열 쿼리
TS.MRANGE - + FILTER location=room1 AGGREGATION max 3600000

RedisGraph

그래프 데이터베이스 기능을 제공하는 모듈입니다.

# 그래프 생성
GRAPH.QUERY social "CREATE (alice:Person {name:'Alice', age:30})"
GRAPH.QUERY social "CREATE (bob:Person {name:'Bob', age:25})"
GRAPH.QUERY social "MATCH (a:Person {name:'Alice'}), (b:Person {name:'Bob'}) CREATE (a)-[:FRIENDS]->(b)"

# 그래프 쿼리
GRAPH.QUERY social "MATCH (p:Person) RETURN p.name, p.age"
GRAPH.QUERY social "MATCH (a:Person)-[:FRIENDS]->(b:Person) RETURN a.name, b.name"

# 복잡한 쿼리
GRAPH.QUERY social "MATCH (p:Person)-[:FRIENDS*2]-(friend_of_friend:Person) WHERE p.name = 'Alice' RETURN friend_of_friend.name"

실전 사용 사례

1. 분산 락 구현

import redis
import time
import uuid

class DistributedLock:
    def __init__(self, redis_client, lock_name, timeout=10):
        self.redis = redis_client
        self.lock_name = f"lock:{lock_name}"
        self.timeout = timeout
        self.identifier = str(uuid.uuid4())

    def acquire(self):
        """락 획득"""
        end_time = time.time() + self.timeout

        while time.time() < end_time:
            # 락 설정 시도 (NX: 키가 없을 때만, EX: 만료 시간)
            if self.redis.set(self.lock_name, self.identifier, nx=True, ex=self.timeout):
                return True

            time.sleep(0.001)  # 1ms 대기

        return False

    def release(self):
        """락 해제"""
        # Lua 스크립트로 원자적 해제
        lua_script = """
        if redis.call("get", KEYS[1]) == ARGV[1] then
            return redis.call("del", KEYS[1])
        else
            return 0
        end
        """

        return self.redis.eval(lua_script, 1, self.lock_name, self.identifier)

    def __enter__(self):
        if not self.acquire():
            raise Exception(f"Could not acquire lock {self.lock_name}")
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.release()

# 사용 예시
def transfer_money(from_account, to_account, amount):
    """계좌 간 송금 (분산 락 사용)"""
    lock_key = f"transfer:{min(from_account, to_account)}:{max(from_account, to_account)}"

    with DistributedLock(redis_client, lock_key):
        # 원자적 송금 작업
        from_balance = get_balance(from_account)
        if from_balance >= amount:
            set_balance(from_account, from_balance - amount)
            to_balance = get_balance(to_account)
            set_balance(to_account, to_balance + amount)
            return True
        return False

2. 실시간 순위표 시스템

class Leaderboard:
    def __init__(self, redis_client, board_name):
        self.redis = redis_client
        self.board_name = f"leaderboard:{board_name}"

    def add_score(self, user_id, score):
        """점수 추가"""
        return self.redis.zadd(self.board_name, {user_id: score})

    def increment_score(self, user_id, increment):
        """점수 증가"""
        return self.redis.zincrby(self.board_name, increment, user_id)

    def get_top_players(self, limit=10):
        """상위 플레이어 조회"""
        return self.redis.zrevrange(
            self.board_name, 0, limit-1, withscores=True
        )

    def get_player_rank(self, user_id):
        """플레이어 순위 조회 (1부터 시작)"""
        rank = self.redis.zrevrank(self.board_name, user_id)
        return rank + 1 if rank is not None else None

    def get_player_score(self, user_id):
        """플레이어 점수 조회"""
        return self.redis.zscore(self.board_name, user_id)

    def get_players_around(self, user_id, range_size=5):
        """특정 플레이어 주변 순위 조회"""
        rank = self.redis.zrevrank(self.board_name, user_id)
        if rank is None:
            return []

        start = max(0, rank - range_size)
        end = rank + range_size

        return self.redis.zrevrange(
            self.board_name, start, end, withscores=True
        )

# 사용 예시
leaderboard = Leaderboard(redis_client, "game:2023")

# 점수 업데이트
leaderboard.add_score("player1", 1500)
leaderboard.increment_score("player1", 100)  # 1600점

# 순위표 조회
top_10 = leaderboard.get_top_players(10)
player_rank = leaderboard.get_player_rank("player1")

3. 세션 저장소

import json
import hashlib
from datetime import datetime, timedelta

class SessionStore:
    def __init__(self, redis_client, session_timeout=3600):
        self.redis = redis_client
        self.timeout = session_timeout

    def create_session(self, user_id, user_data):
        """세션 생성"""
        session_id = self._generate_session_id(user_id)
        session_data = {
            'user_id': user_id,
            'created_at': datetime.now().isoformat(),
            'last_access': datetime.now().isoformat(),
            **user_data
        }

        session_key = f"session:{session_id}"
        self.redis.setex(
            session_key, 
            self.timeout, 
            json.dumps(session_data)
        )

        return session_id

    def get_session(self, session_id):
        """세션 조회"""
        session_key = f"session:{session_id}"
        session_data = self.redis.get(session_key)

        if session_data:
            data = json.loads(session_data)
            # 마지막 접근 시간 업데이트
            data['last_access'] = datetime.now().isoformat()
            self.redis.setex(session_key, self.timeout, json.dumps(data))
            return data

        return None

    def update_session(self, session_id, update_data):
        """세션 업데이트"""
        session_data = self.get_session(session_id)
        if session_data:
            session_data.update(update_data)
            session_key = f"session:{session_id}"
            self.redis.setex(
                session_key, 
                self.timeout, 
                json.dumps(session_data)
            )
            return True
        return False

    def destroy_session(self, session_id):
        """세션 삭제"""
        session_key = f"session:{session_id}"
        return self.redis.delete(session_key)

    def _generate_session_id(self, user_id):
        """세션 ID 생성"""
        timestamp = str(int(time.time() * 1000))
        raw_data = f"{user_id}:{timestamp}:{uuid.uuid4()}"
        return hashlib.sha256(raw_data.encode()).hexdigest()

# Flask 세션 미들웨어 예시
from flask import Flask, request, session

app = Flask(__name__)
session_store = SessionStore(redis_client)

@app.before_request
def load_session():
    session_id = request.cookies.get('session_id')
    if session_id:
        session_data = session_store.get_session(session_id)
        if session_data:
            session.update(session_data)

@app.after_request
def save_session(response):
    if session:
        session_id = request.cookies.get('session_id')
        if session_id:
            session_store.update_session(session_id, dict(session))
        else:
            # 새 세션 생성
            session_id = session_store.create_session(
                session.get('user_id'), 
                dict(session)
            )
            response.set_cookie('session_id', session_id, httponly=True)

    return response

4. 실시간 채팅 시스템

import asyncio
import json
from datetime import datetime

class ChatSystem:
    def __init__(self, redis_client):
        self.redis = redis_client
        self.pubsub = redis_client.pubsub()

    async def send_message(self, room_id, user_id, message):
        """메시지 전송"""
        message_data = {
            'id': str(uuid.uuid4()),
            'room_id': room_id,
            'user_id': user_id,
            'message': message,
            'timestamp': datetime.now().isoformat()
        }

        # 채팅방에 메시지 발행
        channel = f"chat:{room_id}"
        await self.redis.publish(channel, json.dumps(message_data))

        # 메시지 히스토리 저장 (최근 100개)
        history_key = f"chat_history:{room_id}"
        await self.redis.lpush(history_key, json.dumps(message_data))
        await self.redis.ltrim(history_key, 0, 99)

        return message_data['id']

    async def join_room(self, room_id, user_id):
        """채팅방 입장"""
        # 사용자 목록에 추가
        users_key = f"chat_users:{room_id}"
        await self.redis.sadd(users_key, user_id)

        # 채널 구독
        channel = f"chat:{room_id}"
        await self.pubsub.subscribe(channel)

        # 입장 알림
        await self.send_system_message(room_id, f"{user_id} joined the room")

    async def leave_room(self, room_id, user_id):
        """채팅방 퇴장"""
        # 사용자 목록에서 제거
        users_key = f"chat_users:{room_id}"
        await self.redis.srem(users_key, user_id)

        # 채널 구독 해제
        channel = f"chat:{room_id}"
        await self.pubsub.unsubscribe(channel)

        # 퇴장 알림
        await self.send_system_message(room_id, f"{user_id} left the room")

    async def get_room_users(self, room_id):
        """채팅방 사용자 목록 조회"""
        users_key = f"chat_users:{room_id}"
        return await self.redis.smembers(users_key)

    async def get_message_history(self, room_id, limit=50):
        """메시지 히스토리 조회"""
        history_key = f"chat_history:{room_id}"
        messages = await self.redis.lrange(history_key, 0, limit-1)
        return [json.loads(msg) for msg in messages]

    async def send_system_message(self, room_id, message):
        """시스템 메시지 전송"""
        await self.send_message(room_id, "system", message)

    async def listen_for_messages(self, callback):
        """메시지 수신 대기"""
        async for message in self.pubsub.listen():
            if message['type'] == 'message':
                data = json.loads(message['data'])
                await callback(data)

# WebSocket 서버와 연동 예시
import websockets

class ChatWebSocketServer:
    def __init__(self, chat_system):
        self.chat_system = chat_system
        self.connections = {}

    async def handle_client(self, websocket, path):
        """클라이언트 연결 처리"""
        try:
            # 초기 인증 및 방 입장
            auth_data = await websocket.recv()
            auth_info = json.loads(auth_data)

            user_id = auth_info['user_id']
            room_id = auth_info['room_id']

            # 연결 저장
            self.connections[websocket] = {'user_id': user_id, 'room_id': room_id}

            # 채팅방 입장
            await self.chat_system.join_room(room_id, user_id)

            # 메시지 수신 및 처리
            async for message in websocket:
                msg_data = json.loads(message)
                await self.chat_system.send_message(
                    room_id, user_id, msg_data['message']
                )

        except websockets.exceptions.ConnectionClosed:
            pass
        finally:
            # 연결 종료 시 채팅방 퇴장
            if websocket in self.connections:
                conn_info = self.connections[websocket]
                await self.chat_system.leave_room(
                    conn_info['room_id'], 
                    conn_info['user_id']
                )
                del self.connections[websocket]

5. API 속도 제한 (Rate Limiting)

import time
from enum import Enum

class RateLimitStrategy(Enum):
    FIXED_WINDOW = "fixed_window"
    SLIDING_WINDOW = "sliding_window"
    TOKEN_BUCKET = "token_bucket"

class RateLimiter:
    def __init__(self, redis_client):
        self.redis = redis_client

    def check_rate_limit_fixed_window(self, key, limit, window_seconds):
        """고정 윈도우 방식"""
        current_window = int(time.time()) // window_seconds
        window_key = f"rate_limit:{key}:{current_window}"

        pipeline = self.redis.pipeline()
        pipeline.incr(window_key)
        pipeline.expire(window_key, window_seconds)
        results = pipeline.execute()

        current_requests = results[0]
        return current_requests <= limit, current_requests

    def check_rate_limit_sliding_window(self, key, limit, window_seconds):
        """슬라이딩 윈도우 방식"""
        now = time.time()
        window_start = now - window_seconds

        # Lua 스크립트로 원자적 처리
        lua_script = """
        local key = KEYS[1]
        local window_start = tonumber(ARGV[1])
        local now = tonumber(ARGV[2])
        local limit = tonumber(ARGV[3])

        -- 윈도우 범위 밖의 요청 제거
        redis.call('ZREMRANGEBYSCORE', key, '-inf', window_start)

        -- 현재 요청 수 확인
        local current_requests = redis.call('ZCARD', key)

        if current_requests < limit then
            -- 현재 요청 추가
            redis.call('ZADD', key, now, now)
            redis.call('EXPIRE', key, 3600)  -- 1시간 후 만료
            return {1, current_requests + 1}
        else
            return {0, current_requests}
        end
        """

        result = self.redis.eval(lua_script, 1, f"rate_limit:{key}", window_start, now, limit)
        allowed = bool(result[0])
        current_requests = result[1]

        return allowed, current_requests

    def check_rate_limit_token_bucket(self, key, capacity, refill_rate, refill_period):
        """토큰 버킷 방식"""
        now = time.time()
        bucket_key = f"token_bucket:{key}"

        lua_script = """
        local key = KEYS[1]
        local capacity = tonumber(ARGV[1])
        local refill_rate = tonumber(ARGV[2])
        local refill_period = tonumber(ARGV[3])
        local now = tonumber(ARGV[4])

        local bucket = redis.call('HMGET', key, 'tokens', 'last_refill')
        local tokens = tonumber(bucket[1]) or capacity
        local last_refill = tonumber(bucket[2]) or now

        -- 토큰 보충
        local time_passed = now - last_refill
        local new_tokens = math.min(capacity, tokens + (time_passed / refill_period) * refill_rate)

        if new_tokens >= 1 then
            -- 토큰 소비
            new_tokens = new_tokens - 1
            redis.call('HMSET', key, 'tokens', new_tokens, 'last_refill', now)
            redis.call('EXPIRE', key, refill_period * 2)
            return {1, new_tokens}
        else
            redis.call('HMSET', key, 'tokens', new_tokens, 'last_refill', now)
            redis.call('EXPIRE', key, refill_period * 2)
            return {0, new_tokens}
        end
        """

        result = self.redis.eval(
            lua_script, 1, bucket_key, 
            capacity, refill_rate, refill_period, now
        )

        allowed = bool(result[0])
        remaining_tokens = result[1]

        return allowed, remaining_tokens

# Flask 미들웨어 예시
from flask import Flask, request, jsonify
from functools import wraps

app = Flask(__name__)
rate_limiter = RateLimiter(redis_client)

def rate_limit(limit=100, window=3600, strategy=RateLimitStrategy.SLIDING_WINDOW):
    """속도 제한 데코레이터"""
    def decorator(f):
        @wraps(f)
        def decorated_function(*args, **kwargs):
            # 클라이언트 식별 (IP + User ID)
            client_id = request.remote_addr
            if hasattr(request, 'user_id'):
                client_id = f"{request.user_id}:{client_id}"

            # 속도 제한 확인
            if strategy == RateLimitStrategy.SLIDING_WINDOW:
                allowed, current = rate_limiter.check_rate_limit_sliding_window(
                    client_id, limit, window
                )
            elif strategy == RateLimitStrategy.FIXED_WINDOW:
                allowed, current = rate_limiter.check_rate_limit_fixed_window(
                    client_id, limit, window
                )

            if not allowed:
                return jsonify({
                    'error': 'Rate limit exceeded',
                    'limit': limit,
                    'window': window,
                    'current': current
                }), 429

            return f(*args, **kwargs)
        return decorated_function
    return decorator

@app.route('/api/data')
@rate_limit(limit=1000, window=3600)  # 시간당 1000회 제한
def get_data():
    return jsonify({'data': 'some data'})

프로그래밍 언어별 연동

Python (redis-py)

import redis
import redis.sentinel
from redis.connection import ConnectionPool

# 기본 연결
redis_client = redis.Redis(
    host='localhost', 
    port=6379, 
    db=0,
    decode_responses=True,
    password='your_password'
)

# 연결 풀 사용
pool = ConnectionPool(
    host='localhost',
    port=6379,
    db=0,
    max_connections=20
)
redis_client = redis.Redis(connection_pool=pool)

# 비동기 Redis (aioredis)
import aioredis
import asyncio

async def async_redis_example():
    redis = aioredis.from_url("redis://localhost:6379")

    await redis.set("key", "value")
    value = await redis.get("key")

    await redis.close()

# Redis 센티널
sentinels = [('localhost', 26379)]
sentinel = redis.sentinel.Sentinel(sentinels)
master = sentinel.master_for('mymaster', socket_timeout=0.1)
slave = sentinel.slave_for('mymaster', socket_timeout=0.1)

Node.js (ioredis)

const Redis = require('ioredis');

// 기본 연결
const redis = new Redis({
  host: 'localhost',
  port: 6379,
  password: 'your_password',
  db: 0,
  retryDelayOnFailover: 100,
  maxRetriesPerRequest: 3,
});

// 클러스터 연결
const cluster = new Redis.Cluster([
  { host: '127.0.0.1', port: 7000 },
  { host: '127.0.0.1', port: 7001 },
  { host: '127.0.0.1', port: 7002 }
]);

// 센티널 연결
const redis = new Redis({
  sentinels: [
    { host: 'localhost', port: 26379 },
    { host: 'localhost', port: 26380 }
  ],
  name: 'mymaster'
});

// 사용 예시
async function redisExample() {
  // 문자열 연산
  await redis.set('key', 'value');
  const value = await redis.get('key');

  // 해시 연산
  await redis.hset('user:1000', 'name', 'John', 'age', 30);
  const user = await redis.hgetall('user:1000');

  // 파이프라인
  const pipeline = redis.pipeline();
  pipeline.set('key1', 'value1');
  pipeline.set('key2', 'value2');
  const results = await pipeline.exec();

  // Pub/Sub
  const subscriber = new Redis();
  subscriber.subscribe('news');
  subscriber.on('message', (channel, message) => {
    console.log(`Received: ${message} from ${channel}`);
  });

  await redis.publish('news', 'Hello World');
}

Java (Jedis/Lettuce)

// Jedis 사용 예시
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

public class RedisExample {
    private JedisPool jedisPool;

    public RedisExample() {
        JedisPoolConfig config = new JedisPoolConfig();
        config.setMaxTotal(20);
        config.setMaxIdle(10);
        config.setMinIdle(5);

        jedisPool = new JedisPool(config, "localhost", 6379);
    }

    public void stringOperations() {
        try (Jedis jedis = jedisPool.getResource()) {
            jedis.set("key", "value");
            String value = jedis.get("key");

            jedis.setex("temp_key", 3600, "temp_value"); // TTL 설정

            jedis.incr("counter");
            jedis.incrBy("counter", 5);
        }
    }

    public void hashOperations() {
        try (Jedis jedis = jedisPool.getResource()) {
            jedis.hset("user:1000", "name", "John");
            jedis.hset("user:1000", "age", "30");

            Map<String, String> user = jedis.hgetAll("user:1000");

            jedis.hincrBy("user:1000", "age", 1);
        }
    }

    public void listOperations() {
        try (Jedis jedis = jedisPool.getResource()) {
            jedis.lpush("queue", "task1", "task2");
            String task = jedis.rpop("queue");

            List<String> tasks = jedis.lrange("queue", 0, -1);
        }
    }
}

// Lettuce 사용 예시 (비동기)
import io.lettuce.core.RedisClient;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.async.RedisAsyncCommands;

public class LettuceExample {
    public void asyncExample() {
        RedisClient redisClient = RedisClient.create("redis://localhost:6379");
        StatefulRedisConnection<String, String> connection = redisClient.connect();
        RedisAsyncCommands<String, String> asyncCommands = connection.async();

        // 비동기 연산
        RedisFuture<String> future = asyncCommands.get("key");
        future.thenAccept(value -> {
            System.out.println("Value: " + value);
        });

        connection.close();
        redisClient.shutdown();
    }
}

Go (go-redis)

package main

import (
    "context"
    "fmt"
    "time"

    "github.com/go-redis/redis/v8"
)

func main() {
    // Redis 클라이언트 생성
    rdb := redis.NewClient(&redis.Options{
        Addr:     "localhost:6379",
        Password: "",
        DB:       0,
        PoolSize: 10,
    })

    ctx := context.Background()

    // 문자열 연산
    err := rdb.Set(ctx, "key", "value", 0).Err()
    if err != nil {
        panic(err)
    }

    val, err := rdb.Get(ctx, "key").Result()
    if err != nil {
        panic(err)
    }
    fmt.Println("key:", val)

    // 해시 연산
    err = rdb.HSet(ctx, "user:1000", "name", "John", "age", 30).Err()
    if err != nil {
        panic(err)
    }

    user := rdb.HGetAll(ctx, "user:1000").Val()
    fmt.Println("User:", user)

    // 파이프라인
    pipe := rdb.Pipeline()
    pipe.Set(ctx, "key1", "value1", 0)
    pipe.Set(ctx, "key2", "value2", 0)
    _, err = pipe.Exec(ctx)
    if err != nil {
        panic(err)
    }

    // Pub/Sub
    pubsub := rdb.Subscribe(ctx, "news")
    defer pubsub.Close()

    go func() {
        for msg := range pubsub.Channel() {
            fmt.Printf("Received: %s from %s\n", msg.Payload, msg.Channel)
        }
    }()

    err = rdb.Publish(ctx, "news", "Hello World").Err()
    if err != nil {
        panic(err)
    }

    time.Sleep(time.Second)
}

// Redis 클러스터 연결
func clusterExample() {
    rdb := redis.NewClusterClient(&redis.ClusterOptions{
        Addrs: []string{
            "localhost:7000",
            "localhost:7001", 
            "localhost:7002",
        },
    })

    ctx := context.Background()

    err := rdb.Set(ctx, "key", "value", 0).Err()
    if err != nil {
        panic(err)
    }
}

운영 베스트 프랙티스

용량 계획

# 메모리 사용량 계산
# 예상 키 개수 × 평균 키 크기 + 오버헤드

# 예시: 사용자 프로필 캐시
# - 사용자 수: 1,000,000명
# - 평균 프로필 크기: 2KB
# - Redis 오버헤드: ~25%
# 총 메모리 = 1,000,000 × 2KB × 1.25 = 2.5GB

# redis.conf 설정
maxmemory 4gb                    # 여유분 포함
maxmemory-policy allkeys-lru     # LRU 정책
maxmemory-samples 5              # LRU 샘플 수

백업 전략

#!/bin/bash
# comprehensive-backup.sh

REDIS_HOST="localhost"
REDIS_PORT=6379
REDIS_AUTH="your_password"
BACKUP_DIR="/backup/redis"
DATE=$(date +%Y%m%d_%H%M%S)
RETENTION_DAYS=30

# 백업 디렉토리 생성
mkdir -p $BACKUP_DIR

# 1. RDB 스냅샷 백업
echo "Creating RDB snapshot..."
redis-cli -h $REDIS_HOST -p $REDIS_PORT -a $REDIS_AUTH BGSAVE

# 백그라운드 저장 완료 대기
while [ $(redis-cli -h $REDIS_HOST -p $REDIS_PORT -a $REDIS_AUTH LASTSAVE) -eq $(redis-cli -h $REDIS_HOST -p $REDIS_PORT -a $REDIS_AUTH LASTSAVE) ]; do
    sleep 1
done

# 2. RDB 파일 복사
cp /var/lib/redis/dump.rdb $BACKUP_DIR/dump_$DATE.rdb

# 3. AOF 파일 백업 (활성화된 경우)
if [ -f /var/lib/redis/appendonly.aof ]; then
    cp /var/lib/redis/appendonly.aof $BACKUP_DIR/appendonly_$DATE.aof
fi

# 4. 설정 파일 백업
cp /etc/redis/redis.conf $BACKUP_DIR/redis_conf_$DATE.conf

# 5. 압축
tar -czf $BACKUP_DIR/redis_backup_$DATE.tar.gz \
    $BACKUP_DIR/dump_$DATE.rdb \
    $BACKUP_DIR/appendonly_$DATE.aof \
    $BACKUP_DIR/redis_conf_$DATE.conf

# 6. 개별 파일 삭제
rm -f $BACKUP_DIR/dump_$DATE.rdb \
      $BACKUP_DIR/appendonly_$DATE.aof \
      $BACKUP_DIR/redis_conf_$DATE.conf

# 7. 오래된 백업 삭제
find $BACKUP_DIR -name "redis_backup_*.tar.gz" -mtime +$RETENTION_DAYS -delete

# 8. 백업 상태 알림
if [ $? -eq 0 ]; then
    echo "✅ Redis backup completed successfully: redis_backup_$DATE.tar.gz"
    # Slack 알림 (선택사항)
    # curl -X POST -H 'Content-type: application/json' \
    #     --data '{"text":"Redis backup completed: redis_backup_'$DATE'.tar.gz"}' \
    #     $SLACK_WEBHOOK_URL
else
    echo "❌ Redis backup failed"
    exit 1
fi

복구 절차

#!/bin/bash
# restore-redis.sh

BACKUP_FILE=$1
REDIS_DATA_DIR="/var/lib/redis"
REDIS_USER="redis"

if [ -z "$BACKUP_FILE" ]; then
    echo "Usage: $0 <backup_file>"
    exit 1
fi

echo "Starting Redis restore process..."

# 1. Redis 서비스 중지
sudo systemctl stop redis-server

# 2. 기존 데이터 백업
sudo cp $REDIS_DATA_DIR/dump.rdb $REDIS_DATA_DIR/dump.rdb.backup.$(date +%Y%m%d_%H%M%S)

# 3. 백업 파일 압축 해제
TEMP_DIR=$(mktemp -d)
tar -xzf $BACKUP_FILE -C $TEMP_DIR

# 4. RDB 파일 복원
sudo cp $TEMP_DIR/dump_*.rdb $REDIS_DATA_DIR/dump.rdb
sudo chown $REDIS_USER:$REDIS_USER $REDIS_DATA_DIR/dump.rdb

# 5. AOF 파일 복원 (있는 경우)
if [ -f $TEMP_DIR/appendonly_*.aof ]; then
    sudo cp $TEMP_DIR/appendonly_*.aof $REDIS_DATA_DIR/appendonly.aof
    sudo chown $REDIS_USER:$REDIS_USER $REDIS_DATA_DIR/appendonly.aof
fi

# 6. 임시 디렉토리 정리
rm -rf $TEMP_DIR

# 7. Redis 서비스 시작
sudo systemctl start redis-server

# 8. 복원 확인
sleep 5
if redis-cli ping | grep -q PONG; then
    echo "✅ Redis restore completed successfully"
    redis-cli info keyspace
else
    echo "❌ Redis restore failed"
    exit 1
fi

성능 튜닝 체크리스트

# 1. 메모리 설정 확인
CONFIG GET maxmemory*
INFO memory

# 2. 네트워크 최적화
# /etc/sysctl.conf
net.core.somaxconn = 65535
net.ipv4.tcp_max_syn_backlog = 65535
vm.overcommit_memory = 1

# 3. 파일 디스크립터 한계 증가
# /etc/security/limits.conf
redis soft nofile 65535
redis hard nofile 65535

# 4. THP (Transparent Huge Pages) 비활성화
echo never > /sys/kernel/mm/transparent_hugepage/enabled

# 5. 스왑 최소화
# /etc/sysctl.conf
vm.swappiness = 1

# 6. 백그라운드 저장 최적화
# redis.conf
save 900 1
save 300 10
save 60 10000

stop-writes-on-bgsave-error no  # 백업 실패 시에도 쓰기 허용 (선택적)

모니터링 스크립트

#!/usr/bin/env python3
# redis-health-check.py

import redis
import time
import json
import sys
from datetime import datetime

class RedisHealthChecker:
    def __init__(self, host='localhost', port=6379, password=None):
        self.redis_client = redis.Redis(
            host=host, 
            port=port, 
            password=password,
            socket_timeout=5,
            socket_connect_timeout=5
        )

    def check_connectivity(self):
        """연결 상태 확인"""
        try:
            response = self.redis_client.ping()
            return {'status': 'OK', 'response_time': self.measure_latency()}
        except Exception as e:
            return {'status': 'FAIL', 'error': str(e)}

    def measure_latency(self):
        """응답 시간 측정"""
        start_time = time.time()
        self.redis_client.ping()
        return round((time.time() - start_time) * 1000, 2)  # ms

    def check_memory_usage(self):
        """메모리 사용량 확인"""
        info = self.redis_client.info('memory')

        used_memory = info['used_memory']
        max_memory = info.get('maxmemory', 0)

        usage_percent = (used_memory / max_memory * 100) if max_memory > 0 else 0

        return {
            'used_memory_mb': round(used_memory / 1024 / 1024, 2),
            'max_memory_mb': round(max_memory / 1024 / 1024, 2),
            'usage_percent': round(usage_percent, 2),
            'fragmentation_ratio': info.get('mem_fragmentation_ratio', 0)
        }

    def check_performance_metrics(self):
        """성능 메트릭 확인"""
        info = self.redis_client.info('stats')

        return {
            'total_commands_processed': info.get('total_commands_processed', 0),
            'total_connections_received': info.get('total_connections_received', 0),
            'rejected_connections': info.get('rejected_connections', 0),
            'keyspace_hits': info.get('keyspace_hits', 0),
            'keyspace_misses': info.get('keyspace_misses', 0),
            'connected_clients': info.get('connected_clients', 0),
        }

    def check_replication_status(self):
        """복제 상태 확인"""
        info = self.redis_client.info('replication')

        role = info.get('role', 'unknown')

        if role == 'master':
            return {
                'role': role,
                'connected_slaves': info.get('connected_slaves', 0),
                'replication_lag': 0
            }
        elif role == 'slave':
            return {
                'role': role,
                'master_host': info.get('master_host'),
                'master_port': info.get('master_port'),
                'master_link_status': info.get('master_link_status'),
                'master_last_io_seconds_ago': info.get('master_last_io_seconds_ago', 0)
            }

        return {'role': role}

    def run_full_health_check(self):
        """전체 건강 상태 점검"""
        health_report = {
            'timestamp': datetime.now().isoformat(),
            'connectivity': self.check_connectivity(),
            'memory': self.check_memory_usage(),
            'performance': self.check_performance_metrics(),
            'replication': self.check_replication_status()
        }

        # 전체 상태 판단
        overall_status = 'HEALTHY'

        if health_report['connectivity']['status'] != 'OK':
            overall_status = 'CRITICAL'
        elif health_report['memory']['usage_percent'] > 90:
            overall_status = 'WARNING'
        elif health_report['connectivity']['response_time'] > 100:  # 100ms 이상
            overall_status = 'WARNING'

        health_report['overall_status'] = overall_status

        return health_report

def main():
    if len(sys.argv) < 2:
        print("Usage: python3 redis-health-check.py <host> [port] [password]")
        sys.exit(1)

    host = sys.argv[1]
    port = int(sys.argv[2]) if len(sys.argv) > 2 else 6379
    password = sys.argv[3] if len(sys.argv) > 3 else None

    checker = RedisHealthChecker(host, port, password)

    try:
        health_report = checker.run_full_health_check()
        print(json.dumps(health_report, indent=2))

        # 상태에 따른 종료 코드 반환
        exit_code = 0
        if health_report['overall_status'] == 'WARNING':
            exit_code = 1
        elif health_report['overall_status'] == 'CRITICAL':
            exit_code = 2

        sys.exit(exit_code)

    except Exception as e:
        error_report = {
            'timestamp': datetime.now().isoformat(),
            'overall_status': 'CRITICAL',
            'error': str(e)
        }
        print(json.dumps(error_report, indent=2))
        sys.exit(2)

if __name__ == '__main__':
    main()

Docker Compose 운영 환경

# docker-compose.prod.yml
version: '3.8'

services:
  redis-master:
    image: redis:7-alpine
    container_name: redis-master
    command: redis-server /usr/local/etc/redis/redis.conf
    volumes:
      - ./redis-master.conf:/usr/local/etc/redis/redis.conf
      - redis-master-data:/data
    ports:
      - "6379:6379"
    networks:
      - redis-network
    deploy:
      resources:
        limits:
          memory: 2G
          cpus: '1'
        reservations:
          memory: 1G
          cpus: '0.5'
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 30s
      timeout: 10s
      retries: 3
      start_period: 60s

  redis-slave:
    image: redis:7-alpine
    container_name: redis-slave
    command: redis-server /usr/local/etc/redis/redis.conf
    volumes:
      - ./redis-slave.conf:/usr/local/etc/redis/redis.conf
      - redis-slave-data:/data
    depends_on:
      - redis-master
    networks:
      - redis-network
    deploy:
      resources:
        limits:
          memory: 2G
          cpus: '1'

  redis-sentinel-1:
    image: redis:7-alpine
    container_name: redis-sentinel-1
    command: redis-sentinel /usr/local/etc/redis/sentinel.conf
    volumes:
      - ./sentinel.conf:/usr/local/etc/redis/sentinel.conf
    ports:
      - "26379:26379"
    depends_on:
      - redis-master
      - redis-slave
    networks:
      - redis-network

  redis-sentinel-2:
    image: redis:7-alpine
    container_name: redis-sentinel-2
    command: redis-sentinel /usr/local/etc/redis/sentinel.conf
    volumes:
      - ./sentinel.conf:/usr/local/etc/redis/sentinel.conf
    ports:
      - "26380:26379"
    depends_on:
      - redis-master
      - redis-slave
    networks:
      - redis-network

  redis-sentinel-3:
    image: redis:7-alpine
    container_name: redis-sentinel-3
    command: redis-sentinel /usr/local/etc/redis/sentinel.conf
    volumes:
      - ./sentinel.conf:/usr/local/etc/redis/sentinel.conf
    ports:
      - "26381:26379"
    depends_on:
      - redis-master
      - redis-slave
    networks:
      - redis-network

  redis-exporter:
    image: oliver006/redis_exporter:latest
    container_name: redis-exporter
    environment:
      REDIS_ADDR: "redis-master:6379"
      REDIS_PASSWORD: "${REDIS_PASSWORD}"
    ports:
      - "9121:9121"
    depends_on:
      - redis-master
    networks:
      - redis-network

volumes:
  redis-master-data:
  redis-slave-data:

networks:
  redis-network:
    driver: bridge

결론

Redis는 현대 애플리케이션에서 필수적인 인프라 구성 요소입니다. 단순한 캐시에서 시작하여 실시간 애플리케이션, 세션 저장소, 메시지 브로커까지 다양한 용도로 활용할 수 있습니다.

Redis 마스터를 위한 핵심 포인트

  1. 적절한 사용 사례 선택: Redis의 강점을 활용할 수 있는 영역 식별
  2. 메모리 관리: 효율적인 데이터 구조와 만료 정책 설정
  3. 고가용성 구성: 센티널이나 클러스터를 통한 안정성 확보
  4. 성능 모니터링: 지속적인 성능 측정과 최적화
  5. 보안 설정: 인증, 네트워크 제한, 암호화 적용

실무 적용 로드맵

  1. 기초 학습: 데이터 타입과 기본 명령어 숙달
  2. 캐싱 구현: 웹 애플리케이션에 캐시 레이어 추가
  3. 고급 기능: Pub/Sub, Lua 스크립팅, 모듈 활용
  4. 운영 환경: 클러스터, 센티널, 모니터링 구축
  5. 최적화: 성능 튜닝과 문제 해결

지속적 학습 리소스

마지막 조언

  • 실습 중심: 이론보다 실제 사용 경험이 중요
  • 모니터링 필수: 성능과 안정성을 지속적으로 관찰
  • 보안 우선: 운영 환경에서는 반드시 보안 설정 적용
  • 점진적 적용: 작은 규모부터 시작하여 단계적 확장
  • 백업 전략: 데이터 손실에 대비한 철저한 백업 계획

Redis를 마스터하고 고성능 애플리케이션의 핵심 인프라를 구축해보세요! 🚀