--- name: redis-patterns description: Redisデータ構造パターン、キャッシング戦略、分散ロック、レート制限、Pub/Sub、本番アプリケーション用コネクション管理。 origin: ECC --- # Redis Patterns 一般的なバックエンド使用例に対するRedisベストプラクティスの参考資料。 ## How It Works Redisはメモリ内データ構造ストアで、文字列、ハッシュ、リスト、セット、ソート済みセット、ストリームなどをサポートします。単一インスタンスでは個々のRedisコマンドは原子的ですが、マルチステップワークフローはLuaスクリプト、MULTI/EXECトランザクション、または明示的な同期化が必要です。RDBスナップショットまたはAOFログを通じてデータをオプションで永続化します。クライアントはRESPプロトコルを使用してTCP経由で通信します。接続プール不可欠でリクエストごとのハンドシェイクオーバーヘッドを回避します。 ## When to Activate - アプリケーションにキャッシング追加 - レート制限またはスロットリング実装 - 分散ロックまたはコーディネーション構築 - セッションまたはトークンストレージ設定 - Pub/SubまたはRedis Streams for messaging使用 - 本番環境でRedis設定(プール、削除、クラスタリング) ## Data Structure Cheat Sheet | Use Case | Structure | Example Key | |----------|-----------|-------------| | Simple cache | String | `product:123` | | User session | Hash | `session:abc` | | Leaderboard | Sorted Set | `scores:weekly` | | Unique visitors | Set | `visitors:2024-01-01` | | Activity feed | List | `feed:user:456` | | Event stream | Stream | `events:orders` | | Counters / rate limits | String (INCR) | `ratelimit:user:123` | | Bloom filter / HLL | HyperLogLog | `hll:pageviews` | ## Core Patterns ### Cache-Aside (Lazy Loading) ```python import redis import json r = redis.Redis(host='localhost', port=6379, decode_responses=True) def get_product(product_id: int): cache_key = f"product:{product_id}" cached = r.get(cache_key) if cached: return json.loads(cached) product = db.query("SELECT * FROM products WHERE id = %s", product_id) r.setex(cache_key, 3600, json.dumps(product)) # TTL: 1 hour return product ``` ### Write-Through Cache ```python def update_product(product_id: int, data: dict): # DB書き込み先 db.execute("UPDATE products SET ... WHERE id = %s", product_id) # キャッシュを即座に更新 cache_key = f"product:{product_id}" r.setex(cache_key, 3600, json.dumps(data)) ``` ### Cache Invalidation ```python # タグベース削除 — セット内で関連キーをグループ化 def cache_product(product_id: int, category_id: int, data: dict): key = f"product:{product_id}" tag = f"tag:category:{category_id}" pipe = r.pipeline(transaction=True) pipe.setex(key, 3600, json.dumps(data)) pipe.sadd(tag, key) pipe.expire(tag, 3600) pipe.execute() def invalidate_category(category_id: int): tag = f"tag:category:{category_id}" keys = r.smembers(tag) if keys: r.delete(*keys) r.delete(tag) ``` ### Session Storage ```python import time import uuid def create_session(user_id: int, ttl: int = 86400) -> str: session_id = str(uuid.uuid4()) key = f"session:{session_id}" pipe = r.pipeline(transaction=True) pipe.hset(key, mapping={ "user_id": user_id, "created_at": int(time.time()), }) pipe.expire(key, ttl) pipe.execute() return session_id def get_session(session_id: str) -> dict | None: data = r.hgetall(f"session:{session_id}") return data if data else None def delete_session(session_id: str): r.delete(f"session:{session_id}") ``` ## Rate Limiting ### Fixed Window (Simple) ```python def is_rate_limited(user_id: int, limit: int = 100, window: int = 60) -> bool: key = f"ratelimit:{user_id}:{int(time.time()) // window}" pipe = r.pipeline(transaction=True) pipe.incr(key) pipe.expire(key, window) count, _ = pipe.execute() return count > limit ``` ### Sliding Window (Lua — Atomic) ```lua -- sliding_window.lua local key = KEYS[1] local now = tonumber(ARGV[1]) local window = tonumber(ARGV[2]) local limit = tonumber(ARGV[3]) redis.call('ZREMRANGEBYSCORE', key, 0, now - window) local count = redis.call('ZCARD', key) if count < limit then -- Use unique member (now + sequence) to avoid collisions within the same millisecond local seq_key = key .. ':seq' local seq = redis.call('INCR', seq_key) redis.call('EXPIRE', seq_key, math.ceil(window / 1000)) redis.call('ZADD', key, now, now .. '-' .. seq) redis.call('EXPIRE', key, math.ceil(window / 1000)) return 1 end return 0 ``` ```python sliding_window = r.register_script(open('sliding_window.lua').read()) def allow_request(user_id: int) -> bool: key = f"ratelimit:sliding:{user_id}" now = int(time.time() * 1000) return bool(sliding_window(keys=[key], args=[now, 60000, 100])) ``` ## Distributed Locks ### Distributed Lock (Single Node — SET NX PX) ```python import uuid def acquire_lock(resource: str, ttl_ms: int = 5000) -> str | None: lock_key = f"lock:{resource}" token = str(uuid.uuid4()) acquired = r.set(lock_key, token, px=ttl_ms, nx=True) return token if acquired else None def release_lock(resource: str, token: str) -> bool: release_script = """ if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end """ result = r.eval(release_script, 1, f"lock:{resource}", token) return bool(result) # Usage token = acquire_lock("order:payment:123") if token: try: process_payment() finally: release_lock("order:payment:123", token) ``` > マルチノード設定の場合、フルRedlockアルゴリズムを実装する `redlock-py` ライブラリを使用してください。 ## Pub/Sub & Streams ### Pub/Sub (Fire-and-Forget) ```python # Publisher def publish_event(channel: str, payload: dict): r.publish(channel, json.dumps(payload)) # Subscriber (blocking — run in separate thread/process) def subscribe_events(channel: str): pubsub = r.pubsub() pubsub.subscribe(channel) for message in pubsub.listen(): if message['type'] == 'message': handle(json.loads(message['data'])) ``` ### Redis Streams (Durable Queue) ```python # Producer def emit(stream: str, event: dict): r.xadd(stream, event, maxlen=10000) # Cap stream length # Consumer group — guarantees at-least-once delivery try: r.xgroup_create('events:orders', 'processor', id='0', mkstream=True) except Exception: pass # Group already exists def consume(stream: str, group: str, consumer: str): while True: messages = r.xreadgroup(group, consumer, {stream: '>'}, count=10, block=2000) for _, entries in (messages or []): for msg_id, data in entries: process(data) r.xack(stream, group, msg_id) ``` > 配信保証、コンシューマーグループ、または再生が必要な場合、Pub/Sub代わりに**Streams**を優先してください。 ## Key Design ### Naming Conventions ``` # Pattern: resource:id:field user:123:profile order:456:status cache:product:789 # Pattern: namespace:resource:id myapp:session:abc123 myapp:ratelimit:user:123 # Pattern: resource:date (time-bound keys) stats:pageviews:2024-01-01 ``` ### TTL Strategy | Data Type | Suggested TTL | |-----------|--------------| | User session | 24h (`86400`) | | API response cache | 5–15 min | | Rate limit window | Match window size | | Short-lived tokens | 5–10 min | | Leaderboard | 1h–24h | | Static/reference data | 1h–1 week | 常にTTLを設定してください。TTLなしのキーは無限に蓄積してメモリ圧力を引き起こします。 ## Connection Management ### Connection Pooling ```python from redis import ConnectionPool, Redis pool = ConnectionPool( host='localhost', port=6379, db=0, max_connections=20, decode_responses=True, socket_connect_timeout=2, socket_timeout=2, ) r = Redis(connection_pool=pool) ``` ### Cluster Mode ```python from redis.cluster import RedisCluster r = RedisCluster( startup_nodes=[{"host": "redis-1", "port": 6379}], decode_responses=True, skip_full_coverage_check=True, ) ``` ### Sentinel (High Availability) ```python from redis.sentinel import Sentinel sentinel = Sentinel( [('sentinel-1', 26379), ('sentinel-2', 26379)], socket_timeout=0.5, ) master = sentinel.master_for('mymaster', decode_responses=True) replica = sentinel.slave_for('mymaster', decode_responses=True) ``` ## Eviction Policies | Policy | Behavior | Best For | |--------|----------|----------| | `noeviction` | Error on write when full | Queues / critical data | | `allkeys-lru` | Evict least recently used | General cache | | `volatile-lru` | LRU only among keys with TTL | Mixed data store | | `allkeys-lfu` | Evict least frequently used | Skewed access patterns | | `volatile-ttl` | Evict soonest-to-expire | Prioritize long-lived data | `redis.conf`を通じて設定:`maxmemory-policy allkeys-lru` ## Anti-Patterns | Anti-Pattern | Problem | Fix | |---|---|---| | Keys with no TTL | Memory grows unbounded | Always set TTL | | `KEYS *` in production | Blocks the server (O(N)) | Use `SCAN` cursor | | Storing large blobs (>100KB) | Slow serialization, memory pressure | Store reference + fetch from object store | | Single Redis for everything | No isolation between cache & queue | Use separate DBs or instances | | Ignoring connection pool limits | Connection exhaustion under load | Size pool to workload | | Not handling cache miss stampede | Thundering herd on cold start | Use locks or probabilistic early expiry | | `FLUSHALL` without thought | Wipes entire instance | Scope deletes by key pattern | ### Cache Miss Stampede Prevention ```python import threading _locks: dict[str, threading.Lock] = {} _locks_mutex = threading.Lock() def get_with_lock(key: str, fetch_fn, ttl: int = 300): cached = r.get(key) if cached: return json.loads(cached) with _locks_mutex: if key not in _locks: _locks[key] = threading.Lock() lock = _locks[key] with lock: cached = r.get(key) # Re-check after acquiring lock if cached: return json.loads(cached) value = fetch_fn() r.setex(key, ttl, json.dumps(value)) return value ``` > マルチプロセスデプロイメント:インプロセスロックを上記の分散ロックセクション から `acquire_lock`/`release_lock` に置き換えてください。 ## Examples **Django/Flask APIエンドポイントにキャッシング追加:** レスポンスに5分TTLでCache-asideを使用。リクエストパラメータでキーを指定。 **ユーザーごとにAPIレート制限:** 低トラフィックエンドポイントに固定ウィンドウを `pipeline(transaction=True)` で使用;正確なユーザーごと制限にはsliding-windowの Lua使用。 **ワーカー間のバックグラウンドジョブ調整:** 予想ジョブ期間を超えるTTLで `acquire_lock` を使用。常に `finally` ブロックでリリース。 **複数購読者への通知のファンアウト:** ファイアアンドフォーゲットにPub/Subを使用。保証配信または再生が必要な場合、Streamsに切り替え。 ## Quick Reference | Pattern | When to Use | |---------|-------------| | Cache-aside | Read-heavy, tolerate slight staleness | | Write-through | Strong consistency required | | Distributed lock | Prevent concurrent access to a resource | | Sliding window rate limit | Accurate per-user throttling | | Redis Streams | Durable event queue with consumer groups | | Pub/Sub | Broadcast with no delivery guarantees needed | | Sorted Set leaderboard | Ranked scoring, pagination | | HyperLogLog | Approximate unique count at low memory | ## Related - Skill: `postgres-patterns` — リレーショナルデータパターン - Skill: `backend-patterns` — APIおよびサービスレイヤーパターン - Skill: `database-migrations` — スキーマバージョニング - Skill: `django-patterns` — Djangoキャッシュフレームワーク統合 - Agent: `database-reviewer` — 全データベースレビューワークフロー