🧠FFVAcademy
📡

WebSocket, SSE, streaming: comunicação bidirecional

12 min de leitura·+60 XP

HTTP foi projetado para request-response. Quando você precisa que o servidor envie dados sem o cliente pedir — notificações, chat, streaming de IA — existem três abordagens com trade-offs bem distintos.

Comparação: long polling vs SSE vs WebSocket

TécnicaDireçãoProtocoloOverheadCasos de uso
PollingCliente → ServidorHTTPAlto (um req/s)Evitar sempre
Long pollingBidirecional (simulado)HTTPMédio (1 RTT por msg)Fallback legado
SSEServidor → ClienteHTTPBaixo (stream HTTP)Notificações, LLM streaming
WebSocketBidirecionalWS/WSSMínimo (frames binários)Chat, jogos, colaboração
gRPC streamingBidirecionalHTTP/2Mínimo + schemaMicroserviços bidirecional

SSE: streaming de LLM e notificações

# SSE (Server-Sent Events): stream HTTP com formato especial
# Content-Type: text/event-stream
# Cada evento: "data: conteúdo\n\n" (linha vazia = fim do evento)

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import asyncio
import anthropic

app = FastAPI()

# Streaming de LLM com SSE:
@app.get("/stream")
async def stream_llm(prompt: str):
    async def generate():
        client = anthropic.Anthropic()
        with client.messages.stream(
            model="claude-opus-4-6",
            max_tokens=1024,
            messages=[{"role": "user", "content": prompt}]
        ) as stream:
            for text in stream.text_stream:
                # Formato SSE: "data: texto\n\n"
                yield f"data: {text}\n\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",  # desabilita buffer do Nginx!
        }
    )

# Notificações com SSE e ID para reconexão:
@app.get("/notifications")
async def notifications(user_id: str):
    async def event_stream():
        event_id = 0
        while True:
            # Checar por notificações novas:
            notifications = await get_notifications(user_id)  # busca no DB/Redis
            for notif in notifications:
                event_id += 1
                # SSE com ID permite que o browser retome após reconexão:
                yield f"id: {event_id}\n"
                yield f"event: notification\n"
                yield f"data: {notif.to_json()}\n\n"

            # Heartbeat a cada 30s (evita timeout de proxies):
            yield ": ping\n\n"
            await asyncio.sleep(30)

    return StreamingResponse(event_stream(), media_type="text/event-stream")

# No cliente (JavaScript):
# const es = new EventSource('/notifications?user_id=123');
# es.addEventListener('notification', (e) => {
#   const data = JSON.parse(e.data);
#   showNotification(data);
# });
# es.onerror = () => { /* EventSource reconecta automaticamente! */ };
# // Last-Event-ID header é enviado automaticamente na reconexão

WebSocket: full-duplex bidirecional

# WebSocket: upgrade HTTP → protocolo WS próprio
# Handshake HTTP:
# GET /ws HTTP/1.1
# Upgrade: websocket
# Connection: Upgrade
# Sec-WebSocket-Key: base64(random 16 bytes)
# Sec-WebSocket-Version: 13
#
# Resposta do servidor:
# HTTP/1.1 101 Switching Protocols
# Upgrade: websocket
# Connection: Upgrade
# Sec-WebSocket-Accept: base64(SHA1(key + GUID))
#
# Após isso: frames WS diretamente sobre TCP (sem HTTP)

from fastapi import WebSocket, WebSocketDisconnect
import json

# Servidor WebSocket com FastAPI:
class ConnectionManager:
    def __init__(self):
        self.active: dict[str, list[WebSocket]] = {}  # room_id → [ws]

    async def connect(self, ws: WebSocket, room: str):
        await ws.accept()
        self.active.setdefault(room, []).append(ws)

    def disconnect(self, ws: WebSocket, room: str):
        if room in self.active:
            self.active[room].remove(ws)

    async def broadcast(self, room: str, message: dict):
        for ws in self.active.get(room, []):
            try:
                await ws.send_json(message)
            except Exception:
                pass  # conexão morta, ignora

manager = ConnectionManager()

@app.websocket("/ws/{room}")
async def websocket_endpoint(ws: WebSocket, room: str):
    await manager.connect(ws, room)
    try:
        # Autenticação no primeiro frame:
        auth_frame = await ws.receive_json()
        if auth_frame.get("type") != "auth":
            await ws.close(code=4001, reason="Primeiro frame deve ser auth")
            return

        token = auth_frame.get("token")
        user = verify_jwt(token)  # lança exceção se inválido
        if not user:
            await ws.close(code=4001, reason="Token inválido")
            return

        # Loop principal de mensagens:
        while True:
            data = await ws.receive_json()
            if data.get("type") == "message":
                await manager.broadcast(room, {
                    "type": "message",
                    "user": user.name,
                    "text": data["text"],
                })
    except WebSocketDisconnect:
        manager.disconnect(ws, room)
    except Exception as e:
        manager.disconnect(ws, room)

# Heartbeat: evitar que proxies fechem conexão ociosa
# WS tem ping/pong nativo (opcode 0x9/0xA):
# Nginx: proxy_read_timeout 3600s; (1 hora)
# AWS ALB: idle timeout padrão = 60s (aumente para WS longos)

# No cliente (JavaScript):
# const ws = new WebSocket('wss://api.example.com/ws/room1');
# ws.onopen = () => ws.send(JSON.stringify({type:'auth', token: getJwt()}));
# ws.onmessage = (e) => handleMessage(JSON.parse(e.data));
# ws.onclose = (e) => scheduleReconnect(e.code);  // reconexão manual (diferente de SSE)

Nginx e proxies: configuração para streaming

# Nginx para SSE: desabilitar buffering
nginx_sse_config = """
location /stream {
    proxy_pass http://backend:8000;
    proxy_http_version 1.1;
    proxy_set_header Connection '';        # keep-alive sem upgrade
    proxy_cache off;
    proxy_buffering off;                  # CRÍTICO: sem buffering = streaming real
    proxy_read_timeout 3600s;             # conexão longa para SSE
    add_header X-Accel-Buffering no;      # header para desabilitar buffer via app
}
"""

# Nginx para WebSocket: upgrade de protocolo
nginx_ws_config = """
map $http_upgrade $connection_upgrade {
    default upgrade;
    ''      close;
}

location /ws {
    proxy_pass http://backend:8000;
    proxy_http_version 1.1;
    proxy_set_header Upgrade $http_upgrade;        # CRUCIAL: header WS
    proxy_set_header Connection $connection_upgrade;
    proxy_set_header Host $host;
    proxy_set_header X-Real-IP $remote_addr;
    proxy_read_timeout 3600s;                      # não fechar conexão ociosa
    proxy_send_timeout 3600s;
}
"""

# Problema comum: proxy sem configuração fecha WebSocket com 502/504
# Sintoma: conexão funciona direto no backend mas falha atrás do proxy
# Diagnóstico:
# curl -i -N http://api.example.com/stream   ← SSE (vê se streama ou bufferiza)
# wscat -c wss://api.example.com/ws          ← WebSocket CLI (npm i -g wscat)

# AWS ALB para WebSocket:
# - Suporte nativo: sem configuração especial além do health check
# - Idle timeout padrão: 60s → aumentar para 3600s em aplicações de chat
# - Target group: registre instâncias/containers normalmente
Modelo mental: SSE = HTTP stream simples (servidor → cliente), reconexão automática, funciona atrás de qualquer proxy com `proxy_buffering off`. WebSocket = full-duplex sobre TCP, requer configuração de proxy (Upgrade header). Para LLM streaming: SSE é a escolha certa (unidirecional, simples, CDN-friendly). Para chat e jogos: WebSocket. Heartbeat (ping/pong ou comment SSE) evita que proxies cortem conexões ociosas. Autenticação WebSocket: first frame auth ou token na URL com TTL curto.
💡
Próximo: CORS, CSRF e cookies seguros — segurança web fundamental que todo desenvolvedor backend precisa entender.
🧩

Quiz rápido

3 perguntas · Acerte tudo e ganhe o badge 🎯 Gabarito

Continue lendo