Python Async/Await: Guia Completo

Domine programação assíncrona em Python com asyncio e async/await. Aprenda com exemplos práticos usando aiohttp para requisições HTTP concorrentes.

7 min de leitura Equipe Python Brasil

Programação assíncrona é uma das habilidades mais valorizadas no mercado atual. Com async/await, Python permite executar múltiplas operações de I/O simultaneamente sem usar threads. Neste guia, a gente vai entender os conceitos e aplicar na prática.

O Problema: Código Síncrono Lento

Imagine que você precisa fazer 10 requisições HTTP. No modo síncrono, cada requisição espera a anterior terminar:

import requests
import time

def buscar_sincrono(urls):
    """Busca URLs uma por uma (lento)."""
    resultados = []
    for url in urls:
        response = requests.get(url)
        resultados.append(response.status_code)
        print(f"  {url} -> {response.status_code}")
    return resultados

urls = [f"https://httpbin.org/delay/1" for _ in range(5)]

inicio = time.time()
buscar_sincrono(urls)
tempo = time.time() - inicio
print(f"\nSíncrono: {tempo:.2f} segundos (uma por vez)")
# Resultado: ~5 segundos (1 segundo por requisição)

A Solução: Async/Await

Com programação assíncrona, todas as requisições são feitas “ao mesmo tempo”:

import asyncio
import aiohttp
import time

async def buscar_url(session, url):
    """Busca uma URL de forma assíncrona."""
    async with session.get(url) as response:
        status = response.status
        print(f"  {url} -> {status}")
        return status

async def buscar_assincrono(urls):
    """Busca todas as URLs concorrentemente (rápido)."""
    async with aiohttp.ClientSession() as session:
        tarefas = [buscar_url(session, url) for url in urls]
        resultados = await asyncio.gather(*tarefas)
        return resultados

urls = [f"https://httpbin.org/delay/1" for _ in range(5)]

inicio = time.time()
resultados = asyncio.run(buscar_assincrono(urls))
tempo = time.time() - inicio
print(f"\nAssíncrono: {tempo:.2f} segundos (todas ao mesmo tempo)")
# Resultado: ~1 segundo (todas em paralelo!)

Entendendo os Conceitos

Coroutines

Uma coroutine é uma função definida com async def. Ela pode ser pausada e retomada:

import asyncio

async def saudacao(nome, delay):
    """Uma coroutine simples."""
    print(f"Olá, {nome}!")
    await asyncio.sleep(delay)  # Pausa sem bloquear
    print(f"Tchau, {nome}! (após {delay}s)")
    return f"Concluído: {nome}"

async def main():
    # Executar coroutines sequencialmente
    print("=== Sequencial ===")
    resultado1 = await saudacao("Ana", 2)
    resultado2 = await saudacao("Carlos", 1)
    print(f"Resultados: {resultado1}, {resultado2}")

    # Executar coroutines concorrentemente
    print("\n=== Concorrente ===")
    resultados = await asyncio.gather(
        saudacao("Maria", 2),
        saudacao("João", 1),
        saudacao("Pedro", 3),
    )
    print(f"Resultados: {resultados}")

asyncio.run(main())

Tasks

Tasks são wrappers que permitem agendar coroutines para execução concorrente:

import asyncio

async def processar_item(item, delay):
    print(f"Processando {item}...")
    await asyncio.sleep(delay)
    print(f"  {item} concluído!")
    return f"Resultado de {item}"

async def main():
    # Criar tasks explicitamente
    task1 = asyncio.create_task(processar_item("A", 2))
    task2 = asyncio.create_task(processar_item("B", 1))
    task3 = asyncio.create_task(processar_item("C", 3))

    # As tasks já estão rodando! Agora vamos esperar
    resultado1 = await task1
    resultado2 = await task2
    resultado3 = await task3

    print(f"\nResultados: {resultado1}, {resultado2}, {resultado3}")

asyncio.run(main())

asyncio.gather vs asyncio.wait

import asyncio

async def tarefa(nome, delay, falhar=False):
    await asyncio.sleep(delay)
    if falhar:
        raise ValueError(f"{nome} falhou!")
    return f"{nome}: OK"

async def main():
    # gather - espera todas e retorna resultados na mesma ordem
    resultados = await asyncio.gather(
        tarefa("A", 1),
        tarefa("B", 2),
        tarefa("C", 1),
    )
    print(f"gather: {resultados}")

    # gather com return_exceptions - não propaga erros
    resultados = await asyncio.gather(
        tarefa("X", 1),
        tarefa("Y", 1, falhar=True),
        tarefa("Z", 1),
        return_exceptions=True,
    )
    for r in resultados:
        if isinstance(r, Exception):
            print(f"  Erro: {r}")
        else:
            print(f"  Sucesso: {r}")

    # wait - mais controle sobre concluídas/pendentes
    tarefas = [
        asyncio.create_task(tarefa("P", 1)),
        asyncio.create_task(tarefa("Q", 3)),
        asyncio.create_task(tarefa("R", 2)),
    ]

    concluidas, pendentes = await asyncio.wait(
        tarefas, timeout=2.0
    )
    print(f"\nConcluídas: {len(concluidas)}, Pendentes: {len(pendentes)}")

asyncio.run(main())

Exemplos Práticos

Web Scraper Assíncrono

import asyncio
import aiohttp
from dataclasses import dataclass
from typing import Optional

@dataclass
class ResultadoRequisicao:
    url: str
    status: int
    tamanho: int
    tempo: float
    erro: Optional[str] = None

async def verificar_site(session, url, timeout=10):
    """Verifica o status de um site."""
    import time
    inicio = time.time()

    try:
        async with session.get(
            url, timeout=aiohttp.ClientTimeout(total=timeout)
        ) as response:
            conteudo = await response.text()
            tempo = time.time() - inicio
            return ResultadoRequisicao(
                url=url,
                status=response.status,
                tamanho=len(conteudo),
                tempo=tempo,
            )
    except asyncio.TimeoutError:
        return ResultadoRequisicao(
            url=url, status=0, tamanho=0,
            tempo=timeout, erro="Timeout"
        )
    except Exception as e:
        return ResultadoRequisicao(
            url=url, status=0, tamanho=0,
            tempo=time.time() - inicio, erro=str(e)
        )

async def monitorar_sites(urls):
    """Monitora múltiplos sites simultaneamente."""
    async with aiohttp.ClientSession() as session:
        tarefas = [verificar_site(session, url) for url in urls]
        resultados = await asyncio.gather(*tarefas)

    print(f"\n{'URL':<40} {'Status':<8} {'Tempo':<10} {'Tamanho':<10}")
    print("-" * 70)

    for r in resultados:
        if r.erro:
            print(f"{r.url:<40} {'ERRO':<8} {r.tempo:<10.3f} {r.erro}")
        else:
            print(f"{r.url:<40} {r.status:<8} {r.tempo:<10.3f} {r.tamanho:<10}")

    tempos = [r.tempo for r in resultados if not r.erro]
    if tempos:
        print(f"\nMédia de resposta: {sum(tempos)/len(tempos):.3f}s")
        print(f"Sites com erro: {sum(1 for r in resultados if r.erro)}")

# Uso
urls = [
    "https://www.google.com",
    "https://www.github.com",
    "https://www.python.org",
    "https://httpbin.org/get",
    "https://www.wikipedia.org",
]

asyncio.run(monitorar_sites(urls))

Produtor-Consumidor com asyncio.Queue

import asyncio
import random

async def produtor(queue, nome, total_itens):
    """Produz itens e coloca na fila."""
    for i in range(total_itens):
        item = f"{nome}-item-{i}"
        await asyncio.sleep(random.uniform(0.1, 0.5))  # Simula produção
        await queue.put(item)
        print(f"  [Produtor {nome}] Produziu: {item}")

    print(f"  [Produtor {nome}] Finalizou!")

async def consumidor(queue, nome):
    """Consome itens da fila."""
    while True:
        try:
            item = await asyncio.wait_for(queue.get(), timeout=2.0)
            await asyncio.sleep(random.uniform(0.2, 0.8))  # Simula processamento
            print(f"  [Consumidor {nome}] Processou: {item}")
            queue.task_done()
        except asyncio.TimeoutError:
            print(f"  [Consumidor {nome}] Sem itens, encerrando...")
            break

async def main():
    queue = asyncio.Queue(maxsize=5)

    # Criar produtores e consumidores
    produtores = [
        asyncio.create_task(produtor(queue, "P1", 5)),
        asyncio.create_task(produtor(queue, "P2", 3)),
    ]

    consumidores = [
        asyncio.create_task(consumidor(queue, "C1")),
        asyncio.create_task(consumidor(queue, "C2")),
        asyncio.create_task(consumidor(queue, "C3")),
    ]

    # Esperar produtores terminarem
    await asyncio.gather(*produtores)

    # Esperar a fila esvaziar
    await queue.join()

    # Cancelar consumidores
    for c in consumidores:
        c.cancel()

    print("\nTodos os itens foram processados!")

asyncio.run(main())

Semáforo para Limitar Concorrência

import asyncio
import aiohttp

async def baixar_com_limite(semaforo, session, url, destino):
    """Baixa um arquivo respeitando o limite de concorrência."""
    async with semaforo:  # Limita o número de downloads simultâneos
        print(f"Baixando: {url}")
        async with session.get(url) as response:
            conteudo = await response.read()
            # Em um caso real, salvaria o arquivo:
            # with open(destino, 'wb') as f:
            #     f.write(conteudo)
            print(f"  Concluído: {destino} ({len(conteudo)} bytes)")
            return len(conteudo)

async def baixar_multiplos(urls, max_concorrente=3):
    """Baixa múltiplos arquivos com limite de concorrência."""
    semaforo = asyncio.Semaphore(max_concorrente)

    async with aiohttp.ClientSession() as session:
        tarefas = [
            baixar_com_limite(semaforo, session, url, f"arquivo_{i}.html")
            for i, url in enumerate(urls)
        ]
        resultados = await asyncio.gather(*tarefas)

    total = sum(resultados)
    print(f"\nTotal baixado: {total:,} bytes")

urls = [
    "https://www.python.org",
    "https://www.github.com",
    "https://httpbin.org/html",
    "https://www.wikipedia.org",
    "https://www.google.com",
]

asyncio.run(baixar_multiplos(urls, max_concorrente=2))

Async com Context Managers e Iteradores

import asyncio

class ConexaoBD:
    """Exemplo de context manager assíncrono."""

    def __init__(self, nome_bd):
        self.nome_bd = nome_bd
        self.conectado = False

    async def __aenter__(self):
        print(f"Conectando ao banco {self.nome_bd}...")
        await asyncio.sleep(0.5)  # Simula conexão
        self.conectado = True
        print(f"Conectado a {self.nome_bd}!")
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print(f"Desconectando de {self.nome_bd}...")
        await asyncio.sleep(0.1)
        self.conectado = False
        print(f"Desconectado de {self.nome_bd}.")

    async def consultar(self, query):
        await asyncio.sleep(0.2)  # Simula consulta
        return [{"id": 1, "nome": "Resultado"}]


class LeitorAssincrono:
    """Exemplo de iterador assíncrono."""

    def __init__(self, itens):
        self.itens = itens
        self.indice = 0

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.indice >= len(self.itens):
            raise StopAsyncIteration
        await asyncio.sleep(0.1)  # Simula I/O
        item = self.itens[self.indice]
        self.indice += 1
        return item


async def main():
    # Usando async context manager
    async with ConexaoBD("meu_banco") as db:
        resultado = await db.consultar("SELECT * FROM usuarios")
        print(f"Resultado: {resultado}")

    print()

    # Usando async iterator
    async for item in LeitorAssincrono(["A", "B", "C", "D"]):
        print(f"Item: {item}")

asyncio.run(main())

Erros Comuns e Como Evitar

import asyncio

# ERRO 1: Esquecer de usar await
async def erro_sem_await():
    # asyncio.sleep(1)  # ERRADO! Retorna coroutine sem executar
    await asyncio.sleep(1)  # CORRETO

# ERRO 2: Chamar função async sem await
async def funcao_async():
    return 42

async def main_erros():
    # resultado = funcao_async()  # ERRADO! resultado é uma coroutine
    resultado = await funcao_async()  # CORRETO
    print(resultado)

# ERRO 3: Usar bloqueante dentro de async
async def errado():
    import time
    # time.sleep(5)  # ERRADO! Bloqueia todo o event loop
    await asyncio.sleep(5)  # CORRETO

# ERRO 4: Se precisar de I/O bloqueante, use run_in_executor
async def correto_com_bloqueante():
    import time
    loop = asyncio.get_event_loop()
    # Roda em thread separada, não bloqueia o event loop
    await loop.run_in_executor(None, time.sleep, 1)
    print("Não bloqueou!")

asyncio.run(main_erros())

Quando Usar Async?

Async é ideal quando seu código passa muito tempo esperando I/O:

  • Requisições HTTP (APIs, web scraping)
  • Consultas a banco de dados
  • Leitura/escrita de arquivos
  • WebSockets e comunicação em rede
  • Qualquer operação que envolva esperar

Async NaO ajuda com:

  • Cálculos pesados (CPU-bound) - use multiprocessing
  • Tarefas simples e sequenciais
  • Scripts pequenos de uso único

A regra é simples: se seu programa passa mais tempo esperando do que calculando, async vai ajudar. Se ele gasta tempo fazendo cálculos, use multiprocessing ou concurrent.futures.

Programação assíncrona pode parecer complexa no início, mas com prática se torna natural. Comece com exemplos simples e vá aumentando a complexidade aos poucos. O mercado brasileiro valoriza muito essa habilidade, especialmente para posições de backend e microsserviços.

E

Equipe Python Brasil

Contribuidor do Python Brasil — Aprenda Python em Português