Voltar ao Glossario
Glossario Python

Celery: O que É e Como Funciona | Python Brasil

Guia completo sobre Celery em Python: filas de tarefas, workers, agendamento, monitoramento e boas praticas para processamento assincrono distribuido.

O que e Celery?

Celery e uma fila de tarefas distribuida (distributed task queue) para Python. Ela permite executar operacoes demoradas em segundo plano, fora do fluxo principal da aplicacao. Em vez de fazer o usuario esperar pelo envio de um email, processamento de uma imagem ou geracao de um relatorio, voce envia a tarefa para o Celery, que a executa de forma assincrona em um processo separado (worker).

O Celery usa um broker de mensagens (como Redis ou RabbitMQ) para intermediar a comunicacao entre a aplicacao que envia tarefas e os workers que as executam. Opcionalmente, um backend de resultado armazena os retornos das tarefas.

Instalacao e Configuracao

# Instalacao
# pip install celery[redis]

# arquivo: celery_app.py
from celery import Celery

app = Celery(
    'meu_projeto',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/1',
)

# Configuracoes opcionais
app.conf.update(
    task_serializer='json',
    accept_content=['json'],
    result_serializer='json',
    timezone='America/Sao_Paulo',
    enable_utc=True,
    task_track_started=True,
    task_time_limit=300,          # 5 minutos maximo por tarefa
    task_soft_time_limit=240,     # aviso 1 minuto antes
    worker_max_tasks_per_child=1000,  # reinicia worker apos 1000 tarefas
)

Definindo Tarefas

# arquivo: tasks.py
from celery_app import app
import time

@app.task
def enviar_email(destinatario: str, assunto: str, corpo: str):
    """Envia um email de forma assincrona."""
    # Simula envio demorado
    time.sleep(5)
    print(f'Email enviado para {destinatario}: {assunto}')
    return {'status': 'enviado', 'destinatario': destinatario}

@app.task(bind=True, max_retries=3)
def processar_imagem(self, imagem_id: int):
    """Processa uma imagem com retry automatico."""
    try:
        # logica de processamento
        print(f'Processando imagem {imagem_id}')
        return {'imagem_id': imagem_id, 'status': 'processada'}
    except Exception as exc:
        # Retry com backoff exponencial
        raise self.retry(exc=exc, countdown=2 ** self.request.retries)

@app.task(bind=True)
def gerar_relatorio(self, usuario_id: int, tipo: str):
    """Gera relatorio com atualizacao de progresso."""
    etapas = ['coletando dados', 'processando', 'formatando', 'finalizando']
    for i, etapa in enumerate(etapas):
        self.update_state(
            state='PROGRESSO',
            meta={'etapa': etapa, 'progresso': (i + 1) / len(etapas) * 100}
        )
        time.sleep(2)
    return {'tipo': tipo, 'url': f'/relatorios/{usuario_id}/{tipo}.pdf'}

Chamando Tarefas

from tasks import enviar_email, processar_imagem, gerar_relatorio

# Chamar de forma assincrona (envia para o broker)
resultado = enviar_email.delay('ana@email.com', 'Bem-vinda', 'Ola Ana!')
print(resultado.id)          # ID da tarefa
print(resultado.status)      # 'PENDING'

# Alternativa com mais controle
resultado = enviar_email.apply_async(
    args=['ana@email.com', 'Bem-vinda', 'Ola Ana!'],
    countdown=60,             # executar daqui a 60 segundos
    expires=3600,             # expira em 1 hora se nao executada
    queue='emails',           # fila especifica
)

# Verificar resultado
if resultado.ready():
    print(resultado.get(timeout=10))  # aguarda ate 10 segundos

# Verificar progresso
resultado = gerar_relatorio.delay(1, 'vendas')
info = resultado.info
if resultado.state == 'PROGRESSO':
    print(f"Etapa: {info['etapa']}, Progresso: {info['progresso']}%")

Agrupamento de Tarefas

from celery import group, chain, chord

# Group — executar tarefas em paralelo
job = group([
    processar_imagem.s(1),
    processar_imagem.s(2),
    processar_imagem.s(3),
])
resultado = job.apply_async()
resultados = resultado.get()  # lista com todos os resultados

# Chain — executar tarefas em sequencia (resultado passa para a proxima)
pipeline = chain(
    processar_imagem.s(1),
    enviar_email.s('admin@email.com', 'Imagem processada', ''),
)
pipeline.apply_async()

# Chord — group seguido de callback
callback = enviar_email.si('admin@email.com', 'Todas processadas', 'Pronto!')
job = chord([processar_imagem.s(i) for i in range(10)])(callback)

Agendamento com Celery Beat

from celery.schedules import crontab

app.conf.beat_schedule = {
    'limpar-dados-expirados': {
        'task': 'tasks.limpar_dados_expirados',
        'schedule': crontab(hour=3, minute=0),  # todo dia as 3h
    },
    'enviar-resumo-diario': {
        'task': 'tasks.enviar_resumo',
        'schedule': crontab(hour=8, minute=0, day_of_week='1-5'),  # seg-sex as 8h
    },
    'verificar-saude': {
        'task': 'tasks.health_check',
        'schedule': 300.0,  # a cada 5 minutos
    },
}

# Executar beat: celery -A celery_app beat
# Executar worker: celery -A celery_app worker --loglevel=info

Integracao com Django e FastAPI

# Com FastAPI
from fastapi import FastAPI
from tasks import processar_imagem

api = FastAPI()

@api.post('/processar/{imagem_id}')
async def iniciar_processamento(imagem_id: int):
    tarefa = processar_imagem.delay(imagem_id)
    return {'tarefa_id': tarefa.id}

@api.get('/status/{tarefa_id}')
async def verificar_status(tarefa_id: str):
    from celery.result import AsyncResult
    resultado = AsyncResult(tarefa_id)
    return {
        'status': resultado.status,
        'resultado': resultado.result if resultado.ready() else None,
    }

Erros Comuns

O erro mais frequente e importar a instancia do Celery incorretamente, causando tarefas que nunca sao registradas. Outro problema comum e nao configurar serializacao JSON e usar pickle, que e inseguro. Esquecer de tratar falhas com retry tambem e frequente — tarefas que dependem de servicos externos devem sempre ter mecanismo de retry com backoff exponencial. Nao monitorar filas pode levar a acumulo de tarefas nao processadas sem que ninguem perceba.

Boas Praticas

Configure limites de tempo com task_time_limit e task_soft_time_limit. Use retry com backoff exponencial para tarefas que dependem de servicos externos. Monitore com Flower (celery -A app flower). Separe tarefas em filas diferentes por prioridade. Use serializacao JSON em vez de pickle. Mantenha tarefas idempotentes — executar a mesma tarefa duas vezes deve produzir o mesmo resultado. Nao passe objetos complexos como argumentos; envie apenas IDs e busque os dados dentro da tarefa.

Quando Usar

Celery e ideal para operacoes demoradas que nao devem bloquear a resposta ao usuario: envio de emails, processamento de imagens, geracao de relatorios, sincronizacao com APIs externas e tarefas agendadas. Para projetos simples ou tarefas que levam menos de um segundo, Celery pode ser excessivo.