Voltar ao Glossario
Glossario Python

Async/Await: O que É e Como Funciona | Python Brasil

Guia completo de async/await em Python: event loop, asyncio.gather, semaphores, async generators, aiofiles, asyncpg e debugging assíncrono.

O que é Async/Await?

Async/await é a sintaxe do Python para programação assíncrona. Ela permite que seu programa execute múltiplas tarefas de forma concorrente, sem bloquear a execução enquanto espera por operações lentas como requisições HTTP, leitura de arquivos ou consultas a banco de dados.

Introduzido no Python 3.5, o async/await tornou o código assíncrono tão legível quanto código síncrono. Antes disso, a programação assíncrona em Python requeria callbacks e a API de baixo nível do asyncio, muito mais difíceis de entender e depurar.

O Event Loop em Detalhe

O event loop (laço de eventos) é o coração do modelo assíncrono do Python. Ele é responsável por:

  1. Receber tarefas (coroutines) para executar
  2. Executar cada tarefa até que ela encontre um await
  3. Enquanto a tarefa aguarda (ex: resposta de rede), executar outra tarefa
  4. Quando o resultado chega, retomar a tarefa original

Visualmente:

Event Loop:
  [Tarefa A rodando] --> await (I/O) --> [pausa]
  [Tarefa B rodando] --> await (I/O) --> [pausa]
  [Tarefa C rodando] --> resultado pronto --> [completa]
  [Tarefa A retoma] --> resultado pronto --> [completa]
  [Tarefa B retoma] --> resultado pronto --> [completa]

Tudo acontece em uma única thread. Não há paralelismo real (para isso existe multiprocessing), mas há concorrência: múltiplas tarefas progridem enquanto outras aguardam I/O.

import asyncio

async def tarefa(nome: str, segundos: float) -> str:
    print(f"[{nome}] Iniciando...")
    await asyncio.sleep(segundos)  # Simula I/O; libera o event loop
    print(f"[{nome}] Concluído após {segundos}s")
    return f"Resultado de {nome}"

async def main():
    # Sequencial: 6 segundos no total
    # r1 = await tarefa("A", 2)
    # r2 = await tarefa("B", 2)
    # r3 = await tarefa("C", 2)

    # Concorrente: 2 segundos no total!
    resultados = await asyncio.gather(
        tarefa("A", 2),
        tarefa("B", 2),
        tarefa("C", 2),
    )
    print(resultados)

asyncio.run(main())

asyncio.create_task vs asyncio.gather

asyncio.create_task

create_task agenda uma coroutine para executar em segundo plano imediatamente, sem precisar aguardá-la agora:

import asyncio

async def processar_dados(item: int) -> int:
    await asyncio.sleep(1)
    return item * 2

async def main():
    # Cria tasks que iniciam imediatamente
    task1 = asyncio.create_task(processar_dados(1))
    task2 = asyncio.create_task(processar_dados(2))
    task3 = asyncio.create_task(processar_dados(3))

    # Faz outras coisas enquanto as tasks rodam
    print("Fazendo outras coisas...")
    await asyncio.sleep(0.5)  # Cede o controle para as tasks

    # Aguarda os resultados quando necessário
    r1 = await task1
    r2 = await task2
    r3 = await task3
    print(f"Resultados: {r1}, {r2}, {r3}")

asyncio.run(main())

asyncio.gather

gather executa múltiplas coroutines concorrentemente e aguarda todas terminarem:

import asyncio

async def buscar_api(url: str) -> dict:
    await asyncio.sleep(1)  # Simula requisição HTTP
    return {"url": url, "status": 200}

async def main():
    urls = [
        "https://api.exemplo.com/usuarios",
        "https://api.exemplo.com/produtos",
        "https://api.exemplo.com/pedidos",
    ]

    # return_exceptions=True evita que uma falha cancele todas as outras
    resultados = await asyncio.gather(
        *[buscar_api(url) for url in urls],
        return_exceptions=True
    )

    for url, resultado in zip(urls, resultados):
        if isinstance(resultado, Exception):
            print(f"{url}: ERRO - {resultado}")
        else:
            print(f"{url}: {resultado}")

asyncio.run(main())

Semáforos para Limitar Concorrência

Executar centenas de tarefas ao mesmo tempo pode sobrecarregar o servidor remoto. Use asyncio.Semaphore para limitar a concorrência:

import asyncio
import aiohttp

async def buscar_url(session: aiohttp.ClientSession, url: str, semaforo: asyncio.Semaphore) -> str:
    async with semaforo:  # No máximo N tarefas simultâneas
        print(f"Buscando {url}...")
        async with session.get(url) as response:
            conteudo = await response.text()
            print(f"Concluído: {url}")
            return conteudo

async def main():
    urls = [f"https://httpbin.org/delay/1?id={i}" for i in range(20)]
    semaforo = asyncio.Semaphore(5)  # Máximo 5 requisições simultâneas

    async with aiohttp.ClientSession() as session:
        tarefas = [buscar_url(session, url, semaforo) for url in urls]
        resultados = await asyncio.gather(*tarefas, return_exceptions=True)

    print(f"Concluídas: {len(resultados)} URLs")

asyncio.run(main())

Async Context Managers

Você pode criar seus próprios context managers assíncronos com __aenter__ e __aexit__:

import asyncio
from typing import AsyncGenerator

class ConexaoBancoDados:
    def __init__(self, url: str):
        self.url = url
        self.conexao = None

    async def __aenter__(self):
        print(f"Conectando ao banco: {self.url}")
        await asyncio.sleep(0.1)  # Simula conexão
        self.conexao = {"url": self.url, "ativa": True}
        return self.conexao

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print("Fechando conexão com o banco")
        if self.conexao:
            self.conexao["ativa"] = False
        # Retornar False não suprime exceções

async def main():
    async with ConexaoBancoDados("postgresql://localhost/mydb") as conn:
        print(f"Conexão ativa: {conn['ativa']}")
        # A conexão é fechada automaticamente ao sair do bloco

asyncio.run(main())

Async Generators

Generators assíncronos permitem produzir valores de forma incremental e assíncrona, ideais para streaming de dados:

import asyncio
from typing import AsyncGenerator

async def gerar_numeros(inicio: int, fim: int, delay: float = 0.5) -> AsyncGenerator[int, None]:
    for numero in range(inicio, fim + 1):
        await asyncio.sleep(delay)  # Simula busca assíncrona
        yield numero

async def processar_stream():
    async for numero in gerar_numeros(1, 5):
        print(f"Processando número: {numero}")

# Coletar todos os valores
async def coletar_todos():
    numeros = [n async for n in gerar_numeros(1, 10, delay=0.1)]
    print(f"Total coletado: {numeros}")

asyncio.run(processar_stream())

aiofiles: Leitura de Arquivos Assíncrona

Operações de arquivo bloqueiam o event loop por padrão. Use aiofiles para I/O de arquivo não bloqueante:

import asyncio
import aiofiles

async def ler_arquivo(caminho: str) -> str:
    async with aiofiles.open(caminho, mode="r", encoding="utf-8") as arquivo:
        conteudo = await arquivo.read()
    return conteudo

async def escrever_log(caminho: str, mensagem: str) -> None:
    async with aiofiles.open(caminho, mode="a", encoding="utf-8") as arquivo:
        await arquivo.write(f"{mensagem}\n")

async def processar_multiplos_arquivos(caminhos: list[str]) -> list[str]:
    tarefas = [ler_arquivo(caminho) for caminho in caminhos]
    conteudos = await asyncio.gather(*tarefas)
    return list(conteudos)

async def main():
    await escrever_log("app.log", "Aplicação iniciada")
    conteudo = await ler_arquivo("config.json")
    print(f"Config carregada: {len(conteudo)} bytes")

asyncio.run(main())

Drivers de Banco de Dados Assíncronos

asyncpg (PostgreSQL)

import asyncio
import asyncpg

async def buscar_usuarios(pool: asyncpg.Pool) -> list[dict]:
    async with pool.acquire() as conn:
        rows = await conn.fetch(
            "SELECT id, nome, email FROM usuarios WHERE ativo = $1",
            True
        )
        return [dict(row) for row in rows]

async def criar_usuario(pool: asyncpg.Pool, nome: str, email: str) -> int:
    async with pool.acquire() as conn:
        usuario_id = await conn.fetchval(
            "INSERT INTO usuarios (nome, email) VALUES ($1, $2) RETURNING id",
            nome, email
        )
        return usuario_id

async def main():
    pool = await asyncpg.create_pool(
        "postgresql://user:pass@localhost/mydb",
        min_size=5,
        max_size=20
    )

    try:
        usuarios = await buscar_usuarios(pool)
        print(f"Encontrados: {len(usuarios)} usuários")

        novo_id = await criar_usuario(pool, "Carlos", "carlos@email.com")
        print(f"Usuário criado com ID: {novo_id}")
    finally:
        await pool.close()

asyncio.run(main())

databases (abstração assíncrona)

import asyncio
from databases import Database

DATABASE_URL = "postgresql://user:pass@localhost/mydb"
database = Database(DATABASE_URL)

async def main():
    await database.connect()

    # SELECT
    query = "SELECT * FROM usuarios WHERE ativo = :ativo"
    rows = await database.fetch_all(query=query, values={"ativo": True})

    # INSERT
    query = "INSERT INTO usuarios (nome, email) VALUES (:nome, :email)"
    await database.execute(query=query, values={"nome": "Ana", "email": "ana@email.com"})

    await database.disconnect()

asyncio.run(main())

Depurando Código Assíncrono

Ativar modo debug do asyncio

import asyncio
import logging

# Exibe avisos sobre coroutines nunca aguardadas e tasks lentas
asyncio.run(main(), debug=True)

# Ou via variável de ambiente:
# PYTHONASYNCIODEBUG=1 python meu_script.py

Detectar tasks pendentes

async def verificar_tasks_pendentes():
    tasks = asyncio.all_tasks()
    pendentes = [t for t in tasks if not t.done()]
    print(f"Tasks pendentes: {len(pendentes)}")
    for task in pendentes:
        print(f"  - {task.get_name()}: {task.get_coro()}")

Timeout em operações assíncronas

import asyncio

async def operacao_lenta():
    await asyncio.sleep(10)
    return "Resultado"

async def main():
    try:
        resultado = await asyncio.wait_for(
            operacao_lenta(),
            timeout=5.0  # Cancela se demorar mais de 5 segundos
        )
    except asyncio.TimeoutError:
        print("Operação cancelada por timeout!")

Armadilhas Comuns

Bloquear o Event Loop

O maior erro em código assíncrono é chamar funções bloqueantes dentro de coroutines:

import asyncio
import time
import requests  # Biblioteca síncrona!

# ERRADO: bloqueia o event loop inteiro
async def buscar_errado(url: str):
    resposta = requests.get(url)  # Bloqueia todas as outras tasks!
    return resposta.text

# CORRETO: usa biblioteca assíncrona
import aiohttp

async def buscar_certo(url: str):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resposta:
            return await resposta.text()

# Para código bloqueante inevitável (ex: biblioteca legada):
async def usar_codigo_bloqueante():
    loop = asyncio.get_event_loop()
    resultado = await loop.run_in_executor(None, requests.get, "https://exemplo.com")
    return resultado

Coroutine Nunca Aguardada

async def enviar_email(destinatario: str):
    await asyncio.sleep(1)
    print(f"E-mail enviado para {destinatario}")

async def main():
    # ERRADO: cria a coroutine mas não a executa!
    enviar_email("usuario@email.com")  # RuntimeWarning: coroutine nunca aguardada

    # CORRETO:
    await enviar_email("usuario@email.com")

    # Ou se quiser em background:
    asyncio.create_task(enviar_email("usuario@email.com"))

Quando Usar Async/Await

Use programação assíncrona quando seu programa é I/O-bound — passa muito tempo esperando respostas de rede, disco ou banco de dados.

Use async/await para:

  • APIs com muitas requisições simultâneas (FastAPI, aiohttp)
  • WebSockets e streaming de dados
  • Scrapers que fazem centenas de requisições HTTP
  • Processamento de múltiplos arquivos em paralelo

Prefira multiprocessing para:

  • Processamento de imagens ou vídeo
  • Cálculos matemáticos pesados
  • Machine learning e treinamento de modelos
  • Qualquer operação CPU-bound

Boas Práticas

  • Sempre use asyncio.run() no ponto de entrada — nunca crie loops manualmente
  • Use asyncio.gather() para executar múltiplas coroutines concorrentemente
  • Use asyncio.Semaphore para evitar sobrecarregar recursos externos
  • Nunca chame funções bloqueantes diretamente em coroutines; use run_in_executor
  • Use asyncio.wait_for() para definir timeouts em operações potencialmente lentas
  • Teste com o modo debug ativado (PYTHONASYNCIODEBUG=1) durante o desenvolvimento

Termos Relacionados

  • FastAPI - Framework web assíncrono que usa async/await nativamente
  • WebSocket - Comunicação em tempo real, sempre assíncrona
  • Generators - Conceito base das coroutines em Python
  • Decorators - Usados com funções assíncronas