Python e Redis: Cache e Filas — 2026 | Python Brasil

Aprenda a usar Redis com Python para cache, filas e pub/sub. Tutorial completo com redis-py, cache de API, rate limiting e task queues.

6 min de leitura Equipe Python Brasil

Redis e um dos bancos de dados em memoria mais usados no mundo. Ele funciona como cache, fila de mensagens, armazenamento de sessoes e muito mais. Com Python, a integracao e simples e poderosa usando a biblioteca redis-py. Neste guia, a gente vai explorar os casos de uso mais comuns com exemplos praticos.

Instalacao e Conexao

pip install redis

Para ter o Redis rodando localmente, use Docker:

docker run -d --name redis -p 6379:6379 redis:7-alpine

Conectando com Python:

import redis

# Conexao simples
r = redis.Redis(host="localhost", port=6379, db=0, decode_responses=True)

# Ou via URL
r = redis.from_url("redis://localhost:6379/0", decode_responses=True)

# Testar conexao
print(r.ping())  # True
print(f"Redis versao: {r.info()['redis_version']}")

O parametro decode_responses=True faz o Redis retornar strings ao inves de bytes.

Operacoes Basicas

Redis suporta diversos tipos de dados:

import redis

r = redis.Redis(host="localhost", port=6379, db=0, decode_responses=True)

# Strings
r.set("nome", "Ana Silva")
r.set("contador", 0)
print(r.get("nome"))  # Ana Silva

# Incremento atomico
r.incr("contador")  # 1
r.incr("contador")  # 2
r.incrby("contador", 10)  # 12
print(r.get("contador"))  # 12

# Com expiracao (TTL)
r.setex("token_sessao", 3600, "abc123")  # Expira em 1 hora
print(r.ttl("token_sessao"))  # Segundos restantes

# Hashes (como dicionarios)
r.hset("usuario:1", mapping={
    "nome": "Ana Silva",
    "email": "ana@email.com",
    "idade": "28"
})

usuario = r.hgetall("usuario:1")
print(usuario)
# {'nome': 'Ana Silva', 'email': 'ana@email.com', 'idade': '28'}

# Listas
r.rpush("fila", "tarefa1", "tarefa2", "tarefa3")
print(r.lrange("fila", 0, -1))  # ['tarefa1', 'tarefa2', 'tarefa3']

# Sets
r.sadd("tags", "python", "redis", "cache")
print(r.smembers("tags"))  # {'python', 'redis', 'cache'}

# Sorted Sets (com score)
r.zadd("ranking", {"ana": 95, "bruno": 87, "carla": 92})
print(r.zrevrange("ranking", 0, -1, withscores=True))
# [('ana', 95.0), ('carla', 92.0), ('bruno', 87.0)]

Cache de Funcoes

O caso de uso mais classico do Redis e cache:

import redis
import json
import hashlib
from functools import wraps

r = redis.Redis(host="localhost", port=6379, db=0, decode_responses=True)


def cache_redis(ttl=300):
    """Decorador de cache usando Redis."""
    def decorador(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # Gerar chave unica baseada na funcao e argumentos
            chave_base = f"{func.__module__}:{func.__name__}"
            args_str = json.dumps({"args": args, "kwargs": kwargs}, sort_keys=True)
            hash_args = hashlib.md5(args_str.encode()).hexdigest()
            chave = f"cache:{chave_base}:{hash_args}"

            # Tentar buscar do cache
            resultado_cache = r.get(chave)
            if resultado_cache is not None:
                print(f"Cache HIT: {chave}")
                return json.loads(resultado_cache)

            # Executar funcao e cachear resultado
            print(f"Cache MISS: {chave}")
            resultado = func(*args, **kwargs)
            r.setex(chave, ttl, json.dumps(resultado))
            return resultado
        return wrapper
    return decorador


@cache_redis(ttl=600)
def buscar_dados_api(endpoint):
    """Simula busca em API externa."""
    import time
    time.sleep(2)  # Simula latencia
    return {"dados": f"Resultado de {endpoint}", "timestamp": "2025-01-28"}


# Primeira chamada: lenta (cache MISS)
resultado = buscar_dados_api("/usuarios")

# Segunda chamada: instantanea (cache HIT)
resultado = buscar_dados_api("/usuarios")

Cache em Aplicacoes Web

Integrando cache com FastAPI:

from fastapi import FastAPI, Depends
import redis
import json

app = FastAPI()
cache = redis.Redis(host="localhost", port=6379, db=0, decode_responses=True)


async def get_cache():
    return cache


@app.get("/produtos")
async def listar_produtos(r: redis.Redis = Depends(get_cache)):
    """Lista produtos com cache."""
    chave = "produtos:lista"

    # Tentar cache
    dados_cache = r.get(chave)
    if dados_cache:
        return {"fonte": "cache", "dados": json.loads(dados_cache)}

    # Buscar do banco de dados (simulado)
    produtos = [
        {"id": 1, "nome": "Notebook", "preco": 3500.00},
        {"id": 2, "nome": "Mouse", "preco": 89.90},
        {"id": 3, "nome": "Teclado", "preco": 199.90},
    ]

    # Salvar no cache por 5 minutos
    r.setex(chave, 300, json.dumps(produtos))

    return {"fonte": "banco", "dados": produtos}


@app.post("/produtos/{produto_id}/atualizar")
async def atualizar_produto(produto_id: int, r: redis.Redis = Depends(get_cache)):
    """Atualiza produto e invalida cache."""
    # ... atualizar no banco ...
    r.delete("produtos:lista")  # Invalidar cache
    return {"status": "atualizado", "cache": "invalidado"}

Rate Limiting com Redis

Redis e perfeito para controlar taxa de requisicoes:

import redis
import time


class RateLimiter:
    """Rate limiter usando Redis com sliding window."""

    def __init__(self, redis_client, limite, janela_segundos):
        self.r = redis_client
        self.limite = limite
        self.janela = janela_segundos

    def permitir(self, identificador: str) -> bool:
        """Verifica se a requisicao e permitida."""
        chave = f"rate_limit:{identificador}"
        agora = time.time()

        pipe = self.r.pipeline()
        # Remover registros fora da janela
        pipe.zremrangebyscore(chave, 0, agora - self.janela)
        # Contar registros na janela
        pipe.zcard(chave)
        # Adicionar registro atual
        pipe.zadd(chave, {str(agora): agora})
        # Definir expiracao
        pipe.expire(chave, self.janela)

        resultados = pipe.execute()
        contagem = resultados[1]

        return contagem < self.limite

    def info(self, identificador: str) -> dict:
        """Retorna informacoes sobre o rate limit."""
        chave = f"rate_limit:{identificador}"
        agora = time.time()
        self.r.zremrangebyscore(chave, 0, agora - self.janela)
        contagem = self.r.zcard(chave)
        return {
            "limite": self.limite,
            "usado": contagem,
            "restante": max(0, self.limite - contagem),
            "janela_segundos": self.janela
        }


# Uso
r = redis.Redis(host="localhost", port=6379, db=0, decode_responses=True)
limiter = RateLimiter(r, limite=10, janela_segundos=60)

# Simular requisicoes
for i in range(12):
    ip = "192.168.1.100"
    if limiter.permitir(ip):
        print(f"Requisicao {i+1}: permitida")
    else:
        print(f"Requisicao {i+1}: bloqueada (rate limit)")

Filas de Tarefas

Redis funciona como fila de mensagens para processamento assincrono:

import redis
import json
import time
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor


class FilaTarefas:
    """Fila de tarefas simples usando Redis."""

    def __init__(self, redis_client, nome_fila="tarefas"):
        self.r = redis_client
        self.fila = nome_fila
        self.processando = f"{nome_fila}:processando"

    def adicionar(self, tipo: str, dados: dict):
        """Adiciona tarefa a fila."""
        tarefa = {
            "tipo": tipo,
            "dados": dados,
            "criado_em": datetime.now().isoformat()
        }
        self.r.rpush(self.fila, json.dumps(tarefa))
        print(f"Tarefa adicionada: {tipo}")

    def processar(self, timeout=5):
        """Processa proxima tarefa da fila (bloqueante)."""
        resultado = self.r.blpop(self.fila, timeout=timeout)
        if resultado:
            _, tarefa_json = resultado
            return json.loads(tarefa_json)
        return None

    def tamanho(self):
        return self.r.llen(self.fila)


# Produtor
r = redis.Redis(host="localhost", port=6379, db=0, decode_responses=True)
fila = FilaTarefas(r)

fila.adicionar("enviar_email", {
    "para": "ana@email.com",
    "assunto": "Bem-vinda!",
    "corpo": "Obrigado por se cadastrar."
})

fila.adicionar("processar_imagem", {
    "caminho": "/uploads/foto.jpg",
    "operacao": "redimensionar",
    "largura": 800
})

# Consumidor (worker)
def worker(fila):
    """Processa tarefas continuamente."""
    print("Worker iniciado. Aguardando tarefas...")
    while True:
        tarefa = fila.processar(timeout=5)
        if tarefa:
            print(f"Processando: {tarefa['tipo']}")
            # Executar tarefa baseado no tipo
            if tarefa["tipo"] == "enviar_email":
                print(f"  Enviando email para {tarefa['dados']['para']}")
            elif tarefa["tipo"] == "processar_imagem":
                print(f"  Processando {tarefa['dados']['caminho']}")
            time.sleep(1)  # Simula processamento

# worker(fila)  # Executar em outro terminal

Pub/Sub para Eventos

Redis Pub/Sub permite comunicacao em tempo real entre servicos:

import redis
import json
import threading

r = redis.Redis(host="localhost", port=6379, db=0, decode_responses=True)


def publicar_evento(canal, evento):
    """Publica evento em um canal."""
    r.publish(canal, json.dumps(evento))


def ouvir_eventos(canais):
    """Escuta eventos de canais especificos."""
    pubsub = r.pubsub()
    pubsub.subscribe(canais)

    print(f"Ouvindo canais: {canais}")
    for mensagem in pubsub.listen():
        if mensagem["type"] == "message":
            dados = json.loads(mensagem["data"])
            print(f"[{mensagem['channel']}] {dados}")


# Em outro processo ou thread
# ouvir_eventos(["pedidos", "notificacoes"])

# Publicar eventos
publicar_evento("pedidos", {
    "tipo": "novo_pedido",
    "pedido_id": 123,
    "valor": 299.90
})

publicar_evento("notificacoes", {
    "tipo": "alerta",
    "mensagem": "Estoque baixo do produto X"
})

Sessoes de Usuario com Redis

import redis
import json
import uuid
from datetime import datetime


class GerenciadorSessoes:
    """Gerencia sessoes de usuario com Redis."""

    def __init__(self, redis_client, ttl=3600):
        self.r = redis_client
        self.ttl = ttl

    def criar(self, usuario_id: str, dados: dict) -> str:
        """Cria nova sessao."""
        session_id = str(uuid.uuid4())
        chave = f"sessao:{session_id}"

        sessao = {
            "usuario_id": usuario_id,
            "criada_em": datetime.now().isoformat(),
            **dados
        }

        self.r.setex(chave, self.ttl, json.dumps(sessao))
        return session_id

    def obter(self, session_id: str) -> dict:
        """Obtem dados da sessao."""
        chave = f"sessao:{session_id}"
        dados = self.r.get(chave)
        if dados:
            self.r.expire(chave, self.ttl)  # Renovar TTL
            return json.loads(dados)
        return None

    def destruir(self, session_id: str):
        """Destroi uma sessao."""
        self.r.delete(f"sessao:{session_id}")


# Uso
r = redis.Redis(host="localhost", port=6379, db=0, decode_responses=True)
sessoes = GerenciadorSessoes(r, ttl=1800)

sid = sessoes.criar("usuario_123", {"nome": "Ana", "role": "admin"})
print(f"Sessao criada: {sid}")

dados = sessoes.obter(sid)
print(f"Dados: {dados}")

Boas Praticas

Ao usar Redis com Python, siga estas recomendacoes:

  • Use decode_responses=True para trabalhar com strings ao inves de bytes
  • Defina TTL em todas as chaves para evitar acumulo de dados
  • Use pipelines para operacoes em lote, reduzindo round-trips
  • Estruture as chaves com prefixos claros como cache:, sessao:, fila:
  • Implemente connection pooling para aplicacoes com muitas conexoes
  • Monitore uso de memoria do Redis regularmente
  • Use Lua scripts para operacoes atomicas complexas
  • Configure persistencia (RDB ou AOF) conforme a criticidade dos dados

Conclusao

Redis e uma ferramenta versatil que resolve problemas comuns de performance e arquitetura em aplicacoes Python. Do cache simples de funcoes ate filas de tarefas e pub/sub para eventos, as possibilidades sao amplas. Comece implementando cache nas consultas mais lentas da sua aplicacao e explore funcionalidades mais avancadas conforme a necessidade. O ganho de performance e imediato e o impacto positivo na experiencia do usuario e significativo.

E

Equipe Python Brasil

Contribuidor do Python Brasil — Aprenda Python em Português