Celery: O que É e Como Funciona | Python Brasil
Guia completo sobre Celery em Python: filas de tarefas, workers, agendamento, monitoramento e boas praticas para processamento assincrono distribuido.
O que e Celery?
Celery e uma fila de tarefas distribuida (distributed task queue) para Python. Ela permite executar operacoes demoradas em segundo plano, fora do fluxo principal da aplicacao. Em vez de fazer o usuario esperar pelo envio de um email, processamento de uma imagem ou geracao de um relatorio, voce envia a tarefa para o Celery, que a executa de forma assincrona em um processo separado (worker).
O Celery usa um broker de mensagens (como Redis ou RabbitMQ) para intermediar a comunicacao entre a aplicacao que envia tarefas e os workers que as executam. Opcionalmente, um backend de resultado armazena os retornos das tarefas.
Instalacao e Configuracao
# Instalacao
# pip install celery[redis]
# arquivo: celery_app.py
from celery import Celery
app = Celery(
'meu_projeto',
broker='redis://localhost:6379/0',
backend='redis://localhost:6379/1',
)
# Configuracoes opcionais
app.conf.update(
task_serializer='json',
accept_content=['json'],
result_serializer='json',
timezone='America/Sao_Paulo',
enable_utc=True,
task_track_started=True,
task_time_limit=300, # 5 minutos maximo por tarefa
task_soft_time_limit=240, # aviso 1 minuto antes
worker_max_tasks_per_child=1000, # reinicia worker apos 1000 tarefas
)
Definindo Tarefas
# arquivo: tasks.py
from celery_app import app
import time
@app.task
def enviar_email(destinatario: str, assunto: str, corpo: str):
"""Envia um email de forma assincrona."""
# Simula envio demorado
time.sleep(5)
print(f'Email enviado para {destinatario}: {assunto}')
return {'status': 'enviado', 'destinatario': destinatario}
@app.task(bind=True, max_retries=3)
def processar_imagem(self, imagem_id: int):
"""Processa uma imagem com retry automatico."""
try:
# logica de processamento
print(f'Processando imagem {imagem_id}')
return {'imagem_id': imagem_id, 'status': 'processada'}
except Exception as exc:
# Retry com backoff exponencial
raise self.retry(exc=exc, countdown=2 ** self.request.retries)
@app.task(bind=True)
def gerar_relatorio(self, usuario_id: int, tipo: str):
"""Gera relatorio com atualizacao de progresso."""
etapas = ['coletando dados', 'processando', 'formatando', 'finalizando']
for i, etapa in enumerate(etapas):
self.update_state(
state='PROGRESSO',
meta={'etapa': etapa, 'progresso': (i + 1) / len(etapas) * 100}
)
time.sleep(2)
return {'tipo': tipo, 'url': f'/relatorios/{usuario_id}/{tipo}.pdf'}
Chamando Tarefas
from tasks import enviar_email, processar_imagem, gerar_relatorio
# Chamar de forma assincrona (envia para o broker)
resultado = enviar_email.delay('ana@email.com', 'Bem-vinda', 'Ola Ana!')
print(resultado.id) # ID da tarefa
print(resultado.status) # 'PENDING'
# Alternativa com mais controle
resultado = enviar_email.apply_async(
args=['ana@email.com', 'Bem-vinda', 'Ola Ana!'],
countdown=60, # executar daqui a 60 segundos
expires=3600, # expira em 1 hora se nao executada
queue='emails', # fila especifica
)
# Verificar resultado
if resultado.ready():
print(resultado.get(timeout=10)) # aguarda ate 10 segundos
# Verificar progresso
resultado = gerar_relatorio.delay(1, 'vendas')
info = resultado.info
if resultado.state == 'PROGRESSO':
print(f"Etapa: {info['etapa']}, Progresso: {info['progresso']}%")
Agrupamento de Tarefas
from celery import group, chain, chord
# Group — executar tarefas em paralelo
job = group([
processar_imagem.s(1),
processar_imagem.s(2),
processar_imagem.s(3),
])
resultado = job.apply_async()
resultados = resultado.get() # lista com todos os resultados
# Chain — executar tarefas em sequencia (resultado passa para a proxima)
pipeline = chain(
processar_imagem.s(1),
enviar_email.s('admin@email.com', 'Imagem processada', ''),
)
pipeline.apply_async()
# Chord — group seguido de callback
callback = enviar_email.si('admin@email.com', 'Todas processadas', 'Pronto!')
job = chord([processar_imagem.s(i) for i in range(10)])(callback)
Agendamento com Celery Beat
from celery.schedules import crontab
app.conf.beat_schedule = {
'limpar-dados-expirados': {
'task': 'tasks.limpar_dados_expirados',
'schedule': crontab(hour=3, minute=0), # todo dia as 3h
},
'enviar-resumo-diario': {
'task': 'tasks.enviar_resumo',
'schedule': crontab(hour=8, minute=0, day_of_week='1-5'), # seg-sex as 8h
},
'verificar-saude': {
'task': 'tasks.health_check',
'schedule': 300.0, # a cada 5 minutos
},
}
# Executar beat: celery -A celery_app beat
# Executar worker: celery -A celery_app worker --loglevel=info
Integracao com Django e FastAPI
# Com FastAPI
from fastapi import FastAPI
from tasks import processar_imagem
api = FastAPI()
@api.post('/processar/{imagem_id}')
async def iniciar_processamento(imagem_id: int):
tarefa = processar_imagem.delay(imagem_id)
return {'tarefa_id': tarefa.id}
@api.get('/status/{tarefa_id}')
async def verificar_status(tarefa_id: str):
from celery.result import AsyncResult
resultado = AsyncResult(tarefa_id)
return {
'status': resultado.status,
'resultado': resultado.result if resultado.ready() else None,
}
Erros Comuns
O erro mais frequente e importar a instancia do Celery incorretamente, causando tarefas que nunca sao registradas. Outro problema comum e nao configurar serializacao JSON e usar pickle, que e inseguro. Esquecer de tratar falhas com retry tambem e frequente — tarefas que dependem de servicos externos devem sempre ter mecanismo de retry com backoff exponencial. Nao monitorar filas pode levar a acumulo de tarefas nao processadas sem que ninguem perceba.
Boas Praticas
Configure limites de tempo com task_time_limit e task_soft_time_limit. Use retry com backoff exponencial para tarefas que dependem de servicos externos. Monitore com Flower (celery -A app flower). Separe tarefas em filas diferentes por prioridade. Use serializacao JSON em vez de pickle. Mantenha tarefas idempotentes — executar a mesma tarefa duas vezes deve produzir o mesmo resultado. Nao passe objetos complexos como argumentos; envie apenas IDs e busque os dados dentro da tarefa.
Quando Usar
Celery e ideal para operacoes demoradas que nao devem bloquear a resposta ao usuario: envio de emails, processamento de imagens, geracao de relatorios, sincronizacao com APIs externas e tarefas agendadas. Para projetos simples ou tarefas que levam menos de um segundo, Celery pode ser excessivo.