BitMEX / api-connectors

Libraries for connecting to the BitMEX API.
https://www.bitmex.com/app/restAPI
Repeating events for all symbols #120

Closed emileindik closed 6 years ago

emileindik commented 6 years ago

Hi there,

Thanks for this package! Is there a reason that when a single piece of data comes in from a particular symbol stream, events for all symbols are emitted?

I think it's due to this block of code in createSocket.js:

Object.keys(symbolData).forEach((symbol) => {
  const key = `${table}:${action}:${symbol}`;
  debug('emitting %s with data %j', key, symbolData[symbol]);
  emitter.emit(key, _.extend({}, data, {data: symbolData[symbol]}));
});

Can we not detect exactly which symbol the data belongs to? Could this be causing my code to think that a symbol is receiving more activity than it actually is? Sorry if I'm missing something.

Thanks!

ryanfox commented 6 years ago

Consumers may be listening for different event types, see also in createSocket.js:

https://github.com/BitMEX/api-connectors/blob/115f15971785595e69b9b21437efc0319ab8e029/official-ws/nodejs/lib/createSocket.js#L50-L56

Your consumer should only be receiving events you are subscribed to. If you're getting more events than you anticipated, double check exactly what is subscribed.

emileindik commented 6 years ago

Right, I am subscribed to events from, say, XBTUSD, XBTM18, and XBTU18. However, if you simply console.log(symbol) in the callback to client.addStream(), stdout will show:

XBTUSD
XBTM18
XBTU18
XBTUSD
XBTM18
XBTU18
.
.
.

An event for each symbol is emitted on any received data due to this block in createSocket.js:

Object.keys(symbolData).forEach((symbol) => {
  const key = `${table}:${action}:${symbol}`;
  debug('emitting %s with data %j', key, symbolData[symbol]);
  emitter.emit(key, _.extend({}, data, {data: symbolData[symbol]}));
});

I don't think there should be an Object.keys() loop at all, but simply adding an if condition:

Object.keys(symbolData).forEach((symbol) => {
  if (symbolData[symbol].length) {
    const key = `${table}:${action}:${symbol}`;
    debug('emitting %s with data %j', key, symbolData[symbol]);
    emitter.emit(key, _.extend({}, data, {data: symbolData[symbol]}));
  }
});

seems to work as a quick fix. Would just like to be sure that I'm not breaking any intended functionality.

ryanfox commented 6 years ago

How are you subscribing to the specific events?

emileindik commented 6 years ago

The same way the delta-server subscribes.

config.symbols.forEach(function(symbol) {
  config.streams.forEach(function(streamName) {
    client.addStream(symbol, streamName, function(data, symbol, tableName) {

    });
  });
});

with a config that looks like:

module.exports = {
  symbols: ['XBTUSD', 'XBTM18', 'XBTU18'],
  streams: ['orderBookL2'],
  .
  .
  .
}

Should I instead be sending a subscribe message like this?

{"op": "subscribe", "args": ["orderBookL2:XBTUSD", "orderBookL2:XBTM18", "orderBookL2:XBTU18"]}

ryanfox commented 6 years ago

I think that may be a result of the way orderBookL2 data tends to arrive. If you change streams to quote, do you still see double logging?

emileindik commented 6 years ago

Yes, I still see this pattern:

XBTUSD
XBTM18
XBTU18
XBTUSD
XBTM18
XBTU18
.
.
.

for this config with just quote:

module.exports = {
  symbols: ['XBTUSD', 'XBTM18', 'XBTU18'],
  streams: ['quote'],
}

andersea commented 6 years ago

I haven't looked at the code yet, but from the behaviour I am seeing it really looks like there is an issue. I am getting multiple output messages for each input message when subscribing to multiple instruments at once. It is pretty hard to make sense of without going through the code step by step, but it almost seems that it dumps the entire table whenever an input message is received, which is not what you want. You only want the data that matches the table key, which is usually the symbol name.

I have been using my own connector for python lately, so I haven't been looking over the code in this repository for a while.

andersea commented 6 years ago

Yeah.. emitSplitData is broken. Every event on a table causes all listeners on that table to get a copy of the data they are listening for. This scales badly with the number of symbols being listened for: each message triggers one callback per subscribed symbol instead of just one!

Adding the if section like @emileindik suggests will partially mend it, but we still have issue #132. And I am not sure it is the best fix.

What happens is as follows:

Each stream subscription adds an event handler for table:*:symbol. This happens in addStreamHelper() in index.js. This event handler serves as a proxy for the callback. The event handler first calls deltaParser, which updates the store with the received data and returns the full updated record(s).

The updated data is then sliced if it is larger than the client max storage length, to reduce the size of the dataset. (This is bad design btw. It should be the store's responsibility to handle its own size.)

This data is then sent back to the callback.
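
As a rough illustration of the handler flow just described, here is a simplified model. It is not the library's actual code: `makeProxyHandler`, `applyDelta` and `maxTableLen` are hypothetical names, and `applyDelta` is a toy stand-in for deltaParser that only appends rows.

```javascript
// Simplified model of the per-subscription proxy handler described above.
// All names are hypothetical; the real logic lives in addStreamHelper() in index.js.

// Toy stand-in for deltaParser: appends incoming rows to the store and
// returns the full stored record set for the symbol.
function applyDelta(store, symbol, message) {
  if (!store[symbol]) store[symbol] = [];
  store[symbol].push(...message.data);
  return store[symbol];
}

function makeProxyHandler(store, symbol, maxTableLen, callback) {
  return function onMessage(message) {
    let rows = applyDelta(store, symbol, message);
    // Hand the callback only the most recent rows if the set is over the limit.
    if (rows.length > maxTableLen) {
      rows = rows.slice(-maxTableLen);
    }
    callback(rows, symbol);
  };
}

const store = {};
const seen = [];
const handler = makeProxyHandler(store, 'XBTUSD', 2, (rows) => seen.push(rows.length));

handler({ data: [{ symbol: 'XBTUSD', price: 1 }] });
handler({ data: [{ symbol: 'XBTUSD', price: 2 }, { symbol: 'XBTUSD', price: 3 }] });
handler({ data: [] }); // an empty message still reaches the callback
```

Note the last call: in this model, a message with no rows leaves the store untouched but the callback still fires with the existing data.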

Next up: what happens when a message is received? What triggers the event handler?

emitSplitData.

Every time a message for a particular table is received and it is a symbol table, emitSplitData is called to handle the message.

Here is where the error starts:

// By looking at what we're subscribed to, we can save time by only emitting those events.
const matchingStreams = emitter._listenerTree[table];
const symbolData = _.mapValues(matchingStreams, () => []);

emitter._listenerTree[table] is a dictionary that counts each time someone subscribes to a symbol. This is so that when someone removes a listener and it is the last listener on a symbol, we can be nice and unsubscribe from the stream. (This is also bad design. The event emitter itself is already master for who is subscribed. You can inspect the event emitter data structure to retrieve this information. Adding your own system on top of this just means more stuff that can go wrong. (Will it handle people listening on "*" correctly?))

The quoted code will create a new dictionary with one entry per symbol.

Why? This is a problem, right!? People could be listening to 'XBTUSD', 'BCHM18', etc. The message is only meant for people who are listening to the particular key, so why are we even considering other symbols? We should only consider the actual symbol (and "*").

The next bit of code partially remedies this:

// This is similar to _groupBy, but faster.
for (let i = 0; i < data.data.length; i++) {
    const d = data.data[i];
    if (symbolData[d[filterKey]]) symbolData[d[filterKey]].push(d);
}

This will only add data to the symbols that match.
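
To make the two quoted steps concrete, here is a small plain-JavaScript sketch (the original uses lodash). `listenerTree` is a hypothetical stand-in for `emitter._listenerTree[table]`, and the message contents are made up for illustration.

```javascript
// Hypothetical stand-in for emitter._listenerTree[table]: one key per subscribed symbol.
const listenerTree = { XBTUSD: 1, XBTM18: 1, XBTU18: 1 };
const filterKey = 'symbol';

// Step 1: one empty bucket per subscribed symbol
// (the shape that _.mapValues(matchingStreams, () => []) produces).
const symbolData = Object.fromEntries(
  Object.keys(listenerTree).map((symbol) => [symbol, []])
);

// Step 2: route each incoming row to its symbol's bucket, if one exists.
const message = {
  table: 'quote',
  action: 'insert',
  data: [{ symbol: 'XBTUSD', bidPrice: 9000 }],
};
for (const row of message.data) {
  const bucket = symbolData[row[filterKey]];
  if (bucket) bucket.push(row);
}

console.log(symbolData);
```

After this runs, only the XBTUSD bucket holds a row; the XBTM18 and XBTU18 buckets still exist but are empty.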

But we still emit for all symbols here:

Object.keys(symbolData).forEach((symbol) => {
    const key = `${table}:${action}:${symbol}`;
    debug('emitting %s with data %j', key, symbolData[symbol]);
    emitter.emit(key, _.extend({}, data, {data: symbolData[symbol]}));
});

This means the event handler proxy will be called for all symbols. The proxy calls deltaParser with an empty dataset (it got filtered in the groupBy code above). It doesn't break the data store, but deltaParser thinks you just want to retrieve the data, so it returns it and the callback is then called with the unmodified data.
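
The effect can be reproduced with a stripped-down model. This is only a sketch under stated assumptions: `emitSplit` and `run` are hypothetical names, Node's built-in EventEmitter stands in for the library's emitter (so "*" listeners are not modelled), and `skipEmpty` toggles the guard suggested earlier in this thread.

```javascript
const { EventEmitter } = require('events');

// Hypothetical, simplified model of emitSplitData.
function emitSplit(emitter, symbols, message, skipEmpty) {
  // One bucket per subscribed symbol, then group incoming rows by symbol.
  const symbolData = Object.fromEntries(symbols.map((s) => [s, []]));
  for (const row of message.data) {
    if (symbolData[row.symbol]) symbolData[row.symbol].push(row);
  }
  // Emit per symbol; without the guard, empty buckets are emitted too.
  for (const symbol of Object.keys(symbolData)) {
    if (skipEmpty && symbolData[symbol].length === 0) continue;
    emitter.emit(`${message.table}:${message.action}:${symbol}`, {
      ...message,
      data: symbolData[symbol],
    });
  }
}

// Subscribes to three symbols, delivers one XBTUSD message, and
// records which listeners were called.
function run(skipEmpty) {
  const emitter = new EventEmitter();
  const symbols = ['XBTUSD', 'XBTM18', 'XBTU18'];
  const calls = [];
  for (const symbol of symbols) {
    emitter.on(`quote:insert:${symbol}`, () => calls.push(symbol));
  }
  const message = { table: 'quote', action: 'insert', data: [{ symbol: 'XBTUSD' }] };
  emitSplit(emitter, symbols, message, skipEmpty);
  return calls;
}

const withoutGuard = run(false);
const withGuard = run(true);
console.log(withoutGuard, withGuard);
```

Without the guard, a single XBTUSD message calls all three listeners, which matches the repeating XBTUSD / XBTM18 / XBTU18 pattern reported above. With the guard, only the XBTUSD listener is called.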

And so now we have the issue..

ryanfox commented 6 years ago

Thanks all for investigating this. Should be fixed on master now.

The issue with "*" subscriptions still stands - follow #132 for information on that issue.