deepstreamIO / deepstream.io

deepstream.io server
https://deepstreamio.github.io
MIT License

Feature Suggestion: Wrap Redis Streams #1061

Closed redlock closed 3 years ago

redlock commented 4 years ago

The streams concept, introduced in Kafka and now implemented in Redis, is a very handy feature and is sorely missing from Deepstream.

For example, we have thousands of trades happening every second and we deliver these trades to clients who request them. We can't do that with Deepstream because we don't keep a history of the trades. Ideally a client would request the past 5 minutes of trade history and then get realtime updates.

With records it cannot be done, both because of the size of the stream (thousands of trades) and because it is a growing, sorted data structure. Events can't be used either, because they don't save or remember past updates.

Since Deepstream doesn't persist, I wonder if Redis streams can be wrapped into the Deepstream API. Redis always sits behind some app server, so we are already wrapping the stream in a Node server to expose it over websockets. It would be great if Deepstream could integrate Redis streams into its API.

Thanks.

valentinvichnal commented 4 years ago

Interesting concept, but is there any reason why you can't store these trades?

You could fork the MongoDB storage connector and replace MongoDB with a time-series DB: https://www.influxdata.com/ https://www.aerospike.com/ https://www.scylladb.com/

You could also implement custom retrieval logic, for example requesting the past 5, 15, or 30 minutes of trades.

redlock commented 4 years ago

The problem isn't storage but rather sync. If, for example, I request the past 5 minutes of trades, new trades will be added while I am waiting for the request to respond, so there will be a gap between the requested data and the new updates. Also, if for any reason there is a disturbance in the network, I can easily pick up the stream from the last update I have.

Redis streams give the client an incremental ID for every entry, which you can use to query both past data and new updates so that your stream stays consistent.
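For illustration, here is a minimal sketch of that ID-based catch-up pattern, assuming the ioredis client and a hypothetical 'trades' stream (not part of Deepstream):

```ts
import Redis from 'ioredis'

const redis = new Redis()

// XREAD only returns entries with IDs greater than the cursor, so the same loop
// replays trades missed during a disconnect and then follows new ones with no gap.
async function followTrades (lastSeenId = '0-0') {
  let cursor = lastSeenId
  while (true) {
    // block for up to 5s waiting for entries newer than `cursor`
    const result = await redis.xread('BLOCK', 5000, 'STREAMS', 'trades', cursor)
    if (!result) continue // timed out with nothing new, poll again
    for (const [, entries] of result) {
      for (const [id, fields] of entries) {
        console.log('trade', id, fields)
        cursor = id // remember the last ID so a reconnect can resume from here
      }
    }
  }
}

followTrades().catch(console.error)
```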

valentinvichnal commented 4 years ago

You can handle the sync on the client side (a rough sketch follows the list):

  1. Client subscribes to the trades event
  2. Client requests the past 5 minutes of trades
  3. Merge the two datasets by time or id
  4. Add new trades from the event
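Something like this, where the 'trades' event and the 'trades/last-5-min' RPC are hypothetical names and the RPC is assumed to return trades sorted by id:

```ts
import { DeepstreamClient } from '@deepstream/client'

const client = new DeepstreamClient('localhost:6020')

async function loadTrades () {
  await client.login()

  const buffer: any[] = []       // live trades that arrive while the snapshot loads
  const seen = new Set<string>() // ids already merged, to drop duplicates

  // 1. subscribe first so nothing published during the snapshot request is lost
  client.event.subscribe('trades', (trade: any) => buffer.push(trade))

  // 2. request the past 5 minutes of trades (hypothetical RPC)
  const history: any[] = await client.rpc.make('trades/last-5-min', {})
  history.forEach(t => seen.add(t.id))

  // 3. + 4. merge by id: snapshot first, then any live trades not already included;
  // later trades keep arriving through the existing event subscription
  const merged = [...history]
  for (const trade of buffer) {
    if (!seen.has(trade.id)) {
      merged.push(trade)
      seen.add(trade.id)
    }
  }
  return merged
}
```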

Time-series databases can do a lot of optimizations because prices are usually consistent (100, 98, 102). If you need to store trades for the long term, I would use a time-series database instead of Redis.

Redis Streams is certainly interesting if you only store these trades for a short time. You should take a look at the deepstream Redis cache connector; maybe you can modify it and add streams.

redlock commented 4 years ago

Yes, we were originally doing it this way, but it gets a bit messy if your network disconnects for, say, N seconds/minutes. Keep in mind that the first request for the past N minutes of trades is expensive (it can return thousands of trades). So when you reconnect after a disconnect you either need to reload the data (which isn't an option, since it is an expensive operation) or request only the data you missed while buffering the updates that keep arriving.

The biggest problem with the above process is the buffering of incoming data while waiting for the missing data to arrive. We need extra logic and data structures to track and reconcile these two data sources, which use different connections (HTTP and WS) in two different threads, and that caused concurrency issues as you can imagine.

It is the same with records. They aren't strictly necessary: you could just make a request to some REST endpoint to get a snapshot and then update the record as updates arrive on the event channel. However, it is a lot easier to work with records because they remove this cognitive load. Redis streams are a convenience that reduces the complexity of your frontend code the same way records do for syncing documents.

valentinvichnal commented 4 years ago

That is true; if the client disconnects regularly, my approach is inefficient.

Okay, this Redis Streams idea looks interesting. When I have some time I will work on this.

yasserf commented 3 years ago

Just took a deeper look into this. It's a pretty useful idea tbh. I don't have time to implement it, but I'm happy to either provide advice on this thread or be sponsored.

Generally, adding this functionality wouldn't be complicated given the modular DS system plus the fact that Redis Streams supports consumer groups, which means we don't multiplex anything in deepstream.

All the complexity would be hidden from the client. So all the client needs to do is tell the server (this is very abstract and doesn't support range snapshots and so forth):

1) client.stream.subscribe('group/stream', (data: any) => boolean, {
     // So we have -, +, $, > and *, and all have a different meaning, and most of the time they can be used in different contexts.
     // Basically we would need to provide more API-friendly options that map onto this.
   })

2) client.stream.unsubscribe('group/stream', (data: any) => boolean)
   // this HAS to be done at some point, otherwise the stream remains open via redis

The server would map those onto the corresponding Redis Streams consumer-group commands.

When a connection reconnects, provided it's the same user id, it will automatically use Redis to play back the correct messages. Deepstream would literally just be a websocket shell that proxies the stream, plus does permissions and authentication.
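Very roughly, the server-side mapping could look something like this sketch. It talks to Redis via ioredis directly (an assumption), and the group/consumer naming is purely illustrative:

```ts
import Redis from 'ioredis'

const redis = new Redis()

// Map a client subscription onto a Redis consumer group, using the user id as the
// consumer name so a reconnecting user replays its unacknowledged entries first.
async function serveStream (stream: string, group: string, userId: string,
                            send: (id: string, fields: string[]) => void) {
  // create the consumer group if it doesn't exist yet ('$' = only new entries)
  try {
    await redis.xgroup('CREATE', stream, group, '$', 'MKSTREAM')
  } catch (e: any) {
    if (!String(e.message).includes('BUSYGROUP')) throw e
  }

  // '0' replays entries that were delivered to this consumer but never acknowledged
  // (e.g. the client disconnected), then '>' reads new entries; a real plugin would
  // keep looping/blocking on '>' until the client unsubscribes.
  for (const cursor of ['0', '>']) {
    const result = await redis.xreadgroup(
      'GROUP', group, userId, 'COUNT', 100, 'STREAMS', stream, cursor) as
      [string, [string, string[]][]][] | null
    if (!result) continue
    for (const [, entries] of result) {
      for (const [id, fields] of entries) {
        send(id, fields)                    // proxy the entry to the websocket client
        await redis.xack(stream, group, id) // acknowledge once delivered
      }
    }
  }
}
```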

I would recommend this be done via a plugin (you can register deepstream message handlers). The only thing that needs to change in core is the protobuf message parser, but that would be pretty simple. For reference, here's events:

https://github.com/deepstreamIO/deepstream.io-protobuf/blob/master/event.proto

jaime-ez commented 3 years ago

If anyone wants to implement this as a plugin and add the corresponding messages to the parser, please make a pull request to the protobuf repo.