Python Async/Await: Guia Completo
Domine programação assíncrona em Python com asyncio e async/await. Aprenda com exemplos práticos usando aiohttp para requisições HTTP concorrentes.
Programação assíncrona é uma das habilidades mais valorizadas no mercado atual. Com async/await, Python permite executar múltiplas operações de I/O simultaneamente sem usar threads. Neste guia, a gente vai entender os conceitos e aplicar na prática.
O Problema: Código Síncrono Lento
Imagine que você precisa fazer 10 requisições HTTP. No modo síncrono, cada requisição espera a anterior terminar:
import requests
import time
def buscar_sincrono(urls):
"""Busca URLs uma por uma (lento)."""
resultados = []
for url in urls:
response = requests.get(url)
resultados.append(response.status_code)
print(f" {url} -> {response.status_code}")
return resultados
urls = [f"https://httpbin.org/delay/1" for _ in range(5)]
inicio = time.time()
buscar_sincrono(urls)
tempo = time.time() - inicio
print(f"\nSíncrono: {tempo:.2f} segundos (uma por vez)")
# Resultado: ~5 segundos (1 segundo por requisição)
A Solução: Async/Await
Com programação assíncrona, todas as requisições são feitas “ao mesmo tempo”:
import asyncio
import aiohttp
import time
async def buscar_url(session, url):
"""Busca uma URL de forma assíncrona."""
async with session.get(url) as response:
status = response.status
print(f" {url} -> {status}")
return status
async def buscar_assincrono(urls):
"""Busca todas as URLs concorrentemente (rápido)."""
async with aiohttp.ClientSession() as session:
tarefas = [buscar_url(session, url) for url in urls]
resultados = await asyncio.gather(*tarefas)
return resultados
urls = [f"https://httpbin.org/delay/1" for _ in range(5)]
inicio = time.time()
resultados = asyncio.run(buscar_assincrono(urls))
tempo = time.time() - inicio
print(f"\nAssíncrono: {tempo:.2f} segundos (todas ao mesmo tempo)")
# Resultado: ~1 segundo (todas em paralelo!)
Entendendo os Conceitos
Coroutines
Uma coroutine é uma função definida com async def. Ela pode ser pausada e retomada:
import asyncio
async def saudacao(nome, delay):
"""Uma coroutine simples."""
print(f"Olá, {nome}!")
await asyncio.sleep(delay) # Pausa sem bloquear
print(f"Tchau, {nome}! (após {delay}s)")
return f"Concluído: {nome}"
async def main():
# Executar coroutines sequencialmente
print("=== Sequencial ===")
resultado1 = await saudacao("Ana", 2)
resultado2 = await saudacao("Carlos", 1)
print(f"Resultados: {resultado1}, {resultado2}")
# Executar coroutines concorrentemente
print("\n=== Concorrente ===")
resultados = await asyncio.gather(
saudacao("Maria", 2),
saudacao("João", 1),
saudacao("Pedro", 3),
)
print(f"Resultados: {resultados}")
asyncio.run(main())
Tasks
Tasks são wrappers que permitem agendar coroutines para execução concorrente:
import asyncio
async def processar_item(item, delay):
print(f"Processando {item}...")
await asyncio.sleep(delay)
print(f" {item} concluído!")
return f"Resultado de {item}"
async def main():
# Criar tasks explicitamente
task1 = asyncio.create_task(processar_item("A", 2))
task2 = asyncio.create_task(processar_item("B", 1))
task3 = asyncio.create_task(processar_item("C", 3))
# As tasks já estão rodando! Agora vamos esperar
resultado1 = await task1
resultado2 = await task2
resultado3 = await task3
print(f"\nResultados: {resultado1}, {resultado2}, {resultado3}")
asyncio.run(main())
asyncio.gather vs asyncio.wait
import asyncio
async def tarefa(nome, delay, falhar=False):
await asyncio.sleep(delay)
if falhar:
raise ValueError(f"{nome} falhou!")
return f"{nome}: OK"
async def main():
# gather - espera todas e retorna resultados na mesma ordem
resultados = await asyncio.gather(
tarefa("A", 1),
tarefa("B", 2),
tarefa("C", 1),
)
print(f"gather: {resultados}")
# gather com return_exceptions - não propaga erros
resultados = await asyncio.gather(
tarefa("X", 1),
tarefa("Y", 1, falhar=True),
tarefa("Z", 1),
return_exceptions=True,
)
for r in resultados:
if isinstance(r, Exception):
print(f" Erro: {r}")
else:
print(f" Sucesso: {r}")
# wait - mais controle sobre concluídas/pendentes
tarefas = [
asyncio.create_task(tarefa("P", 1)),
asyncio.create_task(tarefa("Q", 3)),
asyncio.create_task(tarefa("R", 2)),
]
concluidas, pendentes = await asyncio.wait(
tarefas, timeout=2.0
)
print(f"\nConcluídas: {len(concluidas)}, Pendentes: {len(pendentes)}")
asyncio.run(main())
Exemplos Práticos
Web Scraper Assíncrono
import asyncio
import aiohttp
from dataclasses import dataclass
from typing import Optional
@dataclass
class ResultadoRequisicao:
url: str
status: int
tamanho: int
tempo: float
erro: Optional[str] = None
async def verificar_site(session, url, timeout=10):
"""Verifica o status de um site."""
import time
inicio = time.time()
try:
async with session.get(
url, timeout=aiohttp.ClientTimeout(total=timeout)
) as response:
conteudo = await response.text()
tempo = time.time() - inicio
return ResultadoRequisicao(
url=url,
status=response.status,
tamanho=len(conteudo),
tempo=tempo,
)
except asyncio.TimeoutError:
return ResultadoRequisicao(
url=url, status=0, tamanho=0,
tempo=timeout, erro="Timeout"
)
except Exception as e:
return ResultadoRequisicao(
url=url, status=0, tamanho=0,
tempo=time.time() - inicio, erro=str(e)
)
async def monitorar_sites(urls):
"""Monitora múltiplos sites simultaneamente."""
async with aiohttp.ClientSession() as session:
tarefas = [verificar_site(session, url) for url in urls]
resultados = await asyncio.gather(*tarefas)
print(f"\n{'URL':<40} {'Status':<8} {'Tempo':<10} {'Tamanho':<10}")
print("-" * 70)
for r in resultados:
if r.erro:
print(f"{r.url:<40} {'ERRO':<8} {r.tempo:<10.3f} {r.erro}")
else:
print(f"{r.url:<40} {r.status:<8} {r.tempo:<10.3f} {r.tamanho:<10}")
tempos = [r.tempo for r in resultados if not r.erro]
if tempos:
print(f"\nMédia de resposta: {sum(tempos)/len(tempos):.3f}s")
print(f"Sites com erro: {sum(1 for r in resultados if r.erro)}")
# Uso
urls = [
"https://www.google.com",
"https://www.github.com",
"https://www.python.org",
"https://httpbin.org/get",
"https://www.wikipedia.org",
]
asyncio.run(monitorar_sites(urls))
Produtor-Consumidor com asyncio.Queue
import asyncio
import random
async def produtor(queue, nome, total_itens):
"""Produz itens e coloca na fila."""
for i in range(total_itens):
item = f"{nome}-item-{i}"
await asyncio.sleep(random.uniform(0.1, 0.5)) # Simula produção
await queue.put(item)
print(f" [Produtor {nome}] Produziu: {item}")
print(f" [Produtor {nome}] Finalizou!")
async def consumidor(queue, nome):
"""Consome itens da fila."""
while True:
try:
item = await asyncio.wait_for(queue.get(), timeout=2.0)
await asyncio.sleep(random.uniform(0.2, 0.8)) # Simula processamento
print(f" [Consumidor {nome}] Processou: {item}")
queue.task_done()
except asyncio.TimeoutError:
print(f" [Consumidor {nome}] Sem itens, encerrando...")
break
async def main():
queue = asyncio.Queue(maxsize=5)
# Criar produtores e consumidores
produtores = [
asyncio.create_task(produtor(queue, "P1", 5)),
asyncio.create_task(produtor(queue, "P2", 3)),
]
consumidores = [
asyncio.create_task(consumidor(queue, "C1")),
asyncio.create_task(consumidor(queue, "C2")),
asyncio.create_task(consumidor(queue, "C3")),
]
# Esperar produtores terminarem
await asyncio.gather(*produtores)
# Esperar a fila esvaziar
await queue.join()
# Cancelar consumidores
for c in consumidores:
c.cancel()
print("\nTodos os itens foram processados!")
asyncio.run(main())
Semáforo para Limitar Concorrência
import asyncio
import aiohttp
async def baixar_com_limite(semaforo, session, url, destino):
"""Baixa um arquivo respeitando o limite de concorrência."""
async with semaforo: # Limita o número de downloads simultâneos
print(f"Baixando: {url}")
async with session.get(url) as response:
conteudo = await response.read()
# Em um caso real, salvaria o arquivo:
# with open(destino, 'wb') as f:
# f.write(conteudo)
print(f" Concluído: {destino} ({len(conteudo)} bytes)")
return len(conteudo)
async def baixar_multiplos(urls, max_concorrente=3):
"""Baixa múltiplos arquivos com limite de concorrência."""
semaforo = asyncio.Semaphore(max_concorrente)
async with aiohttp.ClientSession() as session:
tarefas = [
baixar_com_limite(semaforo, session, url, f"arquivo_{i}.html")
for i, url in enumerate(urls)
]
resultados = await asyncio.gather(*tarefas)
total = sum(resultados)
print(f"\nTotal baixado: {total:,} bytes")
urls = [
"https://www.python.org",
"https://www.github.com",
"https://httpbin.org/html",
"https://www.wikipedia.org",
"https://www.google.com",
]
asyncio.run(baixar_multiplos(urls, max_concorrente=2))
Async com Context Managers e Iteradores
import asyncio
class ConexaoBD:
"""Exemplo de context manager assíncrono."""
def __init__(self, nome_bd):
self.nome_bd = nome_bd
self.conectado = False
async def __aenter__(self):
print(f"Conectando ao banco {self.nome_bd}...")
await asyncio.sleep(0.5) # Simula conexão
self.conectado = True
print(f"Conectado a {self.nome_bd}!")
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
print(f"Desconectando de {self.nome_bd}...")
await asyncio.sleep(0.1)
self.conectado = False
print(f"Desconectado de {self.nome_bd}.")
async def consultar(self, query):
await asyncio.sleep(0.2) # Simula consulta
return [{"id": 1, "nome": "Resultado"}]
class LeitorAssincrono:
"""Exemplo de iterador assíncrono."""
def __init__(self, itens):
self.itens = itens
self.indice = 0
def __aiter__(self):
return self
async def __anext__(self):
if self.indice >= len(self.itens):
raise StopAsyncIteration
await asyncio.sleep(0.1) # Simula I/O
item = self.itens[self.indice]
self.indice += 1
return item
async def main():
# Usando async context manager
async with ConexaoBD("meu_banco") as db:
resultado = await db.consultar("SELECT * FROM usuarios")
print(f"Resultado: {resultado}")
print()
# Usando async iterator
async for item in LeitorAssincrono(["A", "B", "C", "D"]):
print(f"Item: {item}")
asyncio.run(main())
Erros Comuns e Como Evitar
import asyncio
# ERRO 1: Esquecer de usar await
async def erro_sem_await():
# asyncio.sleep(1) # ERRADO! Retorna coroutine sem executar
await asyncio.sleep(1) # CORRETO
# ERRO 2: Chamar função async sem await
async def funcao_async():
return 42
async def main_erros():
# resultado = funcao_async() # ERRADO! resultado é uma coroutine
resultado = await funcao_async() # CORRETO
print(resultado)
# ERRO 3: Usar bloqueante dentro de async
async def errado():
import time
# time.sleep(5) # ERRADO! Bloqueia todo o event loop
await asyncio.sleep(5) # CORRETO
# ERRO 4: Se precisar de I/O bloqueante, use run_in_executor
async def correto_com_bloqueante():
import time
loop = asyncio.get_event_loop()
# Roda em thread separada, não bloqueia o event loop
await loop.run_in_executor(None, time.sleep, 1)
print("Não bloqueou!")
asyncio.run(main_erros())
Quando Usar Async?
Async é ideal quando seu código passa muito tempo esperando I/O:
- Requisições HTTP (APIs, web scraping)
- Consultas a banco de dados
- Leitura/escrita de arquivos
- WebSockets e comunicação em rede
- Qualquer operação que envolva esperar
Async NaO ajuda com:
- Cálculos pesados (CPU-bound) - use
multiprocessing - Tarefas simples e sequenciais
- Scripts pequenos de uso único
A regra é simples: se seu programa passa mais tempo esperando do que calculando, async vai ajudar. Se ele gasta tempo fazendo cálculos, use multiprocessing ou concurrent.futures.
Programação assíncrona pode parecer complexa no início, mas com prática se torna natural. Comece com exemplos simples e vá aumentando a complexidade aos poucos. O mercado brasileiro valoriza muito essa habilidade, especialmente para posições de backend e microsserviços.
Equipe Python Brasil
Contribuidor do Python Brasil — Aprenda Python em Português