Files
everything-claude-code/docs/ja-JP/skills/redis-patterns/SKILL.md
Claude ec9ace9c54 docs: add native Japanese translation of ECC documentation (ja-JP)
Translate everything-claude-code repository to Japanese including:
- 17 root documentation files
- 60 agent documentation files
- 80 command documentation files
- 99 rule files across 18 language directories (common, angular, arkts, cpp, csharp, dart, fsharp, golang, java, kotlin, perl, php, python, ruby, rust, swift, typescript, web)
- 199 skill documentation files

Total: 455 files translated to Japanese with:
- Consistent terminology glossary applied throughout
- YAML field names preserved in English (name, description, etc.)
- Code blocks and examples untouched (comments translated)
- Markdown structure and relative links preserved
- Professional translation maintaining technical accuracy

This translation expands ECC accessibility to Japanese-speaking developers and teams.

Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
2026-05-17 02:31:40 -04:00

404 lines
12 KiB
Markdown
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
---
name: redis-patterns
description: Redisデータ構造パターン、キャッシング戦略、分散ロック、レート制限、Pub/Sub、本番アプリケーション用コネクション管理。
origin: ECC
---
# Redis Patterns
一般的なバックエンド使用例に対するRedisベストプラクティスの参考資料。
## How It Works
Redisはメモリ内データ構造ストアで、文字列、ハッシュ、リスト、セット、ソート済みセット、ストリームなどをサポートします。単一インスタンスでは個々のRedisコマンドは原子的ですが、マルチステップワークフローはLuaスクリプト、MULTI/EXECトランザクション、または明示的な同期化が必要です。RDBスナップショットまたはAOFログを通じてデータをオプションで永続化します。クライアントはRESPプロトコルを使用してTCP経由で通信します。接続プール不可欠でリクエストごとのハンドシェイクオーバーヘッドを回避します。
## When to Activate
- アプリケーションにキャッシング追加
- レート制限またはスロットリング実装
- 分散ロックまたはコーディネーション構築
- セッションまたはトークンストレージ設定
- Pub/SubまたはRedis Streams for messaging使用
- 本番環境でRedis設定プール、削除、クラスタリング
## Data Structure Cheat Sheet
| Use Case | Structure | Example Key |
|----------|-----------|-------------|
| Simple cache | String | `product:123` |
| User session | Hash | `session:abc` |
| Leaderboard | Sorted Set | `scores:weekly` |
| Unique visitors | Set | `visitors:2024-01-01` |
| Activity feed | List | `feed:user:456` |
| Event stream | Stream | `events:orders` |
| Counters / rate limits | String (INCR) | `ratelimit:user:123` |
| Bloom filter / HLL | HyperLogLog | `hll:pageviews` |
## Core Patterns
### Cache-Aside (Lazy Loading)
```python
import redis
import json
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
def get_product(product_id: int):
cache_key = f"product:{product_id}"
cached = r.get(cache_key)
if cached:
return json.loads(cached)
product = db.query("SELECT * FROM products WHERE id = %s", product_id)
r.setex(cache_key, 3600, json.dumps(product)) # TTL: 1 hour
return product
```
### Write-Through Cache
```python
def update_product(product_id: int, data: dict):
# DB書き込み先
db.execute("UPDATE products SET ... WHERE id = %s", product_id)
# キャッシュを即座に更新
cache_key = f"product:{product_id}"
r.setex(cache_key, 3600, json.dumps(data))
```
### Cache Invalidation
```python
# タグベース削除 — セット内で関連キーをグループ化
def cache_product(product_id: int, category_id: int, data: dict):
key = f"product:{product_id}"
tag = f"tag:category:{category_id}"
pipe = r.pipeline(transaction=True)
pipe.setex(key, 3600, json.dumps(data))
pipe.sadd(tag, key)
pipe.expire(tag, 3600)
pipe.execute()
def invalidate_category(category_id: int):
tag = f"tag:category:{category_id}"
keys = r.smembers(tag)
if keys:
r.delete(*keys)
r.delete(tag)
```
### Session Storage
```python
import time
import uuid
def create_session(user_id: int, ttl: int = 86400) -> str:
session_id = str(uuid.uuid4())
key = f"session:{session_id}"
pipe = r.pipeline(transaction=True)
pipe.hset(key, mapping={
"user_id": user_id,
"created_at": int(time.time()),
})
pipe.expire(key, ttl)
pipe.execute()
return session_id
def get_session(session_id: str) -> dict | None:
data = r.hgetall(f"session:{session_id}")
return data if data else None
def delete_session(session_id: str):
r.delete(f"session:{session_id}")
```
## Rate Limiting
### Fixed Window (Simple)
```python
def is_rate_limited(user_id: int, limit: int = 100, window: int = 60) -> bool:
key = f"ratelimit:{user_id}:{int(time.time()) // window}"
pipe = r.pipeline(transaction=True)
pipe.incr(key)
pipe.expire(key, window)
count, _ = pipe.execute()
return count > limit
```
### Sliding Window (Lua — Atomic)
```lua
-- sliding_window.lua
local key = KEYS[1]
local now = tonumber(ARGV[1])
local window = tonumber(ARGV[2])
local limit = tonumber(ARGV[3])
redis.call('ZREMRANGEBYSCORE', key, 0, now - window)
local count = redis.call('ZCARD', key)
if count < limit then
-- Use unique member (now + sequence) to avoid collisions within the same millisecond
local seq_key = key .. ':seq'
local seq = redis.call('INCR', seq_key)
redis.call('EXPIRE', seq_key, math.ceil(window / 1000))
redis.call('ZADD', key, now, now .. '-' .. seq)
redis.call('EXPIRE', key, math.ceil(window / 1000))
return 1
end
return 0
```
```python
sliding_window = r.register_script(open('sliding_window.lua').read())
def allow_request(user_id: int) -> bool:
key = f"ratelimit:sliding:{user_id}"
now = int(time.time() * 1000)
return bool(sliding_window(keys=[key], args=[now, 60000, 100]))
```
## Distributed Locks
### Distributed Lock (Single Node — SET NX PX)
```python
import uuid
def acquire_lock(resource: str, ttl_ms: int = 5000) -> str | None:
lock_key = f"lock:{resource}"
token = str(uuid.uuid4())
acquired = r.set(lock_key, token, px=ttl_ms, nx=True)
return token if acquired else None
def release_lock(resource: str, token: str) -> bool:
release_script = """
if redis.call('get', KEYS[1]) == ARGV[1] then
return redis.call('del', KEYS[1])
else
return 0
end
"""
result = r.eval(release_script, 1, f"lock:{resource}", token)
return bool(result)
# Usage
token = acquire_lock("order:payment:123")
if token:
try:
process_payment()
finally:
release_lock("order:payment:123", token)
```
> マルチード設定の場合、フルRedlockアルゴリズムを実装する `redlock-py` ライブラリを使用してください。
## Pub/Sub & Streams
### Pub/Sub (Fire-and-Forget)
```python
# Publisher
def publish_event(channel: str, payload: dict):
r.publish(channel, json.dumps(payload))
# Subscriber (blocking — run in separate thread/process)
def subscribe_events(channel: str):
pubsub = r.pubsub()
pubsub.subscribe(channel)
for message in pubsub.listen():
if message['type'] == 'message':
handle(json.loads(message['data']))
```
### Redis Streams (Durable Queue)
```python
# Producer
def emit(stream: str, event: dict):
r.xadd(stream, event, maxlen=10000) # Cap stream length
# Consumer group — guarantees at-least-once delivery
try:
r.xgroup_create('events:orders', 'processor', id='0', mkstream=True)
except Exception:
pass # Group already exists
def consume(stream: str, group: str, consumer: str):
while True:
messages = r.xreadgroup(group, consumer, {stream: '>'}, count=10, block=2000)
for _, entries in (messages or []):
for msg_id, data in entries:
process(data)
r.xack(stream, group, msg_id)
```
> 配信保証、コンシューマーグループ、または再生が必要な場合、Pub/Sub代わりに**Streams**を優先してください。
## Key Design
### Naming Conventions
```
# Pattern: resource:id:field
user:123:profile
order:456:status
cache:product:789
# Pattern: namespace:resource:id
myapp:session:abc123
myapp:ratelimit:user:123
# Pattern: resource:date (time-bound keys)
stats:pageviews:2024-01-01
```
### TTL Strategy
| Data Type | Suggested TTL |
|-----------|--------------|
| User session | 24h (`86400`) |
| API response cache | 515 min |
| Rate limit window | Match window size |
| Short-lived tokens | 510 min |
| Leaderboard | 1h24h |
| Static/reference data | 1h1 week |
常にTTLを設定してください。TTLなしのキーは無限に蓄積してメモリ圧力を引き起こします。
## Connection Management
### Connection Pooling
```python
from redis import ConnectionPool, Redis
pool = ConnectionPool(
host='localhost',
port=6379,
db=0,
max_connections=20,
decode_responses=True,
socket_connect_timeout=2,
socket_timeout=2,
)
r = Redis(connection_pool=pool)
```
### Cluster Mode
```python
from redis.cluster import RedisCluster
r = RedisCluster(
startup_nodes=[{"host": "redis-1", "port": 6379}],
decode_responses=True,
skip_full_coverage_check=True,
)
```
### Sentinel (High Availability)
```python
from redis.sentinel import Sentinel
sentinel = Sentinel(
[('sentinel-1', 26379), ('sentinel-2', 26379)],
socket_timeout=0.5,
)
master = sentinel.master_for('mymaster', decode_responses=True)
replica = sentinel.slave_for('mymaster', decode_responses=True)
```
## Eviction Policies
| Policy | Behavior | Best For |
|--------|----------|----------|
| `noeviction` | Error on write when full | Queues / critical data |
| `allkeys-lru` | Evict least recently used | General cache |
| `volatile-lru` | LRU only among keys with TTL | Mixed data store |
| `allkeys-lfu` | Evict least frequently used | Skewed access patterns |
| `volatile-ttl` | Evict soonest-to-expire | Prioritize long-lived data |
`redis.conf`を通じて設定:`maxmemory-policy allkeys-lru`
## Anti-Patterns
| Anti-Pattern | Problem | Fix |
|---|---|---|
| Keys with no TTL | Memory grows unbounded | Always set TTL |
| `KEYS *` in production | Blocks the server (O(N)) | Use `SCAN` cursor |
| Storing large blobs (>100KB) | Slow serialization, memory pressure | Store reference + fetch from object store |
| Single Redis for everything | No isolation between cache & queue | Use separate DBs or instances |
| Ignoring connection pool limits | Connection exhaustion under load | Size pool to workload |
| Not handling cache miss stampede | Thundering herd on cold start | Use locks or probabilistic early expiry |
| `FLUSHALL` without thought | Wipes entire instance | Scope deletes by key pattern |
### Cache Miss Stampede Prevention
```python
import threading
_locks: dict[str, threading.Lock] = {}
_locks_mutex = threading.Lock()
def get_with_lock(key: str, fetch_fn, ttl: int = 300):
cached = r.get(key)
if cached:
return json.loads(cached)
with _locks_mutex:
if key not in _locks:
_locks[key] = threading.Lock()
lock = _locks[key]
with lock:
cached = r.get(key) # Re-check after acquiring lock
if cached:
return json.loads(cached)
value = fetch_fn()
r.setex(key, ttl, json.dumps(value))
return value
```
> マルチプロセスデプロイメント:インプロセスロックを上記の分散ロックセクション から `acquire_lock`/`release_lock` に置き換えてください。
## Examples
**Django/Flask APIエンドポイントにキャッシング追加**
レスポンスに5分TTLでCache-asideを使用。リクエストパラメータでキーを指定。
**ユーザーごとにAPIレート制限**
低トラフィックエンドポイントに固定ウィンドウを `pipeline(transaction=True)` で使用正確なユーザーごと制限にはsliding-windowの Lua使用。
**ワーカー間のバックグラウンドジョブ調整:**
予想ジョブ期間を超えるTTLで `acquire_lock` を使用。常に `finally` ブロックでリリース。
**複数購読者への通知のファンアウト:**
ファイアアンドフォーゲットにPub/Subを使用。保証配信または再生が必要な場合、Streamsに切り替え。
## Quick Reference
| Pattern | When to Use |
|---------|-------------|
| Cache-aside | Read-heavy, tolerate slight staleness |
| Write-through | Strong consistency required |
| Distributed lock | Prevent concurrent access to a resource |
| Sliding window rate limit | Accurate per-user throttling |
| Redis Streams | Durable event queue with consumer groups |
| Pub/Sub | Broadcast with no delivery guarantees needed |
| Sorted Set leaderboard | Ranked scoring, pagination |
| HyperLogLog | Approximate unique count at low memory |
## Related
- Skill: `postgres-patterns` — リレーショナルデータパターン
- Skill: `backend-patterns` — APIおよびサービスレイヤーパターン
- Skill: `database-migrations` — スキーマバージョニング
- Skill: `django-patterns` — Djangoキャッシュフレームワーク統合
- Agent: `database-reviewer` — 全データベースレビューワークフロー