schets / multiqueue

A fast mpmc queue with broadcast capabilities
MIT License

Ability to add_stream() to only receive new messages #13

Open Boscop opened 7 years ago

Boscop commented 7 years ago

I'm using multiqueue::broadcast_queue to send messages from my main thread to browser clients. I spawn a new thread for each websocket client, and before spawning it I call add_stream() so that the new receiver gets moved into the thread. The problem is that a thread receives old messages that were sent before .add_stream() was called. I think it would be useful to have a way to add a stream such that the new stream only receives new messages.

Here is an example of such a scenario: http://dpaste.com/1CRPJYN In line 37 I empty the receiver before calling add_stream(), as a workaround, so that the new stream doesn't get old messages.
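To make the requested behavior concrete, here is a minimal sketch of "late subscribers only see new messages", built purely on the standard library. It is not multiqueue's API: `Hub`, `subscribe`, and `broadcast` are hypothetical names, and dropping messages for a full subscriber is just one possible policy.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};

/// Hypothetical fan-out hub (not part of multiqueue): every subscriber
/// gets its own bounded channel.
struct Hub<T: Clone> {
    subscribers: Vec<SyncSender<T>>,
    capacity: usize,
}

impl<T: Clone> Hub<T> {
    fn new(capacity: usize) -> Self {
        Hub { subscribers: Vec::new(), capacity }
    }

    /// Register a subscriber. Its channel starts empty, so it only sees
    /// messages broadcast after this call.
    fn subscribe(&mut self) -> Receiver<T> {
        let (tx, rx) = sync_channel(self.capacity);
        self.subscribers.push(tx);
        rx
    }

    /// Send a copy to every subscriber. Disconnected subscribers are
    /// removed; a full subscriber misses this message (assumed policy).
    fn broadcast(&mut self, msg: T) {
        self.subscribers.retain(|tx| match tx.try_send(msg.clone()) {
            Ok(()) | Err(TrySendError::Full(_)) => true,
            Err(TrySendError::Disconnected(_)) => false,
        });
    }
}

/// Broadcast one message before subscribing and one after, then return
/// everything the late subscriber received.
fn late_subscriber_demo() -> Vec<u32> {
    let mut hub = Hub::new(4);
    hub.broadcast(1); // nobody is subscribed yet
    let rx = hub.subscribe();
    hub.broadcast(2);
    rx.try_iter().collect()
}
```

In this model a client thread would call `subscribe()` when it connects, and no long-lived receiver has to be kept around and drained.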

Also, it would be useful to have a queue with unbounded capacity.

Btw, my assumption is that I should use add_stream() when I want messages to be sent to both receivers, and clone() when I want both receivers to compete for dispatching the messages (the first one that asks for a message gets it, the other one doesn't). Is this assumption correct?

schets commented 7 years ago

Yeah, your assumption about clone() vs add_stream() is correct.
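The two behaviors can be modeled with the standard library alone. This is only an illustration of the semantics, not multiqueue code: a shared `Arc<Mutex<Receiver>>` stands in for receivers produced by clone() (they compete for each message), and one channel per consumer stands in for receivers produced by add_stream() (each sees every message). The function names are made up for the example.

```rust
use std::sync::mpsc::channel;
use std::sync::{Arc, Mutex};

/// Models clone(): both handles read from the same stream, so each
/// message is delivered to whichever handle asks first.
fn competing_consumers() -> (Vec<u32>, Vec<u32>) {
    let (tx, rx) = channel();
    let a = Arc::new(Mutex::new(rx));
    let b = Arc::clone(&a);
    for i in 0..4 {
        tx.send(i).unwrap();
    }
    let (mut got_a, mut got_b) = (Vec::new(), Vec::new());
    for _ in 0..2 {
        // The handles take turns; no message is seen twice.
        got_a.push(a.lock().unwrap().try_recv().unwrap());
        got_b.push(b.lock().unwrap().try_recv().unwrap());
    }
    (got_a, got_b)
}

/// Models add_stream(): each stream has its own view of the data, so
/// every message reaches both consumers.
fn broadcast_consumers() -> (Vec<u32>, Vec<u32>) {
    let (tx_a, rx_a) = channel();
    let (tx_b, rx_b) = channel();
    for i in 0..4 {
        tx_a.send(i).unwrap();
        tx_b.send(i).unwrap();
    }
    (rx_a.try_iter().collect(), rx_b.try_iter().collect())
}
```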

It's pretty simple to make a best-effort attempt to create a new reader at the current write position, but this will either:

On the unbounded queue: Multiqueue is never going to be unbounded. For the things that I intended it for (high throughput+low latency messaging) unboundedness would incur some pretty undesirable performance costs.

Boscop commented 7 years ago

The problem is, even with this workaround, it only works if a new client connects before the receiver queue is full (since it only gets emptied when a client connects, in line 37). Otherwise it crashes. Is there a way I can take out the sent value right after I send it in line 18? The problem then is that the ui_state thread owns the receiver, so it can't be moved into the closure below. What's the best way to solve this? I need to keep a receiver somewhere to clone it to be able to pass it to the spawned threads for the clients.

sschetterer commented 7 years ago

> The problem is, even with this workaround, it only works if a new client connects before the receiver queue is full (since it only gets emptied when a client connects, in line 37). Otherwise it crashes.

There are ways to deal with this other than crashing on a full queue, but with a bounded queue you fundamentally have this issue. What happens if the UI thread can't communicate back to the main thread? It should deal with that instead of crashing or just piling up messages. A quick way to deal with this while prototyping would be to spin on send. I'm going to add a proper blocking API, and the futures API already supports blocking send.

> I need to keep a receiver somewhere to clone it to be able to pass it to the spawned threads for the clients.

I've been thinking of a way to do this without requiring an existing reader. Using an existing reader provides a rudimentary 'pin' of sorts so that writers can't overrun the index of the existing reader. I think I've figured out a way around this though, but I'm not sure if it works.
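The 'pin' idea can be illustrated with a toy model of ring-buffer positions. This is a sketch of the general technique under assumed names (`Positions`, `can_write`, `add_reader_from`), not a description of multiqueue's actual internals.

```rust
/// Hypothetical model of a bounded ring buffer's cursors.
struct Positions {
    capacity: u64,
    write_pos: u64,
    reader_pos: Vec<u64>,
}

impl Positions {
    /// The writer may advance only while it stays within `capacity`
    /// slots of the slowest reader. With no readers, nothing pins it.
    fn can_write(&self) -> bool {
        match self.reader_pos.iter().min() {
            Some(&slowest) => self.write_pos - slowest < self.capacity,
            None => true,
        }
    }

    /// Advance the write cursor if allowed; report whether it moved.
    fn write(&mut self) -> bool {
        if self.can_write() {
            self.write_pos += 1;
            true
        } else {
            false
        }
    }

    /// Create a reader at an existing reader's position. That slot is
    /// already protected from being overwritten, so the new reader
    /// starts on valid data. Returns the new reader's index.
    fn add_reader_from(&mut self, existing: usize) -> usize {
        let pos = self.reader_pos[existing];
        self.reader_pos.push(pos);
        self.reader_pos.len() - 1
    }
}
```

Starting a reader at `write_pos` without going through an existing reader is the harder case: in a concurrent queue the writer can move between reading the position and registering the new reader, which is presumably why only a best-effort version is simple.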

Boscop commented 7 years ago

Is your solution available somewhere on a branch? Or do you know how I can solve this problem in my use case in the meantime? :)

schets commented 7 years ago

I'm not even sure if the solution is correct, it only exists in my head up till this point. If you want to send without crashing, for now you can spin until the send works.

I've got a whole lot on my plate and probably won't be able to look at this in depth until the weekend.
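For reference, "spin until the send works" could look roughly like the following. The sketch uses the standard library's bounded `sync_channel` so that it is self-contained; it assumes the real sender offers a comparable non-blocking `try_send` that hands the value back when the queue is full. `spin_send` and `spin_demo` are hypothetical helpers.

```rust
use std::sync::mpsc::{sync_channel, SyncSender, TrySendError};
use std::thread;

/// Retry a non-blocking send until it succeeds. Returns the message
/// back as an error if the receiving side has gone away.
fn spin_send<T>(tx: &SyncSender<T>, mut msg: T) -> Result<(), T> {
    loop {
        match tx.try_send(msg) {
            Ok(()) => return Ok(()),
            Err(TrySendError::Full(returned)) => {
                // Queue is full: take the message back and try again.
                msg = returned;
                thread::yield_now();
            }
            Err(TrySendError::Disconnected(returned)) => return Err(returned),
        }
    }
}

/// Push ten messages through a channel of capacity 2 while another
/// thread drains it, and return what the consumer received.
fn spin_demo() -> Vec<u32> {
    let (tx, rx) = sync_channel(2);
    let consumer = thread::spawn(move || rx.iter().collect::<Vec<u32>>());
    for i in 0..10 {
        spin_send(&tx, i).expect("receiver should still be alive");
    }
    drop(tx); // close the channel so the consumer's iterator ends
    consumer.join().unwrap()
}
```

Spinning only avoids the crash; it still needs something on the other side to drain the queue, otherwise the sender loops forever.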

Boscop commented 7 years ago

By spin do you mean trying to send in a loop?

Then it won't crash, but the problem is, the receiver (the one that gets cloned to pass to the threads) only gets emptied when a new client connects. In this application, I only have 1 active client at a time (it's a web UI for my Rust app), and it will only reconnect when I refresh the tab. So it goes a long time without being able to empty the queue, but there are a lot of msgs being sent back and forth between UI and application. So how can I empty the queue more frequently to avoid blocking on send? It has to be moved into the closure that is passed to listen(), so I can't e.g. empty it in the ui_state thread whenever I send from there...

schets commented 6 years ago

I'm going to make an API change that would support such behavior in a best-effort sense, as I've realized the other way leads to some edge-case race conditions.

Boscop commented 4 years ago

@schets Any update on this? :)