---
title: "Python Async/Await: Guia Completo"
url: "https://python.dev.br/blog/python-async-await/"
markdown_url: "https://python.dev.br/blog/python-async-await.MD"
description: "Domine programação assíncrona em Python com asyncio e async/await. Aprenda com exemplos práticos usando aiohttp para requisições HTTP concorrentes."
date: "2026-02-20"
author: "Equipe Python Brasil"
---

# Python Async/Await: Guia Completo

Domine programação assíncrona em Python com asyncio e async/await. Aprenda com exemplos práticos usando aiohttp para requisições HTTP concorrentes.


Programação assíncrona é uma das habilidades mais valorizadas no mercado atual. Com `async/await`, Python permite executar múltiplas operações de I/O simultaneamente sem usar threads. Neste guia, a gente vai entender os conceitos e aplicar na prática.

Se a sua dúvida é sobre tirar trabalho do ciclo HTTP de uma API, leia também o guia de [FastAPI Background Tasks, Celery e Redis](/blog/fastapi-background-tasks-celery-redis-2026/). `async/await` melhora concorrência de I/O dentro do processo; filas e workers resolvem tarefas longas ou críticas fora da requisição.

## O Problema: Código Síncrono Lento

Imagine que você precisa fazer 10 requisições HTTP. No modo síncrono, cada requisição espera a anterior terminar:

```python
import requests
import time

def buscar_sincrono(urls):
    """Busca URLs uma por uma (lento)."""
    resultados = []
    for url in urls:
        response = requests.get(url)
        resultados.append(response.status_code)
        print(f"  {url} -> {response.status_code}")
    return resultados

urls = [f"https://httpbin.org/delay/1" for _ in range(5)]

inicio = time.time()
buscar_sincrono(urls)
tempo = time.time() - inicio
print(f"\nSíncrono: {tempo:.2f} segundos (uma por vez)")
# Resultado: ~5 segundos (1 segundo por requisição)
```

## A Solução: Async/Await

Com programação assíncrona, todas as requisições são feitas "ao mesmo tempo":

```python
import asyncio
import aiohttp
import time

async def buscar_url(session, url):
    """Busca uma URL de forma assíncrona."""
    async with session.get(url) as response:
        status = response.status
        print(f"  {url} -> {status}")
        return status

async def buscar_assincrono(urls):
    """Busca todas as URLs concorrentemente (rápido)."""
    async with aiohttp.ClientSession() as session:
        tarefas = [buscar_url(session, url) for url in urls]
        resultados = await asyncio.gather(*tarefas)
        return resultados

urls = [f"https://httpbin.org/delay/1" for _ in range(5)]

inicio = time.time()
resultados = asyncio.run(buscar_assincrono(urls))
tempo = time.time() - inicio
print(f"\nAssíncrono: {tempo:.2f} segundos (todas ao mesmo tempo)")
# Resultado: ~1 segundo (todas em paralelo!)
```

## Entendendo os Conceitos

### Coroutines

Uma **coroutine** é uma função definida com `async def`. Ela pode ser pausada e retomada:

```python
import asyncio

async def saudacao(nome, delay):
    """Uma coroutine simples."""
    print(f"Olá, {nome}!")
    await asyncio.sleep(delay)  # Pausa sem bloquear
    print(f"Tchau, {nome}! (após {delay}s)")
    return f"Concluído: {nome}"

async def main():
    # Executar coroutines sequencialmente
    print("=== Sequencial ===")
    resultado1 = await saudacao("Ana", 2)
    resultado2 = await saudacao("Carlos", 1)
    print(f"Resultados: {resultado1}, {resultado2}")

    # Executar coroutines concorrentemente
    print("\n=== Concorrente ===")
    resultados = await asyncio.gather(
        saudacao("Maria", 2),
        saudacao("João", 1),
        saudacao("Pedro", 3),
    )
    print(f"Resultados: {resultados}")

asyncio.run(main())
```

### Tasks

Tasks são wrappers que permitem agendar coroutines para execução concorrente:

```python
import asyncio

async def processar_item(item, delay):
    print(f"Processando {item}...")
    await asyncio.sleep(delay)
    print(f"  {item} concluído!")
    return f"Resultado de {item}"

async def main():
    # Criar tasks explicitamente
    task1 = asyncio.create_task(processar_item("A", 2))
    task2 = asyncio.create_task(processar_item("B", 1))
    task3 = asyncio.create_task(processar_item("C", 3))

    # As tasks já estão rodando! Agora vamos esperar
    resultado1 = await task1
    resultado2 = await task2
    resultado3 = await task3

    print(f"\nResultados: {resultado1}, {resultado2}, {resultado3}")

asyncio.run(main())
```

### asyncio.gather vs asyncio.wait

```python
import asyncio

async def tarefa(nome, delay, falhar=False):
    await asyncio.sleep(delay)
    if falhar:
        raise ValueError(f"{nome} falhou!")
    return f"{nome}: OK"

async def main():
    # gather - espera todas e retorna resultados na mesma ordem
    resultados = await asyncio.gather(
        tarefa("A", 1),
        tarefa("B", 2),
        tarefa("C", 1),
    )
    print(f"gather: {resultados}")

    # gather com return_exceptions - não propaga erros
    resultados = await asyncio.gather(
        tarefa("X", 1),
        tarefa("Y", 1, falhar=True),
        tarefa("Z", 1),
        return_exceptions=True,
    )
    for r in resultados:
        if isinstance(r, Exception):
            print(f"  Erro: {r}")
        else:
            print(f"  Sucesso: {r}")

    # wait - mais controle sobre concluídas/pendentes
    tarefas = [
        asyncio.create_task(tarefa("P", 1)),
        asyncio.create_task(tarefa("Q", 3)),
        asyncio.create_task(tarefa("R", 2)),
    ]

    concluidas, pendentes = await asyncio.wait(
        tarefas, timeout=2.0
    )
    print(f"\nConcluídas: {len(concluidas)}, Pendentes: {len(pendentes)}")

asyncio.run(main())
```

## Exemplos Práticos

### Web Scraper Assíncrono

```python
import asyncio
import aiohttp
from dataclasses import dataclass
from typing import Optional

@dataclass
class ResultadoRequisicao:
    url: str
    status: int
    tamanho: int
    tempo: float
    erro: Optional[str] = None

async def verificar_site(session, url, timeout=10):
    """Verifica o status de um site."""
    import time
    inicio = time.time()

    try:
        async with session.get(
            url, timeout=aiohttp.ClientTimeout(total=timeout)
        ) as response:
            conteudo = await response.text()
            tempo = time.time() - inicio
            return ResultadoRequisicao(
                url=url,
                status=response.status,
                tamanho=len(conteudo),
                tempo=tempo,
            )
    except asyncio.TimeoutError:
        return ResultadoRequisicao(
            url=url, status=0, tamanho=0,
            tempo=timeout, erro="Timeout"
        )
    except Exception as e:
        return ResultadoRequisicao(
            url=url, status=0, tamanho=0,
            tempo=time.time() - inicio, erro=str(e)
        )

async def monitorar_sites(urls):
    """Monitora múltiplos sites simultaneamente."""
    async with aiohttp.ClientSession() as session:
        tarefas = [verificar_site(session, url) for url in urls]
        resultados = await asyncio.gather(*tarefas)

    print(f"\n{'URL':<40} {'Status':<8} {'Tempo':<10} {'Tamanho':<10}")
    print("-" * 70)

    for r in resultados:
        if r.erro:
            print(f"{r.url:<40} {'ERRO':<8} {r.tempo:<10.3f} {r.erro}")
        else:
            print(f"{r.url:<40} {r.status:<8} {r.tempo:<10.3f} {r.tamanho:<10}")

    tempos = [r.tempo for r in resultados if not r.erro]
    if tempos:
        print(f"\nMédia de resposta: {sum(tempos)/len(tempos):.3f}s")
        print(f"Sites com erro: {sum(1 for r in resultados if r.erro)}")

# Uso
urls = [
    "https://www.google.com",
    "https://www.github.com",
    "https://www.python.org",
    "https://httpbin.org/get",
    "https://www.wikipedia.org",
]

asyncio.run(monitorar_sites(urls))
```

### Produtor-Consumidor com asyncio.Queue

```python
import asyncio
import random

async def produtor(queue, nome, total_itens):
    """Produz itens e coloca na fila."""
    for i in range(total_itens):
        item = f"{nome}-item-{i}"
        await asyncio.sleep(random.uniform(0.1, 0.5))  # Simula produção
        await queue.put(item)
        print(f"  [Produtor {nome}] Produziu: {item}")

    print(f"  [Produtor {nome}] Finalizou!")

async def consumidor(queue, nome):
    """Consome itens da fila."""
    while True:
        try:
            item = await asyncio.wait_for(queue.get(), timeout=2.0)
            await asyncio.sleep(random.uniform(0.2, 0.8))  # Simula processamento
            print(f"  [Consumidor {nome}] Processou: {item}")
            queue.task_done()
        except asyncio.TimeoutError:
            print(f"  [Consumidor {nome}] Sem itens, encerrando...")
            break

async def main():
    queue = asyncio.Queue(maxsize=5)

    # Criar produtores e consumidores
    produtores = [
        asyncio.create_task(produtor(queue, "P1", 5)),
        asyncio.create_task(produtor(queue, "P2", 3)),
    ]

    consumidores = [
        asyncio.create_task(consumidor(queue, "C1")),
        asyncio.create_task(consumidor(queue, "C2")),
        asyncio.create_task(consumidor(queue, "C3")),
    ]

    # Esperar produtores terminarem
    await asyncio.gather(*produtores)

    # Esperar a fila esvaziar
    await queue.join()

    # Cancelar consumidores
    for c in consumidores:
        c.cancel()

    print("\nTodos os itens foram processados!")

asyncio.run(main())
```

### Semáforo para Limitar Concorrência

```python
import asyncio
import aiohttp

async def baixar_com_limite(semaforo, session, url, destino):
    """Baixa um arquivo respeitando o limite de concorrência."""
    async with semaforo:  # Limita o número de downloads simultâneos
        print(f"Baixando: {url}")
        async with session.get(url) as response:
            conteudo = await response.read()
            # Em um caso real, salvaria o arquivo:
            # with open(destino, 'wb') as f:
            #     f.write(conteudo)
            print(f"  Concluído: {destino} ({len(conteudo)} bytes)")
            return len(conteudo)

async def baixar_multiplos(urls, max_concorrente=3):
    """Baixa múltiplos arquivos com limite de concorrência."""
    semaforo = asyncio.Semaphore(max_concorrente)

    async with aiohttp.ClientSession() as session:
        tarefas = [
            baixar_com_limite(semaforo, session, url, f"arquivo_{i}.html")
            for i, url in enumerate(urls)
        ]
        resultados = await asyncio.gather(*tarefas)

    total = sum(resultados)
    print(f"\nTotal baixado: {total:,} bytes")

urls = [
    "https://www.python.org",
    "https://www.github.com",
    "https://httpbin.org/html",
    "https://www.wikipedia.org",
    "https://www.google.com",
]

asyncio.run(baixar_multiplos(urls, max_concorrente=2))
```

## Async com Context Managers e Iteradores

```python
import asyncio

class ConexaoBD:
    """Exemplo de context manager assíncrono."""

    def __init__(self, nome_bd):
        self.nome_bd = nome_bd
        self.conectado = False

    async def __aenter__(self):
        print(f"Conectando ao banco {self.nome_bd}...")
        await asyncio.sleep(0.5)  # Simula conexão
        self.conectado = True
        print(f"Conectado a {self.nome_bd}!")
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print(f"Desconectando de {self.nome_bd}...")
        await asyncio.sleep(0.1)
        self.conectado = False
        print(f"Desconectado de {self.nome_bd}.")

    async def consultar(self, query):
        await asyncio.sleep(0.2)  # Simula consulta
        return [{"id": 1, "nome": "Resultado"}]


class LeitorAssincrono:
    """Exemplo de iterador assíncrono."""

    def __init__(self, itens):
        self.itens = itens
        self.indice = 0

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.indice >= len(self.itens):
            raise StopAsyncIteration
        await asyncio.sleep(0.1)  # Simula I/O
        item = self.itens[self.indice]
        self.indice += 1
        return item


async def main():
    # Usando async context manager
    async with ConexaoBD("meu_banco") as db:
        resultado = await db.consultar("SELECT * FROM usuarios")
        print(f"Resultado: {resultado}")

    print()

    # Usando async iterator
    async for item in LeitorAssincrono(["A", "B", "C", "D"]):
        print(f"Item: {item}")

asyncio.run(main())
```

## Erros Comuns e Como Evitar

```python
import asyncio

# ERRO 1: Esquecer de usar await
async def erro_sem_await():
    # asyncio.sleep(1)  # ERRADO! Retorna coroutine sem executar
    await asyncio.sleep(1)  # CORRETO

# ERRO 2: Chamar função async sem await
async def funcao_async():
    return 42

async def main_erros():
    # resultado = funcao_async()  # ERRADO! resultado é uma coroutine
    resultado = await funcao_async()  # CORRETO
    print(resultado)

# ERRO 3: Usar bloqueante dentro de async
async def errado():
    import time
    # time.sleep(5)  # ERRADO! Bloqueia todo o event loop
    await asyncio.sleep(5)  # CORRETO

# ERRO 4: Se precisar de I/O bloqueante, use run_in_executor
async def correto_com_bloqueante():
    import time
    loop = asyncio.get_event_loop()
    # Roda em thread separada, não bloqueia o event loop
    await loop.run_in_executor(None, time.sleep, 1)
    print("Não bloqueou!")

asyncio.run(main_erros())
```

## Quando Usar Async?

Async é ideal quando seu código passa muito tempo **esperando I/O**:

- Requisições HTTP (APIs, web scraping)
- Consultas a banco de dados
- Leitura/escrita de arquivos
- WebSockets e comunicação em rede
- Qualquer operação que envolva esperar

Async **NaO** ajuda com:

- Cálculos pesados (CPU-bound) - use `multiprocessing`
- Tarefas simples e sequenciais
- Scripts pequenos de uso único

A regra é simples: se seu programa passa mais tempo esperando do que calculando, async vai ajudar. Se ele gasta tempo fazendo cálculos, use `multiprocessing` ou `concurrent.futures`.

Programação assíncrona pode parecer complexa no início, mas com prática se torna natural. Comece com exemplos simples e vá aumentando a complexidade aos poucos. O mercado brasileiro valoriza muito essa habilidade, especialmente para posições de backend e microsserviços.

> **Curiosidade**: se você gosta do modelo de concorrência assíncrona, vale conhecer <a href="https://golang.com.br/aprenda/concorrencia-go/" target="_blank" rel="noopener noreferrer" onclick="umami.track('portfolio-site-click', { destination: 'golang.com.br' })">Go e suas goroutines</a>, que tratam concorrência como cidadão de primeira classe na linguagem. Já <a href="https://rustlang.com.br/ecossistema/tokio/" target="_blank" rel="noopener noreferrer" onclick="umami.track('portfolio-site-click', { destination: 'rustlang.com.br' })">Rust com Tokio</a> oferece async/await com garantias de segurança de memória em tempo de compilação.
