Skip to main content

Arbitragem em Tempo Real via WebSocket

O fluxo WebSocket é completamente independente do fluxo REST existente. Ambos rodam em paralelo — o WebSocket adiciona latência ultrabaixa sem afetar a estabilidade do processamento REST de 19 segundos.

🎯 Visão Geral

O sistema possui dois fluxos de integração complementares:
CaracterísticaFluxo REST (existente)Fluxo WebSocket (novo)
FrequênciaA cada 19 segundosContínuo (event-driven)
Latência~10–19 s< 200 ms
Exchanges17 (polling)12 (fase 1–3) + 5 (REST-only)
Entrega ao frontendHTTP pollingServer-Sent Events (SSE)
PersistênciaMongoDB (operacoes)MongoDB (operations_realtime)
AtivaçãoSempre ativoENABLED_WEBSOCKET=1

🏗️ Arquitetura dos Componentes

Componentes Principais

WebSocketExchangeHandler

Interface contratual para cada exchange. Implementa Connect(), Name() e HealthCheck(). Cada exchange tem sua própria implementação.

RealtimePriceCache

Cache thread-safe em memória. Preserva volume REST quando o tick WebSocket não o fornece. Fornece GetAll() no mesmo formato do fluxo REST.

RealtimeArbitrageEngine

Motor de detecção. Debounce de 200ms por símbolo evita processamento excessivo. Reutiliza CompareBaseQuotePrices() do fluxo REST sem duplicação de código.

SSEBroadcaster

Fan-out de Server-Sent Events para todos os browsers conectados. Cada cliente recebe um canal dedicado com backpressure automático.

WebSocketManager

Singleton orquestrador. Gerencia reconexão automática com backoff exponencial (1s → 2s → 4s → 8s → 16s máx).

🔄 Fluxo de Dados Detalhado

1. Conexão e Seed de Volume

Ao conectar, o handler WebSocket da Binance:
  1. Chama a REST API pública para buscar snapshot de volume (24h) de todos os pares
  2. Alimenta o RealtimePriceCache com os dados iniciais
  3. Abre conexão WebSocket: wss://stream.binance.com:9443/ws
  4. Subscreve o stream !bookTicker (todos os pares, bid/ask em tempo real)
  5. Inicia timer de 60s para refresh periódico do volume via REST
// Subscription message sent to Binance
{
  "method": "SUBSCRIBE",
  "params": ["!bookTicker"],
  "id": 1
}

2. Processamento de Ticks

3. Reconexão Automática

Falha na conexão

Aguarda 1s → Tenta reconectar
    ↓ (falha)
Aguarda 2s → Tenta reconectar
    ↓ (falha)
Aguarda 4s → Tenta reconectar
    ↓ (falha)
Aguarda 8s → Tenta reconectar
    ↓ (falha)
Aguarda 16s → Tenta reconectar (máximo)
    ↓ (sucesso)
Reset do backoff → Monitora continuamente

⚡ Ativação

O fluxo WebSocket é controlado por variável de ambiente para garantir zero impacto em produção enquanto não estiver pronto:
# Habilitar o fluxo WebSocket
ENABLED_WEBSOCKET=1

# ou
ENABLED_WEBSOCKET=true
Se ENABLED_WEBSOCKET não estiver definido ou for qualquer outro valor, o fluxo WebSocket não é iniciado e nenhum recurso adicional é consumido.
// cmd/main.go — inicialização condicional
func setupWebSocket() {
    enabledWS := os.Getenv("ENABLED_WEBSOCKET")
    if enabledWS == "1" || enabledWS == "true" {
        go application.StartWebSocketFlow()
    }
}

📡 Endpoints SSE

Três endpoints HTTP expõem os dados do fluxo WebSocket:
EndpointTipoDescrição
GET /v1/realtime/statusJSONStatus de cada handler WebSocket
GET /v1/realtime/arbitrageSSE streamOportunidades de arbitragem em tempo real
GET /v1/realtime/pricesSSE streamSnapshot completo de preços a cada 1s

Consumo no Frontend (EventSource)

// Conectar ao stream de arbitragem
const source = new EventSource('/v1/realtime/arbitrage');

source.onmessage = (event) => {
    const opportunity = JSON.parse(event.data);
    console.log(opportunity);
    // {
    //   "ticker": "BTC-USDT",
    //   "buyExchange": "Binance",
    //   "sellExchange": "OKX",
    //   "buyPrice": 43250.50,
    //   "sellPrice": 43380.00,
    //   "profitPercentAskBid": 0.29,
    //   "timestamp": "2025-03-18T12:00:00Z"
    // }
};

source.onerror = () => {
    // EventSource reconecta automaticamente
};

// Limpar ao desmontar componente
source.close();
A API EventSource do browser reconecta automaticamente ao servidor em caso de desconexão, sem necessidade de lógica adicional no frontend.

🗄️ Persistência

Oportunidades detectadas pelo motor WebSocket são salvas na coleção operations_realtime do MongoDB:
{
  "_id": "ObjectId(...)",
  "ticker": "ETH-USDT",
  "buyExchange": "Binance",
  "sellExchange": "Bybit",
  "buyPrice": 2250.10,
  "sellPrice": 2256.80,
  "profitPercentAskBid": 0.30,
  "source": "websocket",
  "createdAt": "2025-03-18T12:00:00.000Z"
}
A coleção operations_realtime é projetada para ter TTL curto — os dados são efêmeros por natureza, servindo principalmente como auditoria e buffer para clientes que se reconectam.

🔌 Exchanges Suportadas

Fase 1 — Concluída ✅

ExchangeStreamStatus
Binancewss://stream.binance.com:9443/ws!bookTicker✅ Implementado

Fase 2 — Planejada 🔄

ExchangeStream
OKXwss://ws.okx.com:8443/ws/v5/publictickers
Bybitwss://stream.bybit.com/v5/public/spottickers
KuCoinwss://ws-api.kucoin.commarket/ticker:all
Gate.iowss://api.gateio.ws/ws/v4/spot.tickers
HTXwss://api.huobi.pro/wsmarket.$symbol.bbo

Fase 3 — Planejada 🔄

ExchangeStream
Bitgetwss://ws.bitget.com/v2/ws/publictickers
Krakenwss://ws.kraken.comticker
Bitfinexwss://api-pub.bitfinex.com/ws/2ticker
BingXwss://open-api-ws.bingx.com/markettickers
MEXCwss://wbs.mexc.com/wsspot@public.bookTicker.v3.api
Crypto.comwss://stream.crypto.com/exchange/v1/marketticker

Fase 4 — REST-only (sem WebSocket público)

ExchangeMotivo
Mercado BitcoinSem WebSocket público documentado
FoxbitAPI privada apenas
NovadaxREST polling suficiente
BitsoWebSocket requer autenticação

🧪 Interface do Handler

Para adicionar suporte a uma nova exchange, implemente a interface WebSocketExchangeHandler:
package integrations

type WebSocketExchangeHandler interface {
    // Name retorna o nome da exchange (deve ser idêntico ao nome REST)
    Name() string

    // Connect inicia a conexão WebSocket e chama onUpdate para cada tick.
    // Deve bloquear até o contexto ser cancelado ou ocorrer erro fatal.
    Connect(ctx context.Context, onUpdate PriceUpdateCallback) error

    // HealthCheck retorna true se a conexão estiver ativa
    HealthCheck() bool
}

// PriceUpdateCallback é chamado para cada tick recebido
type PriceUpdateCallback func(exchange, symbol string, entry RealtimePriceEntry)

type RealtimePriceEntry struct {
    Ask       float64
    Bid       float64
    Volume    float64   // 0 = preservar último valor no cache
    AskQty    float64
    BidQty    float64
    UpdatedAt time.Time
}
Registre o novo handler no WebSocketManager:
// internal/application/ws-manager.go
func (m *WebSocketManager) registerHandlers() {
    m.handlers = []integrations.WebSocketExchangeHandler{
        integrations.NewBinanceWSHandler(),
        integrations.NewOKXWSHandler(),    // adicionar aqui
        integrations.NewBybitWSHandler(),   // adicionar aqui
    }
}

📊 Monitoramento

Use o endpoint /v1/realtime/status para verificar a saúde de cada handler:
{
  "enabled": true,
  "handlers": [
    {
      "exchange": "Binance",
      "connected": true,
      "lastUpdate": "2025-03-18T12:00:00Z"
    }
  ],
  "totalConnected": 1,
  "cacheSize": 847
}
  • Fase 1 concluída: Binance implementado e testado
  • 🔄 Fase 2: Implementar OKX, Bybit, KuCoin, Gate.io, HTX
  • 🔄 Fase 3: Bitget, Kraken, Bitfinex, BingX, MEXC, Crypto.com
  • 🔄 Fase 4: Avaliar alternativas para exchanges REST-only