JKorf / Binance.Net

A C# .netstandard client library for the Binance REST and Websocket Spot and Futures API focusing on clear usage and models
https://jkorf.github.io/Binance.Net/
MIT License

Issue with duplicate data #79

Closed by keefehiggins 6 years ago

keefehiggins commented 6 years ago

Hey,

So I'm using the new SubscribeToKlineStreamAsync, passing in an array of strings (Klines). I'm running into an issue where I believe it's passing back duplicate data. It seems to get stuck after processing for a short time. I'm handling so much data that it's hard to pinpoint exactly when it starts passing back duplicate data.

By duplicate data I mean it doesn't seem to update even hours after it's supposed to. For example, a 5 min kline still has the same open/close date it did 4 hours ago.

I know that is not much detail to go off of, but I will try to get better details shortly so it can be recreated easily.

Thanks!

keefehiggins commented 6 years ago

I'm diving into this and have found that for my project it happens every time. It starts processing fine, then seems to hang and keeps returning the same value (or at least the same dates). I set var socketClient = new BinanceSocketClient(new BinanceSocketClientOptions { LogVerbosity = CryptoExchange.Net.Logging.LogVerbosity.Debug}); but how do I access those logs?

I don't think this would have any effect, but I am using the ToKline() method. I will put my full method below.

        private  void SubscribeToStreams(KlineInterval klineInterval, string[] symbols)
        {
            var socketClient = new BinanceSocketClient(new BinanceSocketClientOptions { LogVerbosity = CryptoExchange.Net.Logging.LogVerbosity.Debug});

             LogWriter.LogSocketStatuses($"{DateTime.Now},{klineInterval.ToString()},Subscribing to stream,");
            var successKline = socketClient.SubscribeToKlineStreamAsync(symbols, klineInterval, (data) =>
            {
                try
                {
                    //new LogWriter(data.Data.);
                    _klineEntity.SaveBinanceKline(data.Data.ToKline(), _symbols[data.Symbol], klineInterval);
                }
                catch (Exception ex)
                {
                    new LogWriter(ex.ToString());
                    try { _klineEntity.SaveBinanceKline(data.Data.ToKline(), _symbols[data.Symbol], klineInterval); } catch(Exception ex1) { new LogWriter(ex1.ToString()); }
                }

            });

            successKline.Result.Data.Error += (Exception obj) =>
            {
                LogWriter.LogSocketStatuses($"{DateTime.Now},{klineInterval.ToString()},Error from stream,{obj.ToString().Replace(",","")}");
                SubscribeToStreams(klineInterval, symbols);
            };
            successKline.Result.Data.Closed += () =>
            {
                LogWriter.LogSocketStatuses($"{DateTime.Now},{klineInterval.ToString()},Stream was closed,");

                SubscribeToStreams(klineInterval, symbols);
            };
        }

JKorf commented 6 years ago

Hi, the log output defaults to Debug.WriteLine, so it appears in your Output window in VS but nowhere else. You can change this by setting a file LogWriter like so:

var client = new BinanceSocketClient(new BinanceSocketClientOptions()
{
    LogVerbosity = LogVerbosity.Debug,
    LogWriters = new List<TextWriter>()
    { new ThreadSafeFileWriter(path) }
});

Note that I added the ThreadSafeFileWriter in the latest release (3.1.4), so you might need to update. I'm not sure what is causing your problems, but please share your debug logging when it happens.

keefehiggins commented 6 years ago

Hey,

I have included the log and the sample from the last log item.

  1. I tried using your file writer, but it failed when I called the method multiple times, once per KlineInterval. For this test I only passed in the 5 min kline but still got the same results.

  2. Looking at the log, the last recorded time (set in the debug log) was 2018/05/03 06:27:10:379 and the event time from Binance was 2018 6:08:00.454 PM:
     DebugTime: 06:27:10:379
     EventTime: 6:08:00.454 PM
     OpenTime: 6:05:00 PM
     CloseTime: 6:09:59.999 PM

  3. This issue happens every time. I can fully reproduce it just by passing the array into the method.

Thanks for all the help on this.

2018/05/03 06:27:10:379 | Debug | Data received on sub of Binance.Net.Objects.BinanceStreamKlineData: {"stream":"brdbtc@kline_5m","data":{"e":"kline","E":1525385280454,"s":"BRDBTC","k":{"t":1525385100000,"T":1525385399999,"s":"BRDBTC","i":"5m","f":1355461,"L":1355476,"o":"0.00008980","c":"0.00008980","h":"0.00009002","l":"0.00008980","v":"1854.00000000","n":16,"x":false,"q":"0.16671172","V":"1025.00000000","Q":"0.09226136","B":"0"}}}

SocketStatusDebugLog.txt

JKorf commented 6 years ago

Hm, interesting. As soon as the data starts coming in, the time starts trailing behind, which makes me think that maybe the handling of the data is taking too long on your end? Can you share the idea/code that processes the data you receive?

JKorf commented 6 years ago

Also, why did that file writer fail? You got an exception?

keefehiggins commented 6 years ago

Sorry for not being detailed about the writer failure. Yes, it was an exception: the file is in use by another process. I call the method below in a for loop and pass in a kline interval on each iteration. The first interval that goes in seems to keep the file open.
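One way to avoid the "file is in use by another process" exception described above is to open each log file once and hand the same writer to every caller. The sketch below uses only the .NET base class library (`StreamWriter` wrapped by `TextWriter.Synchronized`); it is not the library's own file writer, and `SharedLog` and `For` are hypothetical names introduced for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.IO;

// Hypothetical helper (not part of Binance.Net): keeps one writer per log file
// so that repeated calls for the same path never reopen the file.
public static class SharedLog
{
    private static readonly object Gate = new object();
    private static readonly Dictionary<string, TextWriter> Writers =
        new Dictionary<string, TextWriter>();

    public static TextWriter For(string path)
    {
        // Normalize the path so "log.txt" and "./log.txt" map to the same writer.
        string key = Path.GetFullPath(path);
        lock (Gate)
        {
            TextWriter writer;
            if (!Writers.TryGetValue(key, out writer))
            {
                // TextWriter.Synchronized serializes concurrent writes;
                // AutoFlush pushes each line to the file immediately.
                writer = TextWriter.Synchronized(
                    new StreamWriter(key, append: true) { AutoFlush = true });
                Writers[key] = writer;
            }
            return writer;
        }
    }
}
```

Since the options shown earlier in the thread take a `List<TextWriter>`, each loop iteration could presumably pass `SharedLog.For(path)` in that list instead of constructing a new file writer; that wiring is an assumption and has not been checked against the library.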

I also thought it might be my work taking too long, but even after running for 10 hours it never seems to make it any further. My db methods are async, but I do have the Polly framework doing retries on failures.

To be clear, the example logging was from a quick test I ran, but the results are the same when running for hours on end.

Currently I pass in all 302 Symbols.

        private void SubscribeToStreams(KlineInterval klineInterval, string[] symbols)
        {
            var socketClient = new BinanceSocketClient(new BinanceSocketClientOptions { LogVerbosity = CryptoExchange.Net.Logging.LogVerbosity.Debug});

            LogWriter.LogSocketStatuses($"{DateTime.Now},{klineInterval.ToString()},Subscribing to stream,");
            var successKline = socketClient.SubscribeToKlineStreamAsync(symbols, klineInterval, (data) =>
            {
                try
                {
                    _klineEntity.SaveBinanceKline(data.Data.ToKline(), _symbols[data.Symbol], klineInterval);
                }
                catch (Exception ex)
                {
                    new LogWriter(ex.ToString());
                    try { _klineEntity.SaveBinanceKline(data.Data.ToKline(), _symbols[data.Symbol], klineInterval); } catch(Exception ex1) { new LogWriter(ex1.ToString()); }
                }

            });

            successKline.Result.Data.Error += (Exception obj) =>
            {
                LogWriter.LogSocketStatuses($"{DateTime.Now},{klineInterval.ToString()},Error from stream,{obj.ToString().Replace(",","")}");
                SubscribeToStreams(klineInterval, symbols);
            };
            successKline.Result.Data.Closed += () =>
            {
                LogWriter.LogSocketStatuses($"{DateTime.Now},{klineInterval.ToString()},Stream was closed,");

                SubscribeToStreams(klineInterval, symbols);
            };
        }

JKorf commented 6 years ago

Can you add a timer inside the event handler and check how long the event handler execution takes? From the logging I expect somewhere around 100-150 ms, which is too slow to handle the data, but I might be wrong.

keefehiggins commented 6 years ago

So you want to know how long it takes to run my save methods? And do you mean that if it takes more than 150 ms to process the data-saving parts, it would cause this issue?

JKorf commented 6 years ago

Yup, can you add this?

var successKline = socketClient.SubscribeToKlineStreamAsync(symbols, klineInterval, (data) =>
        {
            var startTime = DateTime.UtcNow;
            try
            {
                _klineEntity.SaveBinanceKline(data.Data.ToKline(), _symbols[data.Symbol], klineInterval);
            }
            catch (Exception ex)
            {
                new LogWriter(ex.ToString());
                try { _klineEntity.SaveBinanceKline(data.Data.ToKline(), _symbols[data.Symbol], klineInterval); } catch(Exception ex1) { new LogWriter(ex1.ToString()); }
            }
            Debug.WriteLine((DateTime.UtcNow - startTime).TotalMilliseconds + "ms");
        });

That will output the time your save took every time it runs. Can you tell me what the average is?
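As an alternative to reading individual timings off the output window, the measurement can accumulate a running average. This is a minimal sketch using `System.Diagnostics.Stopwatch`; `HandlerTimer`, `Measure` and `AverageMilliseconds` are hypothetical names, not part of Binance.Net.

```csharp
using System;
using System.Diagnostics;
using System.Threading;

// Hypothetical helper: times each handler call and keeps a thread-safe running total.
public sealed class HandlerTimer
{
    private long _count;
    private long _totalTicks; // TimeSpan ticks (100 ns units)

    // Runs the handler and records how long it took, even if it throws.
    public void Measure(Action handler)
    {
        var stopwatch = Stopwatch.StartNew();
        try
        {
            handler();
        }
        finally
        {
            stopwatch.Stop();
            Interlocked.Increment(ref _count);
            Interlocked.Add(ref _totalTicks, stopwatch.Elapsed.Ticks);
        }
    }

    public long Count
    {
        get { return Interlocked.Read(ref _count); }
    }

    // Average duration of all measured calls, or 0 if nothing was measured yet.
    public double AverageMilliseconds
    {
        get
        {
            long count = Interlocked.Read(ref _count);
            if (count == 0) return 0.0;
            long ticks = Interlocked.Read(ref _totalTicks);
            return TimeSpan.FromTicks(ticks).TotalMilliseconds / count;
        }
    }
}
```

Inside the subscription callback, the save call would be wrapped as `timer.Measure(() => ...)`, and `timer.AverageMilliseconds` logged every so often.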

keefehiggins commented 6 years ago

I think you're right on. Minimum was 200 ms, with some going all the way out to 1 second. Average was about 500 ms.
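A rough calculation shows why a handler this slow falls further and further behind. The sketch below assumes that messages on the subscription are handled one at a time and that each symbol sends an update every 2 seconds; both are assumptions for illustration, not figures from this thread. `BacklogEstimate` is a hypothetical name.

```csharp
using System;

// Hypothetical back-of-the-envelope helper, not part of Binance.Net.
public static class BacklogEstimate
{
    // Number of messages per second by which the unhandled backlog grows
    // when a single handler processes incoming messages sequentially.
    public static double GrowthPerSecond(int symbols, double updateIntervalSeconds, double handlerMilliseconds)
    {
        double arrivalsPerSecond = symbols / updateIntervalSeconds; // incoming rate
        double handledPerSecond = 1000.0 / handlerMilliseconds;     // processing rate
        return Math.Max(0.0, arrivalsPerSecond - handledPerSecond);
    }
}
```

Under those assumptions, `BacklogEstimate.GrowthPerSecond(302, 2.0, 500.0)` evaluates to 149: about 151 messages arrive per second while only 2 are handled, so the data being processed keeps getting older.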

Do you have any suggestions on my end that might help out?

keefehiggins commented 6 years ago

I ended up solving my issue and just wanted to post a general idea around it in case someone else needs it. I tried several methods and found the below one to be the most effective and the fastest.

I took the response data from the websockets and put it into a queuing system. The queuing system processes the data in a continuous loop. Each iteration of the loop grabs a local copy of what's in the queue and dequeues everything it copied from the original queue. It then groups the items by open date and inserts only the latest item in each group. This makes sure that if the processing gets behind, it will still update the previous kline with its latest version and then insert the latest kline for the next time frame.

This has given me near-real-time data, with a delay of between 15 and 45 seconds, for 6 types of klines with 302 symbols each.
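The approach described above could look roughly like the following. This is a minimal sketch rather than the code actually used: `KlineUpdate` and `KlineCoalescingQueue` are hypothetical names, the kline model is reduced to three fields, and the symbol is included in the grouping key on the assumption that all symbols share one queue.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for the kline model; only the fields this sketch needs.
public sealed class KlineUpdate
{
    public KlineUpdate(string symbol, DateTime openTime, decimal close)
    {
        Symbol = symbol;
        OpenTime = openTime;
        Close = close;
    }

    public string Symbol { get; }
    public DateTime OpenTime { get; }
    public decimal Close { get; }
}

public sealed class KlineCoalescingQueue
{
    private readonly ConcurrentQueue<KlineUpdate> _queue = new ConcurrentQueue<KlineUpdate>();

    // Called from the socket handler: cheap, so the handler returns immediately.
    public void Enqueue(KlineUpdate update)
    {
        _queue.Enqueue(update);
    }

    // Called from the processing loop: removes what is queued right now and
    // keeps only the newest update per (symbol, open time).
    public IReadOnlyList<KlineUpdate> DrainLatest()
    {
        var latest = new Dictionary<(string, DateTime), KlineUpdate>();
        int snapshotCount = _queue.Count; // bound the drain so it ends even while producers keep adding

        KlineUpdate update;
        for (int i = 0; i < snapshotCount && _queue.TryDequeue(out update); i++)
        {
            latest[(update.Symbol, update.OpenTime)] = update; // later updates overwrite earlier ones
        }

        // Oldest open time first, so a finished kline is saved before the next one.
        return latest.Values
            .OrderBy(u => u.OpenTime)
            .ThenBy(u => u.Symbol, StringComparer.Ordinal)
            .ToList();
    }
}
```

The socket callback would then only call `Enqueue`, while a separate loop repeatedly calls `DrainLatest` and saves the returned items. However slow the save is, each pass writes at most one row per symbol and open time.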

Thanks @JKorf for helping me figure all this out. Much appreciated. I now plan on implementing Bittrex and Bitfinex :)

JKorf commented 6 years ago

Glad you fixed the issue :)