Async/Await: O que É e Como Funciona | Python Brasil
Guia completo de async/await em Python: event loop, asyncio.gather, semaphores, async generators, aiofiles, asyncpg e debugging assíncrono.
O que é Async/Await?
Async/await é a sintaxe do Python para programação assíncrona. Ela permite que seu programa execute múltiplas tarefas de forma concorrente, sem bloquear a execução enquanto espera por operações lentas como requisições HTTP, leitura de arquivos ou consultas a banco de dados.
Introduzido no Python 3.5, o async/await tornou o código assíncrono tão legível quanto código síncrono. Antes disso, a programação assíncrona em Python requeria callbacks e a API de baixo nível do asyncio, muito mais difíceis de entender e depurar.
O Event Loop em Detalhe
O event loop (laço de eventos) é o coração do modelo assíncrono do Python. Ele é responsável por:
- Receber tarefas (coroutines) para executar
- Executar cada tarefa até que ela encontre um
await - Enquanto a tarefa aguarda (ex: resposta de rede), executar outra tarefa
- Quando o resultado chega, retomar a tarefa original
Visualmente:
Event Loop:
[Tarefa A rodando] --> await (I/O) --> [pausa]
[Tarefa B rodando] --> await (I/O) --> [pausa]
[Tarefa C rodando] --> resultado pronto --> [completa]
[Tarefa A retoma] --> resultado pronto --> [completa]
[Tarefa B retoma] --> resultado pronto --> [completa]
Tudo acontece em uma única thread. Não há paralelismo real (para isso existe multiprocessing), mas há concorrência: múltiplas tarefas progridem enquanto outras aguardam I/O.
import asyncio
async def tarefa(nome: str, segundos: float) -> str:
print(f"[{nome}] Iniciando...")
await asyncio.sleep(segundos) # Simula I/O; libera o event loop
print(f"[{nome}] Concluído após {segundos}s")
return f"Resultado de {nome}"
async def main():
# Sequencial: 6 segundos no total
# r1 = await tarefa("A", 2)
# r2 = await tarefa("B", 2)
# r3 = await tarefa("C", 2)
# Concorrente: 2 segundos no total!
resultados = await asyncio.gather(
tarefa("A", 2),
tarefa("B", 2),
tarefa("C", 2),
)
print(resultados)
asyncio.run(main())
asyncio.create_task vs asyncio.gather
asyncio.create_task
create_task agenda uma coroutine para executar em segundo plano imediatamente, sem precisar aguardá-la agora:
import asyncio
async def processar_dados(item: int) -> int:
await asyncio.sleep(1)
return item * 2
async def main():
# Cria tasks que iniciam imediatamente
task1 = asyncio.create_task(processar_dados(1))
task2 = asyncio.create_task(processar_dados(2))
task3 = asyncio.create_task(processar_dados(3))
# Faz outras coisas enquanto as tasks rodam
print("Fazendo outras coisas...")
await asyncio.sleep(0.5) # Cede o controle para as tasks
# Aguarda os resultados quando necessário
r1 = await task1
r2 = await task2
r3 = await task3
print(f"Resultados: {r1}, {r2}, {r3}")
asyncio.run(main())
asyncio.gather
gather executa múltiplas coroutines concorrentemente e aguarda todas terminarem:
import asyncio
async def buscar_api(url: str) -> dict:
await asyncio.sleep(1) # Simula requisição HTTP
return {"url": url, "status": 200}
async def main():
urls = [
"https://api.exemplo.com/usuarios",
"https://api.exemplo.com/produtos",
"https://api.exemplo.com/pedidos",
]
# return_exceptions=True evita que uma falha cancele todas as outras
resultados = await asyncio.gather(
*[buscar_api(url) for url in urls],
return_exceptions=True
)
for url, resultado in zip(urls, resultados):
if isinstance(resultado, Exception):
print(f"{url}: ERRO - {resultado}")
else:
print(f"{url}: {resultado}")
asyncio.run(main())
Semáforos para Limitar Concorrência
Executar centenas de tarefas ao mesmo tempo pode sobrecarregar o servidor remoto. Use asyncio.Semaphore para limitar a concorrência:
import asyncio
import aiohttp
async def buscar_url(session: aiohttp.ClientSession, url: str, semaforo: asyncio.Semaphore) -> str:
async with semaforo: # No máximo N tarefas simultâneas
print(f"Buscando {url}...")
async with session.get(url) as response:
conteudo = await response.text()
print(f"Concluído: {url}")
return conteudo
async def main():
urls = [f"https://httpbin.org/delay/1?id={i}" for i in range(20)]
semaforo = asyncio.Semaphore(5) # Máximo 5 requisições simultâneas
async with aiohttp.ClientSession() as session:
tarefas = [buscar_url(session, url, semaforo) for url in urls]
resultados = await asyncio.gather(*tarefas, return_exceptions=True)
print(f"Concluídas: {len(resultados)} URLs")
asyncio.run(main())
Async Context Managers
Você pode criar seus próprios context managers assíncronos com __aenter__ e __aexit__:
import asyncio
from typing import AsyncGenerator
class ConexaoBancoDados:
def __init__(self, url: str):
self.url = url
self.conexao = None
async def __aenter__(self):
print(f"Conectando ao banco: {self.url}")
await asyncio.sleep(0.1) # Simula conexão
self.conexao = {"url": self.url, "ativa": True}
return self.conexao
async def __aexit__(self, exc_type, exc_val, exc_tb):
print("Fechando conexão com o banco")
if self.conexao:
self.conexao["ativa"] = False
# Retornar False não suprime exceções
async def main():
async with ConexaoBancoDados("postgresql://localhost/mydb") as conn:
print(f"Conexão ativa: {conn['ativa']}")
# A conexão é fechada automaticamente ao sair do bloco
asyncio.run(main())
Async Generators
Generators assíncronos permitem produzir valores de forma incremental e assíncrona, ideais para streaming de dados:
import asyncio
from typing import AsyncGenerator
async def gerar_numeros(inicio: int, fim: int, delay: float = 0.5) -> AsyncGenerator[int, None]:
for numero in range(inicio, fim + 1):
await asyncio.sleep(delay) # Simula busca assíncrona
yield numero
async def processar_stream():
async for numero in gerar_numeros(1, 5):
print(f"Processando número: {numero}")
# Coletar todos os valores
async def coletar_todos():
numeros = [n async for n in gerar_numeros(1, 10, delay=0.1)]
print(f"Total coletado: {numeros}")
asyncio.run(processar_stream())
aiofiles: Leitura de Arquivos Assíncrona
Operações de arquivo bloqueiam o event loop por padrão. Use aiofiles para I/O de arquivo não bloqueante:
import asyncio
import aiofiles
async def ler_arquivo(caminho: str) -> str:
async with aiofiles.open(caminho, mode="r", encoding="utf-8") as arquivo:
conteudo = await arquivo.read()
return conteudo
async def escrever_log(caminho: str, mensagem: str) -> None:
async with aiofiles.open(caminho, mode="a", encoding="utf-8") as arquivo:
await arquivo.write(f"{mensagem}\n")
async def processar_multiplos_arquivos(caminhos: list[str]) -> list[str]:
tarefas = [ler_arquivo(caminho) for caminho in caminhos]
conteudos = await asyncio.gather(*tarefas)
return list(conteudos)
async def main():
await escrever_log("app.log", "Aplicação iniciada")
conteudo = await ler_arquivo("config.json")
print(f"Config carregada: {len(conteudo)} bytes")
asyncio.run(main())
Drivers de Banco de Dados Assíncronos
asyncpg (PostgreSQL)
import asyncio
import asyncpg
async def buscar_usuarios(pool: asyncpg.Pool) -> list[dict]:
async with pool.acquire() as conn:
rows = await conn.fetch(
"SELECT id, nome, email FROM usuarios WHERE ativo = $1",
True
)
return [dict(row) for row in rows]
async def criar_usuario(pool: asyncpg.Pool, nome: str, email: str) -> int:
async with pool.acquire() as conn:
usuario_id = await conn.fetchval(
"INSERT INTO usuarios (nome, email) VALUES ($1, $2) RETURNING id",
nome, email
)
return usuario_id
async def main():
pool = await asyncpg.create_pool(
"postgresql://user:pass@localhost/mydb",
min_size=5,
max_size=20
)
try:
usuarios = await buscar_usuarios(pool)
print(f"Encontrados: {len(usuarios)} usuários")
novo_id = await criar_usuario(pool, "Carlos", "carlos@email.com")
print(f"Usuário criado com ID: {novo_id}")
finally:
await pool.close()
asyncio.run(main())
databases (abstração assíncrona)
import asyncio
from databases import Database
DATABASE_URL = "postgresql://user:pass@localhost/mydb"
database = Database(DATABASE_URL)
async def main():
await database.connect()
# SELECT
query = "SELECT * FROM usuarios WHERE ativo = :ativo"
rows = await database.fetch_all(query=query, values={"ativo": True})
# INSERT
query = "INSERT INTO usuarios (nome, email) VALUES (:nome, :email)"
await database.execute(query=query, values={"nome": "Ana", "email": "ana@email.com"})
await database.disconnect()
asyncio.run(main())
Depurando Código Assíncrono
Ativar modo debug do asyncio
import asyncio
import logging
# Exibe avisos sobre coroutines nunca aguardadas e tasks lentas
asyncio.run(main(), debug=True)
# Ou via variável de ambiente:
# PYTHONASYNCIODEBUG=1 python meu_script.py
Detectar tasks pendentes
async def verificar_tasks_pendentes():
tasks = asyncio.all_tasks()
pendentes = [t for t in tasks if not t.done()]
print(f"Tasks pendentes: {len(pendentes)}")
for task in pendentes:
print(f" - {task.get_name()}: {task.get_coro()}")
Timeout em operações assíncronas
import asyncio
async def operacao_lenta():
await asyncio.sleep(10)
return "Resultado"
async def main():
try:
resultado = await asyncio.wait_for(
operacao_lenta(),
timeout=5.0 # Cancela se demorar mais de 5 segundos
)
except asyncio.TimeoutError:
print("Operação cancelada por timeout!")
Armadilhas Comuns
Bloquear o Event Loop
O maior erro em código assíncrono é chamar funções bloqueantes dentro de coroutines:
import asyncio
import time
import requests # Biblioteca síncrona!
# ERRADO: bloqueia o event loop inteiro
async def buscar_errado(url: str):
resposta = requests.get(url) # Bloqueia todas as outras tasks!
return resposta.text
# CORRETO: usa biblioteca assíncrona
import aiohttp
async def buscar_certo(url: str):
async with aiohttp.ClientSession() as session:
async with session.get(url) as resposta:
return await resposta.text()
# Para código bloqueante inevitável (ex: biblioteca legada):
async def usar_codigo_bloqueante():
loop = asyncio.get_event_loop()
resultado = await loop.run_in_executor(None, requests.get, "https://exemplo.com")
return resultado
Coroutine Nunca Aguardada
async def enviar_email(destinatario: str):
await asyncio.sleep(1)
print(f"E-mail enviado para {destinatario}")
async def main():
# ERRADO: cria a coroutine mas não a executa!
enviar_email("usuario@email.com") # RuntimeWarning: coroutine nunca aguardada
# CORRETO:
await enviar_email("usuario@email.com")
# Ou se quiser em background:
asyncio.create_task(enviar_email("usuario@email.com"))
Quando Usar Async/Await
Use programação assíncrona quando seu programa é I/O-bound — passa muito tempo esperando respostas de rede, disco ou banco de dados.
Use async/await para:
- APIs com muitas requisições simultâneas (FastAPI, aiohttp)
- WebSockets e streaming de dados
- Scrapers que fazem centenas de requisições HTTP
- Processamento de múltiplos arquivos em paralelo
Prefira multiprocessing para:
- Processamento de imagens ou vídeo
- Cálculos matemáticos pesados
- Machine learning e treinamento de modelos
- Qualquer operação CPU-bound
Boas Práticas
- Sempre use
asyncio.run()no ponto de entrada — nunca crie loops manualmente - Use
asyncio.gather()para executar múltiplas coroutines concorrentemente - Use
asyncio.Semaphorepara evitar sobrecarregar recursos externos - Nunca chame funções bloqueantes diretamente em coroutines; use
run_in_executor - Use
asyncio.wait_for()para definir timeouts em operações potencialmente lentas - Teste com o modo debug ativado (
PYTHONASYNCIODEBUG=1) durante o desenvolvimento
Termos Relacionados
- FastAPI - Framework web assíncrono que usa async/await nativamente
- WebSocket - Comunicação em tempo real, sempre assíncrona
- Generators - Conceito base das coroutines em Python
- Decorators - Usados com funções assíncronas