Python e Redis: Cache e Filas — 2026 | Python Brasil
Aprenda a usar Redis com Python para cache, filas e pub/sub. Tutorial completo com redis-py, cache de API, rate limiting e task queues.
Redis e um dos bancos de dados em memoria mais usados no mundo. Ele funciona como cache, fila de mensagens, armazenamento de sessoes e muito mais. Com Python, a integracao e simples e poderosa usando a biblioteca redis-py. Neste guia, a gente vai explorar os casos de uso mais comuns com exemplos praticos.
Instalacao e Conexao
pip install redis
Para ter o Redis rodando localmente, use Docker:
docker run -d --name redis -p 6379:6379 redis:7-alpine
Conectando com Python:
import redis
# Conexao simples
r = redis.Redis(host="localhost", port=6379, db=0, decode_responses=True)
# Ou via URL
r = redis.from_url("redis://localhost:6379/0", decode_responses=True)
# Testar conexao
print(r.ping()) # True
print(f"Redis versao: {r.info()['redis_version']}")
O parametro decode_responses=True faz o Redis retornar strings ao inves de bytes.
Operacoes Basicas
Redis suporta diversos tipos de dados:
import redis
r = redis.Redis(host="localhost", port=6379, db=0, decode_responses=True)
# Strings
r.set("nome", "Ana Silva")
r.set("contador", 0)
print(r.get("nome")) # Ana Silva
# Incremento atomico
r.incr("contador") # 1
r.incr("contador") # 2
r.incrby("contador", 10) # 12
print(r.get("contador")) # 12
# Com expiracao (TTL)
r.setex("token_sessao", 3600, "abc123") # Expira em 1 hora
print(r.ttl("token_sessao")) # Segundos restantes
# Hashes (como dicionarios)
r.hset("usuario:1", mapping={
"nome": "Ana Silva",
"email": "ana@email.com",
"idade": "28"
})
usuario = r.hgetall("usuario:1")
print(usuario)
# {'nome': 'Ana Silva', 'email': 'ana@email.com', 'idade': '28'}
# Listas
r.rpush("fila", "tarefa1", "tarefa2", "tarefa3")
print(r.lrange("fila", 0, -1)) # ['tarefa1', 'tarefa2', 'tarefa3']
# Sets
r.sadd("tags", "python", "redis", "cache")
print(r.smembers("tags")) # {'python', 'redis', 'cache'}
# Sorted Sets (com score)
r.zadd("ranking", {"ana": 95, "bruno": 87, "carla": 92})
print(r.zrevrange("ranking", 0, -1, withscores=True))
# [('ana', 95.0), ('carla', 92.0), ('bruno', 87.0)]
Cache de Funcoes
O caso de uso mais classico do Redis e cache:
import redis
import json
import hashlib
from functools import wraps
r = redis.Redis(host="localhost", port=6379, db=0, decode_responses=True)
def cache_redis(ttl=300):
"""Decorador de cache usando Redis."""
def decorador(func):
@wraps(func)
def wrapper(*args, **kwargs):
# Gerar chave unica baseada na funcao e argumentos
chave_base = f"{func.__module__}:{func.__name__}"
args_str = json.dumps({"args": args, "kwargs": kwargs}, sort_keys=True)
hash_args = hashlib.md5(args_str.encode()).hexdigest()
chave = f"cache:{chave_base}:{hash_args}"
# Tentar buscar do cache
resultado_cache = r.get(chave)
if resultado_cache is not None:
print(f"Cache HIT: {chave}")
return json.loads(resultado_cache)
# Executar funcao e cachear resultado
print(f"Cache MISS: {chave}")
resultado = func(*args, **kwargs)
r.setex(chave, ttl, json.dumps(resultado))
return resultado
return wrapper
return decorador
@cache_redis(ttl=600)
def buscar_dados_api(endpoint):
"""Simula busca em API externa."""
import time
time.sleep(2) # Simula latencia
return {"dados": f"Resultado de {endpoint}", "timestamp": "2025-01-28"}
# Primeira chamada: lenta (cache MISS)
resultado = buscar_dados_api("/usuarios")
# Segunda chamada: instantanea (cache HIT)
resultado = buscar_dados_api("/usuarios")
Cache em Aplicacoes Web
Integrando cache com FastAPI:
from fastapi import FastAPI, Depends
import redis
import json
app = FastAPI()
cache = redis.Redis(host="localhost", port=6379, db=0, decode_responses=True)
async def get_cache():
return cache
@app.get("/produtos")
async def listar_produtos(r: redis.Redis = Depends(get_cache)):
"""Lista produtos com cache."""
chave = "produtos:lista"
# Tentar cache
dados_cache = r.get(chave)
if dados_cache:
return {"fonte": "cache", "dados": json.loads(dados_cache)}
# Buscar do banco de dados (simulado)
produtos = [
{"id": 1, "nome": "Notebook", "preco": 3500.00},
{"id": 2, "nome": "Mouse", "preco": 89.90},
{"id": 3, "nome": "Teclado", "preco": 199.90},
]
# Salvar no cache por 5 minutos
r.setex(chave, 300, json.dumps(produtos))
return {"fonte": "banco", "dados": produtos}
@app.post("/produtos/{produto_id}/atualizar")
async def atualizar_produto(produto_id: int, r: redis.Redis = Depends(get_cache)):
"""Atualiza produto e invalida cache."""
# ... atualizar no banco ...
r.delete("produtos:lista") # Invalidar cache
return {"status": "atualizado", "cache": "invalidado"}
Rate Limiting com Redis
Redis e perfeito para controlar taxa de requisicoes:
import redis
import time
class RateLimiter:
"""Rate limiter usando Redis com sliding window."""
def __init__(self, redis_client, limite, janela_segundos):
self.r = redis_client
self.limite = limite
self.janela = janela_segundos
def permitir(self, identificador: str) -> bool:
"""Verifica se a requisicao e permitida."""
chave = f"rate_limit:{identificador}"
agora = time.time()
pipe = self.r.pipeline()
# Remover registros fora da janela
pipe.zremrangebyscore(chave, 0, agora - self.janela)
# Contar registros na janela
pipe.zcard(chave)
# Adicionar registro atual
pipe.zadd(chave, {str(agora): agora})
# Definir expiracao
pipe.expire(chave, self.janela)
resultados = pipe.execute()
contagem = resultados[1]
return contagem < self.limite
def info(self, identificador: str) -> dict:
"""Retorna informacoes sobre o rate limit."""
chave = f"rate_limit:{identificador}"
agora = time.time()
self.r.zremrangebyscore(chave, 0, agora - self.janela)
contagem = self.r.zcard(chave)
return {
"limite": self.limite,
"usado": contagem,
"restante": max(0, self.limite - contagem),
"janela_segundos": self.janela
}
# Uso
r = redis.Redis(host="localhost", port=6379, db=0, decode_responses=True)
limiter = RateLimiter(r, limite=10, janela_segundos=60)
# Simular requisicoes
for i in range(12):
ip = "192.168.1.100"
if limiter.permitir(ip):
print(f"Requisicao {i+1}: permitida")
else:
print(f"Requisicao {i+1}: bloqueada (rate limit)")
Filas de Tarefas
Redis funciona como fila de mensagens para processamento assincrono:
import redis
import json
import time
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor
class FilaTarefas:
"""Fila de tarefas simples usando Redis."""
def __init__(self, redis_client, nome_fila="tarefas"):
self.r = redis_client
self.fila = nome_fila
self.processando = f"{nome_fila}:processando"
def adicionar(self, tipo: str, dados: dict):
"""Adiciona tarefa a fila."""
tarefa = {
"tipo": tipo,
"dados": dados,
"criado_em": datetime.now().isoformat()
}
self.r.rpush(self.fila, json.dumps(tarefa))
print(f"Tarefa adicionada: {tipo}")
def processar(self, timeout=5):
"""Processa proxima tarefa da fila (bloqueante)."""
resultado = self.r.blpop(self.fila, timeout=timeout)
if resultado:
_, tarefa_json = resultado
return json.loads(tarefa_json)
return None
def tamanho(self):
return self.r.llen(self.fila)
# Produtor
r = redis.Redis(host="localhost", port=6379, db=0, decode_responses=True)
fila = FilaTarefas(r)
fila.adicionar("enviar_email", {
"para": "ana@email.com",
"assunto": "Bem-vinda!",
"corpo": "Obrigado por se cadastrar."
})
fila.adicionar("processar_imagem", {
"caminho": "/uploads/foto.jpg",
"operacao": "redimensionar",
"largura": 800
})
# Consumidor (worker)
def worker(fila):
"""Processa tarefas continuamente."""
print("Worker iniciado. Aguardando tarefas...")
while True:
tarefa = fila.processar(timeout=5)
if tarefa:
print(f"Processando: {tarefa['tipo']}")
# Executar tarefa baseado no tipo
if tarefa["tipo"] == "enviar_email":
print(f" Enviando email para {tarefa['dados']['para']}")
elif tarefa["tipo"] == "processar_imagem":
print(f" Processando {tarefa['dados']['caminho']}")
time.sleep(1) # Simula processamento
# worker(fila) # Executar em outro terminal
Pub/Sub para Eventos
Redis Pub/Sub permite comunicacao em tempo real entre servicos:
import redis
import json
import threading
r = redis.Redis(host="localhost", port=6379, db=0, decode_responses=True)
def publicar_evento(canal, evento):
"""Publica evento em um canal."""
r.publish(canal, json.dumps(evento))
def ouvir_eventos(canais):
"""Escuta eventos de canais especificos."""
pubsub = r.pubsub()
pubsub.subscribe(canais)
print(f"Ouvindo canais: {canais}")
for mensagem in pubsub.listen():
if mensagem["type"] == "message":
dados = json.loads(mensagem["data"])
print(f"[{mensagem['channel']}] {dados}")
# Em outro processo ou thread
# ouvir_eventos(["pedidos", "notificacoes"])
# Publicar eventos
publicar_evento("pedidos", {
"tipo": "novo_pedido",
"pedido_id": 123,
"valor": 299.90
})
publicar_evento("notificacoes", {
"tipo": "alerta",
"mensagem": "Estoque baixo do produto X"
})
Sessoes de Usuario com Redis
import redis
import json
import uuid
from datetime import datetime
class GerenciadorSessoes:
"""Gerencia sessoes de usuario com Redis."""
def __init__(self, redis_client, ttl=3600):
self.r = redis_client
self.ttl = ttl
def criar(self, usuario_id: str, dados: dict) -> str:
"""Cria nova sessao."""
session_id = str(uuid.uuid4())
chave = f"sessao:{session_id}"
sessao = {
"usuario_id": usuario_id,
"criada_em": datetime.now().isoformat(),
**dados
}
self.r.setex(chave, self.ttl, json.dumps(sessao))
return session_id
def obter(self, session_id: str) -> dict:
"""Obtem dados da sessao."""
chave = f"sessao:{session_id}"
dados = self.r.get(chave)
if dados:
self.r.expire(chave, self.ttl) # Renovar TTL
return json.loads(dados)
return None
def destruir(self, session_id: str):
"""Destroi uma sessao."""
self.r.delete(f"sessao:{session_id}")
# Uso
r = redis.Redis(host="localhost", port=6379, db=0, decode_responses=True)
sessoes = GerenciadorSessoes(r, ttl=1800)
sid = sessoes.criar("usuario_123", {"nome": "Ana", "role": "admin"})
print(f"Sessao criada: {sid}")
dados = sessoes.obter(sid)
print(f"Dados: {dados}")
Boas Praticas
Ao usar Redis com Python, siga estas recomendacoes:
- Use
decode_responses=Truepara trabalhar com strings ao inves de bytes - Defina TTL em todas as chaves para evitar acumulo de dados
- Use pipelines para operacoes em lote, reduzindo round-trips
- Estruture as chaves com prefixos claros como
cache:,sessao:,fila: - Implemente connection pooling para aplicacoes com muitas conexoes
- Monitore uso de memoria do Redis regularmente
- Use Lua scripts para operacoes atomicas complexas
- Configure persistencia (RDB ou AOF) conforme a criticidade dos dados
Conclusao
Redis e uma ferramenta versatil que resolve problemas comuns de performance e arquitetura em aplicacoes Python. Do cache simples de funcoes ate filas de tarefas e pub/sub para eventos, as possibilidades sao amplas. Comece implementando cache nas consultas mais lentas da sua aplicacao e explore funcionalidades mais avancadas conforme a necessidade. O ganho de performance e imediato e o impacto positivo na experiencia do usuario e significativo.
Equipe Python Brasil
Contribuidor do Python Brasil — Aprenda Python em Português