talostrading / sonic

Sonic is a Go library for network and I/O programming that provides developers with a consistent asynchronous model, with a focus on achieving the lowest possible latency and jitter in Go.
MIT License
676 stars 16 forks source link

Multiple websocket streams & usage of sync.WaitGroup #129

Closed ultrasaw closed 5 months ago

ultrasaw commented 5 months ago

First of all, thank you so much for open-sourcing the sonic package! It made me rethink my current approach to websocket stream subscribing as I currently do everything in a dedicated goroutine per asset, e.g.:

package main

import (
    "gobang/config"
    "gobang/scylla"
    "gobang/ws"
    "log"
)

func main() {
    // Initialize ScyllaDB session
    session, err := scylla.InitializeScyllaDB()
    if err != nil {
        log.Fatalf("failed to initialize ScyllaDB session: %v", err)
    }
    defer session.Close()

    // Initialize a single WebSocketManager instance
    wsManager := ws.NewWebSocketManager()

    for _, symbol := range config.Symbols {
        go wsManager.SubscribeAndHandle(symbol, session)
    }
    log.Println("WebSocket managers initialized. Press Ctrl+C to exit.")
    select {}
}

/////

package ws

import (
    "context"
    "time"

    "github.com/gocql/gocql"
)

func (manager *WebSocketManager) SubscribeAndHandle(symbol string, session *gocql.Session) {
    ctx, cancel := context.WithCancel(context.Background())
    manager.mu.Lock()
    manager.StreamContexts[symbol] = cancel
    manager.StartTime[symbol] = time.Now()
    manager.mu.Unlock()

    go manager.HandleDepthStream(ctx, symbol, session)
    go manager.HandleTradeStream(ctx, symbol, session)
}

Above is the high-level overview of how I currently handle websocket connections. However after stumbling onto this package, I realized that handling all streams of interest, e.g. order book, trades etc., could be better implemented using sonic, hence my question regarding the binance example:

if I were to subscribe to multiple different streams of different servers, how would I go on about doing that? Does the code below correctly implement the idea using sonic?

package main

import (
    "crypto/tls"
    "fmt"
    "sync"

    "github.com/talostrading/sonic"
    "github.com/talostrading/sonic/codec/websocket"
)

var subscriptionMessage = []byte(
    `
{
  "id": 1,
  "method": "SUBSCRIBE",
  "params": [ "btcusdt@depth", "btcusdt@aggTrade" ]
}
`)

var b = make([]byte, 512*1024) // contains websocket payloads

func run(wg *sync.WaitGroup, url string, stream websocket.Stream) {
    defer wg.Done()
    stream.AsyncHandshake(url, func(err error) {
        onHandshake(err, stream)
    })
}

func onHandshake(err error, stream websocket.Stream) {
    if err != nil {
        panic(err)
    } else {
        stream.AsyncWrite(subscriptionMessage, websocket.TypeText, func(err error) {
            onWrite(err, stream)
        })
    }
}

func onWrite(err error, stream websocket.Stream) {
    if err != nil {
        panic(err)
    } else {
        readLoop(stream)
    }
}

func readLoop(stream websocket.Stream) {
    var onRead websocket.AsyncMessageHandler
    onRead = func(err error, n int, _ websocket.MessageType) {
        if err != nil {
            panic(err)
        } else {
            b = b[:n]

            // Print the incoming message
            fmt.Println(string(b[:50]))

            b = b[:cap(b)]

            stream.AsyncNextMessage(b, onRead)
        }
    }
    stream.AsyncNextMessage(b, onRead)
}

func createWebSocketStream(ioc sonic.IO, url string, wg *sync.WaitGroup) {
    stream, err := websocket.NewWebsocketStream(&ioc, &tls.Config{}, websocket.RoleClient)
    if err != nil {
        panic(err)
    }
    wg.Add(1)
    go run(wg, url, stream)
}

func main() {
    ioc := sonic.MustIO()
    defer ioc.Close()

    var wg sync.WaitGroup

    // Define multiple WebSocket URLs
    urls := []string{
        "wss://fstream.binance.com/stream?streams=",
        "wss://fstream.binance.com/stream?streams=", // For simplicity, using the same URL twice
    }

    for _, url := range urls {
        createWebSocketStream(*ioc, url, &wg)
    }

    // Run the IO context to handle events
    ioc.Run()

    // Wait for all WebSocket connections to complete
    wg.Wait()
}

Another question I have is how to properly use this package together with additional logic, e.g. in the binance example one would get a snapshot of the order book while buffering the update events. Would using a sync.WaitGroup be considered correct in the following scenario?

package main

import (
    "crypto/tls"
    "encoding/json"
    "fmt"
    "net/http"
    "sync"

    "github.com/talostrading/sonic"
    "github.com/talostrading/sonic/codec/websocket"
)

var subscriptionMessage = []byte(
    `
{
  "id": 1,
  "method": "SUBSCRIBE",
  "params": [ "btcusdt@depth" ]
}
`)

var b = make([]byte, 512*1024) // contains websocket payloads
var eventBuffer = []DepthUpdate{}
var mu sync.Mutex
var wg sync.WaitGroup // is this a sensible approach?

type DepthUpdate struct {
    EventTime        interface{} `json:"E"`
    TransactionTime  int64       `json:"T"`
    Symbol           string      `json:"s"`
    FirstUpdateId    int64       `json:"U"`
    LastUpdateId     int64       `json:"u"`
    PrevLastUpdateId int64       `json:"pu"`
    Bids             [][2]string `json:"b"`
    Asks             [][2]string `json:"a"`
}

type DepthSnapshot struct {
    LastUpdateId int64       `json:"lastUpdateId"`
    Bids         [][2]string `json:"bids"`
    Asks         [][2]string `json:"asks"`
}

func run(stream websocket.Stream) {
    stream.AsyncHandshake("wss://fstream.binance.com/stream?streams=", func(err error) {
        onHandshake(err, stream)
    })
}

func onHandshake(err error, stream websocket.Stream) {
    if err != nil {
        panic(err)
    } else {
        stream.AsyncWrite(subscriptionMessage, websocket.TypeText, func(err error) {
            onWrite(err, stream)
        })
    }
}

func onWrite(err error, stream websocket.Stream) {
    if err != nil {
        panic(err)
    } else {
        readLoop(stream)
    }
}

func readLoop(stream websocket.Stream) {
    var onRead websocket.AsyncMessageHandler
    onRead = func(err error, n int, _ websocket.MessageType) {
        if err != nil {
            panic(err)
        } else {
            b = b[:n]
            var event DepthUpdate
            if err := json.Unmarshal(b, &event); err != nil {
                fmt.Println("Failed to unmarshal:", err)
                return
            }

            mu.Lock()
            eventBuffer = append(eventBuffer, event)
            mu.Unlock()

            fmt.Println(string(b[:50]))
            b = b[:cap(b)]

            stream.AsyncNextMessage(b, onRead)
        }
    }
    stream.AsyncNextMessage(b, onRead)
}

func getDepthSnapshot() DepthSnapshot {
    fmt.Println("getting a snapshot for btcusdt")

    resp, err := http.Get("https://fapi.binance.com/fapi/v1/depth?symbol=BTCUSDT&limit=1000")
    if err != nil {
        panic(err)
    }
    defer resp.Body.Close()

    var snapshot DepthSnapshot
    if err := json.NewDecoder(resp.Body).Decode(&snapshot); err != nil {
        panic(err)
    }

    return snapshot
}

func processBufferedEvents(snapshot DepthSnapshot) {
    mu.Lock()
    defer mu.Unlock()

    var validEvents []DepthUpdate
    for _, event := range eventBuffer {
        if event.FirstUpdateId >= snapshot.LastUpdateId {
            validEvents = append(validEvents, event)
        }
    }

    for _, event := range validEvents {
        if event.PrevLastUpdateId != snapshot.LastUpdateId {
            fmt.Println("Detected gap in events, re-initializing...")
            snapshot = getDepthSnapshot()
            validEvents = validEvents[:0] // Clear valid events
            mu.Lock()
            eventBuffer = eventBuffer[:0] // Clear event buffer
            mu.Unlock()
            processBufferedEvents(snapshot)
            return
        }
        snapshot.LastUpdateId = event.LastUpdateId
    }

    fmt.Println("Synchronized with snapshot:", snapshot)
}

func main() {
    ioc := sonic.MustIO()
    defer ioc.Close()

    stream, err := websocket.NewWebsocketStream(ioc, &tls.Config{}, websocket.RoleClient)
    if err != nil {
        panic(err)
    }

    wg.Add(1)
    go func() {
        defer wg.Done()
        run(stream)
        ioc.Run()
    }()

    snapshot := getDepthSnapshot()
    processBufferedEvents(snapshot)

    wg.Wait()
}

Thank you so much in advance!

PS I'm not expecting exact code to be provided back to me, i.e. I want to use this moment to get to know golang and the sonic package better, would also greatly appreciate any hints / recommendations on how to get comfortable with this repo so that I am able to contribute in future 😇 Recommendations on what to try and build myself from the ground up are most welcome.

dgrr commented 5 months ago

Hey, it's been long time since I used this library, but you don't need to use a WaitGroup. If correctly implemented, the IOC is supposed to wait for all async events. Also, you don't need to run the IOC in another goroutine, I guess you could (if correctly implemented by the author(s)). Ideally you run your background processes in another goroutine, not the other way around. About the readLoop, I wouldn't do it that way, I'd rather you could define the callback as a function instead of as a lambda function.

sergiu128 commented 5 months ago

To add to the above, you can handle multiple websocket streams in the same goroutine, no need to go run(wg, url, stream). There's an example in the README, you can apply it to websocket streams as well:

image

You can create n streams in a for loop one-by-one, setting up the read loop (by lambda is fine as well, it's a style thing) properly for each one correctly, without spawning any goroutine. You don't even need to append each connection to a slice. In the end, just use for { ioc.Poll() } or ioc.Run() to run all the streams.

ultrasaw commented 5 months ago

@dgrr @sergiu128 thank you so much for the responses. Indeed while waiting on the comments here & extending on the example, I found out that wait groups are not necessary, neat;

@dgrr out of curiosity, since you said you haven't used the sonic package in a long time, did you switch to your own implementation for low latency websocket communication?

Have a splendid remainder of the week!