---
title: "WebSocket: O que É e Como Funciona | Python Brasil"
url: "https://python.dev.br/glossario/websocket/"
markdown_url: "https://python.dev.br/glossario/websocket.MD"
description: "Guia completo sobre WebSocket em Python: protocolo, handshake, heartbeat, Django Channels, escalonamento com Redis e segurança."
date: "2026-03-15"
author: ""
---

# WebSocket: O que É e Como Funciona | Python Brasil

Guia completo sobre WebSocket em Python: protocolo, handshake, heartbeat, Django Channels, escalonamento com Redis e segurança.


## O que é WebSocket?

**WebSocket** é um protocolo de comunicação que permite **troca de dados bidirecional** entre cliente e servidor em **tempo real** sobre uma única conexão TCP persistente. Diferente do HTTP tradicional (onde o cliente sempre inicia a comunicação), com WebSocket tanto o servidor quanto o cliente podem enviar dados a qualquer momento, sem necessidade de uma nova requisição.

O protocolo WebSocket foi padronizado pela IETF como **RFC 6455** em 2011 e é suportado nativamente por todos os navegadores modernos.

## WebSocket vs HTTP

| Aspecto | HTTP | WebSocket |
|---|---|---|
| Modelo | Request-response | Full-duplex bidirecional |
| Conexão | Nova a cada requisição | Persistente |
| Latência | Alta (overhead de cabeçalhos) | Baixa (frames pequenos) |
| Overhead | Cabeçalhos grandes em cada req. | Cabeçalhos mínimos após handshake |
| Ideal para | APIs, conteúdo estático | Chat, jogos, dashboards ao vivo |

## O Protocolo em Detalhe: Handshake

A conexão WebSocket começa com um **HTTP Upgrade request**. O cliente envia uma requisição HTTP especial:

```
GET /ws/chat HTTP/1.1
Host: exemplo.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
Sec-WebSocket-Version: 13
```

O servidor responde com:

```
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
```

A partir desse momento, a conexão HTTP é substituída pelo protocolo WebSocket. Não há mais overhead de cabeçalhos HTTP — os dados trafegam em **frames** compactos.

## Frames WebSocket

Os dados em WebSocket são enviados em **frames**, que têm:

- **Opcode**: tipo do frame (texto, binário, ping, pong, close)
- **Payload**: os dados em si
- **Máscara**: clientes devem mascarar dados enviados ao servidor (segurança)

Tipos de frames:
- `0x1`: dados de texto (UTF-8)
- `0x2`: dados binários
- `0x8`: encerramento de conexão
- `0x9`: ping
- `0xA`: pong

## Ciclo de Vida da Conexão

1. **Handshake**: upgrade de HTTP para WebSocket
2. **Conexão aberta**: cliente e servidor podem trocar mensagens livremente
3. **Troca de mensagens**: bidirecional, sem espera
4. **Heartbeat (ping/pong)**: mantém a conexão viva
5. **Encerramento**: qualquer lado envia frame de close com código de status

## Exemplo com a Biblioteca websockets

```python
import asyncio
import websockets
import json

# Servidor WebSocket
clientes_conectados = set()

async def handler(websocket):
    clientes_conectados.add(websocket)
    print(f"Novo cliente conectado. Total: {len(clientes_conectados)}")

    try:
        async for mensagem in websocket:
            dados = json.loads(mensagem)
            print(f"Recebido: {dados}")

            # Broadcast para todos os clientes
            if clientes_conectados:
                await asyncio.gather(
                    *[cliente.send(json.dumps({
                        "tipo": "mensagem",
                        "conteudo": dados.get("texto", ""),
                        "usuario": dados.get("usuario", "Anônimo")
                    })) for cliente in clientes_conectados]
                )
    except websockets.exceptions.ConnectionClosed as e:
        print(f"Cliente desconectado: {e.code} - {e.reason}")
    finally:
        clientes_conectados.discard(websocket)

async def main():
    async with websockets.serve(handler, "localhost", 8765):
        print("Servidor WebSocket em ws://localhost:8765")
        await asyncio.Future()  # Roda indefinidamente

asyncio.run(main())
```

## Heartbeat: Ping/Pong

Conexões WebSocket podem cair silenciosamente (firewall, timeout de rede). O mecanismo de **ping/pong** mantém a conexão viva e detecta desconexões:

```python
import asyncio
import websockets

async def handler(websocket):
    try:
        async for mensagem in websocket:
            await websocket.send(f"Eco: {mensagem}")
    except websockets.exceptions.ConnectionClosed:
        pass

async def main():
    async with websockets.serve(
        handler,
        "localhost",
        8765,
        ping_interval=20,  # Envia ping a cada 20 segundos
        ping_timeout=10,   # Fecha conexão se pong não chegar em 10s
    ):
        await asyncio.Future()

asyncio.run(main())
```

No lado do cliente JavaScript:

```javascript
const ws = new WebSocket("ws://localhost:8765");

ws.onopen = () => console.log("Conectado");
ws.onmessage = (evento) => console.log("Mensagem:", evento.data);
ws.onclose = (evento) => {
    console.log(`Desconectado: ${evento.code}`);
    // Reconectar após 3 segundos
    setTimeout(conectar, 3000);
};
```

## Estratégias de Reconexão

Clientes devem implementar reconexão automática com **backoff exponencial**:

```javascript
class WebSocketClient {
    constructor(url) {
        this.url = url;
        this.tentativas = 0;
        this.maxTentativas = 10;
        this.conectar();
    }

    conectar() {
        this.ws = new WebSocket(this.url);
        this.ws.onopen = () => {
            console.log("Conectado!");
            this.tentativas = 0; // Resetar contador
        };
        this.ws.onclose = () => {
            if (this.tentativas < this.maxTentativas) {
                const espera = Math.min(1000 * Math.pow(2, this.tentativas), 30000);
                console.log(`Reconectando em ${espera}ms...`);
                setTimeout(() => this.conectar(), espera);
                this.tentativas++;
            }
        };
    }
}
```

## Exemplo com FastAPI

```python
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import List

app = FastAPI()

class GerenciadorConexoes:
    def __init__(self):
        self.conexoes: List[WebSocket] = []

    async def conectar(self, websocket: WebSocket):
        await websocket.accept()
        self.conexoes.append(websocket)

    def desconectar(self, websocket: WebSocket):
        self.conexoes.remove(websocket)

    async def broadcast(self, mensagem: str):
        conexoes_ativas = self.conexoes.copy()
        for conexao in conexoes_ativas:
            try:
                await conexao.send_text(mensagem)
            except Exception:
                self.conexoes.remove(conexao)

gerenciador = GerenciadorConexoes()

@app.websocket("/ws/chat/{sala}")
async def chat(websocket: WebSocket, sala: str, usuario: str = "Anônimo"):
    await gerenciador.conectar(websocket)
    await gerenciador.broadcast(f"{usuario} entrou na sala {sala}")
    try:
        while True:
            dados = await websocket.receive_text()
            await gerenciador.broadcast(f"[{sala}] {usuario}: {dados}")
    except WebSocketDisconnect:
        gerenciador.desconectar(websocket)
        await gerenciador.broadcast(f"{usuario} saiu da sala {sala}")
```

## Escalonamento com Redis Pub/Sub

Quando a aplicação roda em múltiplos processos ou servidores, o gerenciador em memória não funciona. A solução é usar **Redis Pub/Sub** como broker de mensagens:

```python
import asyncio
import json
import redis.asyncio as aioredis
from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()
redis_client = aioredis.from_url("redis://localhost")

conexoes_locais: set[WebSocket] = set()

async def ouvir_redis():
    """Ouve mensagens do Redis e distribui para conexões locais."""
    pubsub = redis_client.pubsub()
    await pubsub.subscribe("chat")
    async for mensagem in pubsub.listen():
        if mensagem["type"] == "message":
            texto = mensagem["data"].decode()
            conexoes_para_remover = set()
            for ws in conexoes_locais.copy():
                try:
                    await ws.send_text(texto)
                except Exception:
                    conexoes_para_remover.add(ws)
            conexoes_locais -= conexoes_para_remover

@app.on_event("startup")
async def startup():
    asyncio.create_task(ouvir_redis())

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    conexoes_locais.add(websocket)
    try:
        while True:
            dados = await websocket.receive_text()
            # Publica no Redis — todos os servidores receberão
            await redis_client.publish("chat", dados)
    except WebSocketDisconnect:
        conexoes_locais.discard(websocket)
```

## Django Channels

O **Django Channels** estende o Django para suportar WebSockets e outros protocolos assíncronos:

```python
# consumers.py
import json
from channels.generic.websocket import AsyncWebsocketConsumer

class ChatConsumer(AsyncWebsocketConsumer):
    async def connect(self):
        self.sala = self.scope["url_route"]["kwargs"]["sala_nome"]
        self.grupo = f"chat_{self.sala}"

        # Entrar no grupo do Channel Layer (Redis)
        await self.channel_layer.group_add(self.grupo, self.channel_name)
        await self.accept()

    async def disconnect(self, close_code):
        await self.channel_layer.group_discard(self.grupo, self.channel_name)

    async def receive(self, text_data):
        dados = json.loads(text_data)
        await self.channel_layer.group_send(
            self.grupo,
            {"type": "chat_message", "mensagem": dados["mensagem"]}
        )

    async def chat_message(self, evento):
        await self.send(text_data=json.dumps({"mensagem": evento["mensagem"]}))
```

## Socket.IO

**Socket.IO** é uma biblioteca que adiciona recursos sobre WebSocket: reconexão automática, namespaces, salas e fallback para long-polling. Em Python, use `python-socketio`:

```python
import socketio

sio = socketio.AsyncServer(async_mode="asgi", cors_allowed_origins="*")

@sio.event
async def connect(sid, environ):
    print(f"Cliente {sid} conectado")

@sio.event
async def mensagem(sid, dados):
    await sio.emit("resposta", {"texto": dados["texto"]}, room=sid)

@sio.event
async def disconnect(sid):
    print(f"Cliente {sid} desconectado")

# Montar no FastAPI
from fastapi import FastAPI
app = FastAPI()
app.mount("/socket.io", socketio.ASGIApp(sio))
```

## Segurança

- **Autenticação no handshake**: passe o token JWT como query param ou em subprotocolo durante o upgrade
- **Validação de mensagens**: nunca confie em dados recebidos sem validação (use Pydantic)
- **Rate limiting por conexão**: limite mensagens por segundo para evitar flood
- **WSS obrigatório**: sempre use `wss://` (WebSocket sobre TLS) em produção, equivalente ao HTTPS
- **CORS no handshake**: o servidor deve verificar o header `Origin` durante o handshake

```python
# Autenticação no WebSocket com FastAPI
from fastapi import WebSocket, Query, HTTPException

@app.websocket("/ws")
async def websocket_autenticado(
    websocket: WebSocket,
    token: str = Query(...)
):
    usuario = verificar_token(token)
    if not usuario:
        await websocket.close(code=4001)  # Código customizado para não autorizado
        return
    await websocket.accept()
    # ... lógica do WebSocket
```

## Load Balancing

WebSockets são conexões persistentes — isso complica o uso com load balancers. Soluções:

- **Sticky sessions**: o load balancer direciona sempre o mesmo cliente para o mesmo servidor
- **Redis Pub/Sub**: qualquer servidor pode receber mensagens e retransmitir para conexões locais
- **Nginx com upgrade**: configure corretamente os headers de upgrade

```nginx
upstream websocket_backend {
    ip_hash;  # Sticky sessions
    server ws1:8000;
    server ws2:8000;
}

server {
    location /ws/ {
        proxy_pass http://websocket_backend;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
        proxy_read_timeout 3600;  # Timeout longo para conexões persistentes
    }
}
```

## Casos de Uso

WebSockets são ideais para:

- **Chats em tempo real**: mensagens instantâneas entre usuários
- **Dashboards ao vivo**: métricas e gráficos que se atualizam sem refresh. Em painéis de monitoramento de infraestrutura ou analytics, WebSockets permitem que dezenas de gráficos recebam atualizações simultâneas sem sobrecarregar o servidor com polling constante.
- **Jogos multiplayer**: sincronização de estado entre jogadores
- **Notificações push**: alertas enviados pelo servidor sem polling. Sistemas de notificação baseados em WebSocket entregam alertas instantaneamente ao navegador, sem depender de verificações periódicas que desperdiçam banda e aumentam a latência percebida pelo usuário.
- **Feeds financeiros**: cotações de ações e criptomoedas em tempo real
- **Colaboração em tempo real**: editores de documento como Google Docs

## Erros Comuns

- **Não tratar `WebSocketDisconnect`**: a conexão pode cair a qualquer momento; sempre use try/except
- **Gerenciador de conexões em memória em produção**: não funciona com múltiplos processos; use Redis
- **Não implementar reconexão no cliente**: conexões WebSocket caem; o cliente deve reconectar automaticamente
- **Esquecer o WSS**: em produção, sempre use TLS

Para aplicações WebSocket com altíssima concorrência, <a href="https://golang.com.br/blog/" target="_blank" rel="noopener" onclick="umami.track('portfolio-site-click', {source: 'python.dev.br', target: 'golang.com.br', content: 'websocket'})">Go</a> suporta milhões de conexões simultâneas graças às goroutines.

## Termos Relacionados

- [FastAPI](/glossario/fastapi/) - Framework com suporte nativo a WebSocket
- [Async/Await](/glossario/async-await/) - Base da programação com WebSocket em Python
- [API REST](/glossario/api-rest/) - Protocolo complementar ao WebSocket
