sonvister / Binance

A .NET Standard Binance API library.
MIT License

Problems with multiple streams and Aggregate trade socket #47

Closed rakewell closed 6 years ago

rakewell commented 6 years ago

Binance-0.2.0-alpha24

Here is the code that I have used for creating two test streams:

Dim TradeSocketLoggerFactory As ILoggerFactory = New Microsoft.Extensions.Logging.LoggerFactory()
Dim TradeSocketStreamClientLogger As New Logger(Of DefaultWebSocketClient)(TradeSocketLoggerFactory)
Dim TradeSocketStreamLogger As New Logger(Of BinanceWebSocketStream)(TradeSocketLoggerFactory)
Dim AggregateTradeSocketLogger As New Logger(Of AggregateTradeWebSocketClient)(TradeSocketLoggerFactory)

TradeSocketLoggerFactory.AddFile("logs/Binance - TRADE SOCKET - ApiLogger." & Date.Now.ToSortableDate & ".txt", LogLevel.Information)

m_BinanceTradeSocketStreamClient = New DefaultWebSocketClient(TradeSocketStreamClientLogger)
m_BinanceTradeSocketStream = New BinanceWebSocketStream(m_BinanceTradeSocketStreamClient, TradeSocketStreamLogger)
m_BinanceAggregateTradeSocket = New AggregateTradeWebSocketClient(m_BinanceTradeSocketStream, AggregateTradeSocketLogger)

Me.m_BinanceAggregateTradeSocketCancel = New CancellationTokenSource

Me.m_BinanceAggregateTradeSocket.SubscribeAndStreamAsync("BTCUSDT", m_BinanceAggregateTradeSocketCancel.Token)
Me.m_BinanceAggregateTradeSocket.SubscribeAndStreamAsync("POEBTC", m_BinanceAggregateTradeSocketCancel.Token)

When I register one stream, say BTCUSDT, then I am able to receive events on the AggregateTrade event:

Private Sub m_BinanceAggregateTradeSocket_AggregateTrade(sender As Object, e As AggregateTradeEventArgs) Handles m_BinanceAggregateTradeSocket.AggregateTrade

However, when I register two streams, as in the snippet above, the event stops firing. I am still able to receive events on the Message event handler for the trade socket stream:

Private Sub m_BinanceSocketTradeStreamClient_Message(sender As Object, e As WebSocketClientEventArgs) Handles m_BinanceTradeSocketStreamClient.Message

The message shows data for both streams. Here is the event log for the above test:

[info] 3/02/2018 9:16:35 PM +11:00 AggregateTradeWebSocketClient.Subscribe: "BTCUSDT" (callback: no). [thread: 1]
[info] 3/02/2018 9:16:35 PM +11:00 BinanceWebSocketStream.Subscribe: Adding stream ("btcusdt@aggTrade"). [thread: 1]
[info] 3/02/2018 9:16:35 PM +11:00 BinanceWebSocketStream.Subscribe: Adding callback for stream ("btcusdt@aggTrade"). [thread: 1]
[info] 3/02/2018 9:16:35 PM +11:00 BinanceWebSocketStream.StreamAsync: "wss://stream.binance.com:9443/ws/btcusdt@aggTrade" [thread: 1]
[info] 3/02/2018 9:16:36 PM +11:00 AggregateTradeWebSocketClient.Subscribe: "POEBTC" (callback: no). [thread: 1]
[warn] 3/02/2018 9:16:36 PM +11:00 BinanceWebSocketStream.Subscribe: IWebSocketClient is already streaming. [thread: 1]
[info] 3/02/2018 9:16:36 PM +11:00 BinanceWebSocketStream.Subscribe: Adding stream ("poebtc@aggTrade"). [thread: 1]
[info] 3/02/2018 9:16:36 PM +11:00 BinanceWebSocketStream.Subscribe: Adding callback for stream ("poebtc@aggTrade"). [thread: 1]
[fail] 3/02/2018 9:16:39 PM +11:00 BinanceWebSocketStream: Unhandled StreamAsync exception. [thread: 6] (exception: Value cannot be null. Parameter name: value)
[fail] 3/02/2018 9:16:40 PM +11:00 BinanceWebSocketStream: Unhandled StreamAsync exception. [thread: 6] (exception: Value cannot be null. Parameter name: value)
[fail] 3/02/2018 9:16:40 PM +11:00 BinanceWebSocketStream: Unhandled StreamAsync exception. [thread: 5] (exception: Value cannot be null. Parameter name: value)
[fail] 3/02/2018 9:16:40 PM +11:00 BinanceWebSocketStream: Unhandled StreamAsync exception. [thread: 5] (exception: Value cannot be null. Parameter name: value)
[fail] 3/02/2018 9:16:42 PM +11:00 BinanceWebSocketStream: Unhandled StreamAsync exception. [thread: 10] (exception: Value cannot be null. Parameter name: value)
[fail] 3/02/2018 9:16:43 PM +11:00 BinanceWebSocketStream: Unhandled StreamAsync exception. [thread: 6] (exception: Value cannot be null. Parameter name: value)
[fail] 3/02/2018 9:16:43 PM +11:00 BinanceWebSocketStream: Unhandled StreamAsync exception. [thread: 5] (exception: Value cannot be null. Parameter name: value)
[fail] 3/02/2018 9:16:46 PM +11:00 BinanceWebSocketStream: Unhandled StreamAsync exception. [thread: 6] (exception: Value cannot be null. Parameter name: value)
[fail] 3/02/2018 9:16:46 PM +11:00 BinanceWebSocketStream: Unhandled StreamAsync exception. [thread: 6] (exception: Value cannot be null. Parameter name: value)
[fail] 3/02/2018 9:16:46 PM +11:00 BinanceWebSocketStream: Unhandled StreamAsync exception. [thread: 6] (exception: Value cannot be null.

sonvister commented 6 years ago

@rakewell, there are a couple of issues here...

When you use the same AggregateTradeWebSocketClient to subscribe to multiple symbols, you are combining streams and sharing a single IWebSocketClient, which should only be streamed once. Had there not been another issue, you would have seen an InvalidOperationException on the second stream operation. The intended use is to subscribe, then stream. The SubscribeAndStream methods (which I renamed to prevent just such misuse ...unsuccessfully) are only there as a convenience when subscribing to a single stream (documentation pending).

The other issue (causing the exception) is that there was no thread synchronization to support subscribing while streaming (untested and undecided behavior). Unfortunately, having mentioned it should be supported, I reluctantly gave this bug status. I am not yet certain whether I want to add the performance penalty (locking) to support that backwards use of the API and then further explain that you have to restart the stream to make it work... BTW, your current use case will never be supported; in fact, you have me nearly convinced I should remove SubscribeAndStream altogether.

rakewell commented 6 years ago

Thanks @sonvister, I believe I understand now. I was under the impression that you can subscribe and unsubscribe ad hoc at any time (even while streaming) with Binance, but this seems not to be the case. So the practice should be to first subscribe to all the symbols you require using the .Subscribe method, then use the .StreamAsync method to start streaming data for all the subscribed symbols.

I have modified the code to this and it is working well. For now, adding or removing streams will require the cancellation of the stream and re-subscription of all the required streams. It would be nice to be able to add and remove subscriptions while the stream is running but perhaps this can be done in the future if it is required by more users.
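
The modified code is not shown in the thread. A minimal sketch of the subscribe-then-stream order described above, reusing the fields from the original snippet, might look like the following. The exact overloads are assumptions, not confirmed against Binance-0.2.0-alpha24: Subscribe is assumed to accept just a symbol (the log's "callback: no" suggests a callback-free form), and StreamAsync is assumed to take a CancellationToken and to be called on the BinanceWebSocketStream, as the log entry "BinanceWebSocketStream.StreamAsync" suggests.

```vb
' Sketch only: the Subscribe and StreamAsync overloads below are assumed,
' not verified against the library.
Me.m_BinanceAggregateTradeSocketCancel = New CancellationTokenSource

' 1. Register every required symbol first; nothing is streamed yet.
Me.m_BinanceAggregateTradeSocket.Subscribe("BTCUSDT")
Me.m_BinanceAggregateTradeSocket.Subscribe("POEBTC")

' 2. Start the shared stream exactly once for all subscriptions.
Dim streamTask As Task = Me.m_BinanceTradeSocketStream.StreamAsync(Me.m_BinanceAggregateTradeSocketCancel.Token)

' To change the symbol set later: cancel the token, subscribe again
' to the full set of symbols, then call StreamAsync once more.
```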

I would remove SubscribeAndStream, as it would be more common to subscribe to multiple streams (I would imagine), and perhaps place validation in the StreamAsync method to throw an exception if there are no subscriptions registered. This would make it clear that the user must subscribe first and stream next.

sonvister commented 6 years ago

@rakewell that is the correct understanding. Because there is no capability provided by Binance to add/remove individual streams from an active combined stream, there is nothing I can do.
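
For context, Binance's documented combined-stream endpoint lists the stream names in the connection URL (/stream?streams=name1/name2), which is consistent with the limitation described here: the set of streams is fixed when the connection is opened. The self-contained sketch below illustrates that, together with the "no subscriptions" guard suggested earlier in the thread. BuildCombinedStreamUrl is a hypothetical helper written for illustration; it is not part of the Binance library and does not claim to mirror its internals.

```vb
Imports System
Imports System.Collections.Generic
Imports System.Linq

Module CombinedStreamSketch

    ' Hypothetical helper (not part of the Binance library): builds the URL
    ' of a combined aggregate-trade stream for the given symbols.
    Function BuildCombinedStreamUrl(symbols As IEnumerable(Of String)) As String
        ' Stream names are lower-case, e.g. "btcusdt@aggTrade" (as in the log above).
        Dim streamNames As List(Of String) =
            symbols.Select(Function(s) s.ToLowerInvariant() & "@aggTrade").Distinct().ToList()

        ' Guard suggested in the thread: refuse to stream with nothing subscribed.
        If streamNames.Count = 0 Then
            Throw New InvalidOperationException("Subscribe to at least one symbol before streaming.")
        End If

        Return "wss://stream.binance.com:9443/stream?streams=" & String.Join("/", streamNames)
    End Function

    Sub Main()
        Console.WriteLine(BuildCombinedStreamUrl({"BTCUSDT", "POEBTC"}))
        ' prints wss://stream.binance.com:9443/stream?streams=btcusdt@aggTrade/poebtc@aggTrade
    End Sub

End Module
```

Because the stream list is part of the URL, adding or removing a symbol in this model means building a new URL and reconnecting, which matches the cancel-and-resubscribe workflow described above.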