Todos los artículos
ARTÍCULO

Redis más allá del caché: patrones que uso en sistemas reales

Rate limiting, pub/sub, leaderboards en tiempo real, distributed locks. Redis resuelve problemas que ninguna base de datos relacional maneja bien. Aquí los patrones concretos.

Redis más allá del caché: patrones que uso en sistemas reales

La mayoría de los desarrolladores usa Redis como un caché glorificado: SET key value EX 300 y listo. Es válido, pero Redis es una navaja suiza que la mayoría usa como martillo. Estos son los cinco patrones que más valor me han dado en producción.

Patrón 1: Rate Limiting con Sliding Window

El rate limiting con contadores simples (INCR + EXPIRE) tiene un problema: si reseteas el contador cada minuto exacto, alguien puede hacer 100 requests al final del minuto 1 y 100 más al inicio del minuto 2, violando el límite en una ventana de 2 segundos.

La ventana deslizante con Sorted Sets resuelve esto:

import time
import redis

r = redis.Redis(host='localhost', decode_responses=True)

def is_rate_limited(user_id: str, limit: int = 100, window_seconds: int = 60) -> bool:
    key = f"ratelimit:{user_id}"
    now = time.time()
    window_start = now - window_seconds

    pipe = r.pipeline()
    # Elimina requests fuera de la ventana
    pipe.zremrangebyscore(key, 0, window_start)
    # Agrega el request actual
    pipe.zadd(key, {str(now): now})
    # Cuenta los requests en la ventana
    pipe.zcard(key)
    # Expira la key para no acumular basura
    pipe.expire(key, window_seconds + 1)

    results = pipe.execute()
    request_count = results[2]

    return request_count > limit

El pipeline es clave: ejecuta los 4 comandos en una sola round-trip a Redis. Sin él, hay race conditions.

Patrón 2: Distributed Lock (Redlock simplificado)

¿Tienes un proceso que solo debe correr una instancia a la vez en múltiples servidores? (cron jobs, procesadores de cola, migraciones):

import uuid
import redis

r = redis.Redis(host='localhost', decode_responses=True)

class DistributedLock:
    def __init__(self, name: str, ttl_seconds: int = 30):
        self.key = f"lock:{name}"
        self.token = str(uuid.uuid4())
        self.ttl = ttl_seconds

    def acquire(self) -> bool:
        # SET NX: solo setea si la key NO existe (atómico)
        return r.set(self.key, self.token, nx=True, ex=self.ttl) is not None

    def release(self) -> bool:
        # Solo libera si el token es el nuestro (Lua script para atomicidad)
        script = """
        if redis.call('get', KEYS[1]) == ARGV[1] then
            return redis.call('del', KEYS[1])
        else
            return 0
        end
        """
        return bool(r.eval(script, 1, self.key, self.token))

    def __enter__(self):
        if not self.acquire():
            raise RuntimeError(f"No se pudo adquirir el lock: {self.key}")
        return self

    def __exit__(self, *args):
        self.release()

# Uso
with DistributedLock("daily_report_generation", ttl_seconds=300):
    generar_reporte_diario()  # Solo un servidor lo ejecuta

El script Lua garantiza que el check y el delete sean atómicos. Sin eso, hay una race condition entre verificar el token y eliminarlo.

Patrón 3: Leaderboard en tiempo real con Sorted Sets

Los Sorted Sets de Redis son una estructura ordenada por score con operaciones O(log N). Perfectos para rankings:

def actualizar_score(user_id: str, puntos: int):
    r.zincrby("leaderboard:global", puntos, user_id)

def obtener_top(n: int = 10) -> list[dict]:
    # ZREVRANGE: orden descendente (mayor score primero)
    top = r.zrevrange("leaderboard:global", 0, n-1, withscores=True)
    return [
        {"user_id": uid, "score": int(score), "rank": i + 1}
        for i, (uid, score) in enumerate(top)
    ]

def obtener_posicion(user_id: str) -> dict:
    rank = r.zrevrank("leaderboard:global", user_id)
    score = r.zscore("leaderboard:global", user_id)
    return {
        "rank": rank + 1 if rank is not None else None,
        "score": int(score) if score else 0
    }

Actualizar y consultar un leaderboard de 10 millones de usuarios tarda microsegundos. En PostgreSQL, esa misma query con ORDER BY score DESC puede tardar segundos sin índices perfectos.

Patrón 4: Pub/Sub para notificaciones en tiempo real

Cuando necesitas notificar a múltiples servicios de un evento sin que el publicador conozca a los suscriptores:

# Publicador (en el servicio de órdenes)
def orden_completada(order_id: str, user_id: str):
    payload = json.dumps({
        "event": "order.completed",
        "order_id": order_id,
        "user_id": user_id,
        "timestamp": time.time()
    })
    r.publish("events:orders", payload)

# Suscriptor (en el servicio de notificaciones)
def escuchar_eventos():
    pubsub = r.pubsub()
    pubsub.subscribe("events:orders")

    for message in pubsub.listen():
        if message["type"] == "message":
            data = json.loads(message["data"])
            if data["event"] == "order.completed":
                enviar_email_confirmacion(data["user_id"], data["order_id"])

Para casos donde necesitas garantía de entrega (que el mensaje no se pierda si el suscriptor cae), usa Redis Streams en lugar de Pub/Sub.

Patrón 5: Cache-Aside con invalidación inteligente

El patrón cache-aside estándar tiene una trampa: si múltiples requests llegan cuando el caché está vacío, todos van a la DB simultáneamente (cache stampede):

async def get_user_profile(user_id: str) -> dict:
    cache_key = f"user:profile:{user_id}"

    # Intenta el caché primero
    cached = r.get(cache_key)
    if cached:
        return json.loads(cached)

    # Lock para evitar cache stampede
    lock_key = f"lock:rebuilding:{cache_key}"
    is_rebuilding = r.set(lock_key, "1", nx=True, ex=5)

    if not is_rebuilding:
        # Otro proceso está reconstruyendo, espera brevemente
        time.sleep(0.1)
        cached = r.get(cache_key)
        return json.loads(cached) if cached else await get_from_db(user_id)

    # Solo un proceso llega aquí
    try:
        data = await get_from_db(user_id)
        r.setex(cache_key, 300, json.dumps(data))
        return data
    finally:
        r.delete(lock_key)

Con este patrón, solo una instancia reconstruye el caché mientras las demás esperan brevemente. Elimina los picos de carga en DB durante expiraciones masivas.

Redis no es magia, pero casi

La velocidad de Redis viene de ser single-threaded y operar completamente en memoria. Eso significa que un comando lento (como KEYS * en producción) bloquea todo. Usa SCAN en su lugar. Y nunca almacenes en Redis lo que no puedes permitirte perder: es caché, no fuente de verdad.

Dominar Redis es una de las inversiones técnicas con mayor ROI que he hecho.