La mayoría de los desarrolladores usa Redis como un caché glorificado: SET key value EX 300 y listo. Es válido, pero Redis es una navaja suiza que la mayoría usa como martillo. Estos son los cinco patrones que más valor me han dado en producción.
Patrón 1: Rate Limiting con Sliding Window
El rate limiting con contadores simples (INCR + EXPIRE) tiene un problema: si reseteas el contador cada minuto exacto, alguien puede hacer 100 requests al final del minuto 1 y 100 más al inicio del minuto 2, violando el límite en una ventana de 2 segundos.
La ventana deslizante con Sorted Sets resuelve esto:
import time
import redis
r = redis.Redis(host='localhost', decode_responses=True)
def is_rate_limited(user_id: str, limit: int = 100, window_seconds: int = 60) -> bool:
key = f"ratelimit:{user_id}"
now = time.time()
window_start = now - window_seconds
pipe = r.pipeline()
# Elimina requests fuera de la ventana
pipe.zremrangebyscore(key, 0, window_start)
# Agrega el request actual
pipe.zadd(key, {str(now): now})
# Cuenta los requests en la ventana
pipe.zcard(key)
# Expira la key para no acumular basura
pipe.expire(key, window_seconds + 1)
results = pipe.execute()
request_count = results[2]
return request_count > limit
El pipeline es clave: ejecuta los 4 comandos en una sola round-trip a Redis. Sin él, hay race conditions.
Patrón 2: Distributed Lock (Redlock simplificado)
¿Tienes un proceso que solo debe correr una instancia a la vez en múltiples servidores? (cron jobs, procesadores de cola, migraciones):
import uuid
import redis
r = redis.Redis(host='localhost', decode_responses=True)
class DistributedLock:
def __init__(self, name: str, ttl_seconds: int = 30):
self.key = f"lock:{name}"
self.token = str(uuid.uuid4())
self.ttl = ttl_seconds
def acquire(self) -> bool:
# SET NX: solo setea si la key NO existe (atómico)
return r.set(self.key, self.token, nx=True, ex=self.ttl) is not None
def release(self) -> bool:
# Solo libera si el token es el nuestro (Lua script para atomicidad)
script = """
if redis.call('get', KEYS[1]) == ARGV[1] then
return redis.call('del', KEYS[1])
else
return 0
end
"""
return bool(r.eval(script, 1, self.key, self.token))
def __enter__(self):
if not self.acquire():
raise RuntimeError(f"No se pudo adquirir el lock: {self.key}")
return self
def __exit__(self, *args):
self.release()
# Uso
with DistributedLock("daily_report_generation", ttl_seconds=300):
generar_reporte_diario() # Solo un servidor lo ejecuta
El script Lua garantiza que el check y el delete sean atómicos. Sin eso, hay una race condition entre verificar el token y eliminarlo.
Patrón 3: Leaderboard en tiempo real con Sorted Sets
Los Sorted Sets de Redis son una estructura ordenada por score con operaciones O(log N). Perfectos para rankings:
def actualizar_score(user_id: str, puntos: int):
r.zincrby("leaderboard:global", puntos, user_id)
def obtener_top(n: int = 10) -> list[dict]:
# ZREVRANGE: orden descendente (mayor score primero)
top = r.zrevrange("leaderboard:global", 0, n-1, withscores=True)
return [
{"user_id": uid, "score": int(score), "rank": i + 1}
for i, (uid, score) in enumerate(top)
]
def obtener_posicion(user_id: str) -> dict:
rank = r.zrevrank("leaderboard:global", user_id)
score = r.zscore("leaderboard:global", user_id)
return {
"rank": rank + 1 if rank is not None else None,
"score": int(score) if score else 0
}
Actualizar y consultar un leaderboard de 10 millones de usuarios tarda microsegundos. En PostgreSQL, esa misma query con ORDER BY score DESC puede tardar segundos sin índices perfectos.
Patrón 4: Pub/Sub para notificaciones en tiempo real
Cuando necesitas notificar a múltiples servicios de un evento sin que el publicador conozca a los suscriptores:
# Publicador (en el servicio de órdenes)
def orden_completada(order_id: str, user_id: str):
payload = json.dumps({
"event": "order.completed",
"order_id": order_id,
"user_id": user_id,
"timestamp": time.time()
})
r.publish("events:orders", payload)
# Suscriptor (en el servicio de notificaciones)
def escuchar_eventos():
pubsub = r.pubsub()
pubsub.subscribe("events:orders")
for message in pubsub.listen():
if message["type"] == "message":
data = json.loads(message["data"])
if data["event"] == "order.completed":
enviar_email_confirmacion(data["user_id"], data["order_id"])
Para casos donde necesitas garantía de entrega (que el mensaje no se pierda si el suscriptor cae), usa Redis Streams en lugar de Pub/Sub.
Patrón 5: Cache-Aside con invalidación inteligente
El patrón cache-aside estándar tiene una trampa: si múltiples requests llegan cuando el caché está vacío, todos van a la DB simultáneamente (cache stampede):
async def get_user_profile(user_id: str) -> dict:
cache_key = f"user:profile:{user_id}"
# Intenta el caché primero
cached = r.get(cache_key)
if cached:
return json.loads(cached)
# Lock para evitar cache stampede
lock_key = f"lock:rebuilding:{cache_key}"
is_rebuilding = r.set(lock_key, "1", nx=True, ex=5)
if not is_rebuilding:
# Otro proceso está reconstruyendo, espera brevemente
time.sleep(0.1)
cached = r.get(cache_key)
return json.loads(cached) if cached else await get_from_db(user_id)
# Solo un proceso llega aquí
try:
data = await get_from_db(user_id)
r.setex(cache_key, 300, json.dumps(data))
return data
finally:
r.delete(lock_key)
Con este patrón, solo una instancia reconstruye el caché mientras las demás esperan brevemente. Elimina los picos de carga en DB durante expiraciones masivas.
Redis no es magia, pero casi
La velocidad de Redis viene de ser single-threaded y operar completamente en memoria. Eso significa que un comando lento (como KEYS * en producción) bloquea todo. Usa SCAN en su lugar. Y nunca almacenes en Redis lo que no puedes permitirte perder: es caché, no fuente de verdad.
Dominar Redis es una de las inversiones técnicas con mayor ROI que he hecho.