kyren / webrtc-unreliable

Just enough hacks to get unreliable unordered WebRTC data channels between a browser and a server

How to handle Server.recv() blocking until message received? #8

Closed connorcarpenter closed 4 years ago

connorcarpenter commented 4 years ago

Apologies for my ignorance...

So it appears that calling Server.recv() is required for the server to function correctly. However, Server.recv().await appears to block until it receives some communication from a client. In webrtc_unreliable's provided example, Server.send().await is called after receiving a message from each client. How would you go about sending messages to connected clients if the clients are NOT sending messages to the server?

When I try to do something like this:

loop {
    match rtc_server.recv(&mut buf).await {
        Ok(message_result) => {
            // do something with the received client message
        }
        Err(_) => {}
    }

    // send messages to all clients, doing something like this:
    rtc_server.send(&payload, message_type, &remote_addr).await;
}

and no clients are sending messages, rtc_server.recv().await just blocks and never gets to .send(). I've tried putting rtc_server.recv() into its own thread, but then I get issues with sharing the mutable reference rtc_server between threads :/

Got any advice? Thank you for any help! Let me know if you'd like more concrete examples.

connorcarpenter commented 4 years ago

I've found that modifying recv() to return an optional MessageResult works for this need:

pub async fn recv(&mut self, buf: &mut [u8]) -> Result<Option<MessageResult>, RecvError> {
    // Instead of a while loop here, just either: A. receive rtc messages, or B. handle background tasks
    if self.incoming_rtc.is_empty() {
        self.process().await?;
        Ok(None)
    } else {
        let (message, remote_addr, message_type) = self.incoming_rtc.pop_front().unwrap();
        let message = self.buffer_pool.adopt(message);
        let message_len = message.len();

        let copy_len = message_len.min(buf.len());
        buf[..copy_len].copy_from_slice(&message[..copy_len]);

        let result = MessageResult {
            message_len,
            message_type,
            remote_addr,
        };

        if copy_len < message_len {
            Err(RecvError::IncompleteMessageRead(result))
        } else {
            Ok(Some(result))
        }
    }
}

but I believe you have probably already designed for this case and have a more elegant solution :)

kyren commented 4 years ago

This comes up a lot, it's kind of a general question about how to use the API that shows up whenever you want to send data not immediately coupled with receiving data.

The fundamental problem is that webrtc-unreliable needs to do three things at once, all with a single UdpSocket which cannot be shared: 1) background tasks like STUN packets and SCTP stuff, 2) reading incoming messages, and 3) writing outgoing messages. Currently, 1 and 2 are both handled by recv, and 3 by send.

Background tasks and receiving messages are tied together because both of those actually involve waiting on reading new packets from the UdpSocket, so you can't really decouple them. If you're not actually expecting to receive any messages, you still need to be doing background processing constantly so you still need to be almost always blocking on recv.

The way that I use Server, and the way it's intended to be used when sending messages is not closely coupled to reading messages (like in the simplistic example), is to use some kind of async channel and select! over two futures in a loop: one that is calling Server::recv and one that is receiving from the channel. That way, normally you're blocking on a future that is doing background processing, and when some other process determines that you need to send a packet, the select! call will return with the outgoing message; you can then Server::send that message and quickly return to blocking on Server::recv. Server::send is an async function because tokio's UdpSocket::send_to is also async, but it's expected to return very quickly: it's just waiting on the OS to send a UDP packet, which will generally happen very fast. So mostly you will be blocking on Server::recv, which does all the background work.
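
For concreteness, here is a minimal sketch of that pattern, assuming tokio's select! and mpsc channel and the buf-based recv API shown earlier in this thread. The Outgoing type and names like outgoing_rx are illustrative, not part of the library:

use std::net::SocketAddr;
use tokio::sync::mpsc;
use webrtc_unreliable::{MessageType, Server};

// Hypothetical outgoing-message type; the shape is an assumption,
// not part of webrtc-unreliable's API.
struct Outgoing {
    payload: Vec<u8>,
    message_type: MessageType,
    remote_addr: SocketAddr,
}

async fn run(mut rtc_server: Server, mut outgoing_rx: mpsc::Receiver<Outgoing>) {
    let mut buf = vec![0; 0x10000];
    loop {
        tokio::select! {
            // Normally we block here, which also drives the STUN/SCTP
            // background processing.
            received = rtc_server.recv(&mut buf) => {
                if let Ok(message_result) = received {
                    // Handle buf[..message_result.message_len] received
                    // from message_result.remote_addr here.
                }
            }
            // Some other task has decided that a message needs to go out.
            Some(out) = outgoing_rx.recv() => {
                let _ = rtc_server
                    .send(&out.payload, out.message_type, &out.remote_addr)
                    .await;
            }
        }
    }
}

Note that select! drops the unfinished recv future before running the send branch, so this pattern relies on Server::recv being cancel safe, which is exactly what the later comments in this thread discuss.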

The reason that this is not already built into the Server is that I felt it would be a bit too opinionated, there are a lot of ways you might want to use this library so only the minimum interface is provided and you can decide exactly how best to do the rest of it. There's not a universal best solution for how to handle data being sent and received that would cover all possible use cases, but usually it looks something like what I described (but the specifics are not always the same).

I don't think the API change you made is correct: it would just return Ok(None) occasionally, where "occasionally" means whenever a background processing timer fired or whenever an incoming UDP packet didn't result in a new message. These things would happen fairly often, but not continuously, so there would still be needless latency on sending. You really need to block on both recv and whatever is triggering you to want to send messages (a channel or something else).

connorcarpenter commented 4 years ago

This is great, Kyren, thank you! I'm just learning async stuff now, so that didn't occur to me. I'll try to get that working and post an example 👍

connorcarpenter commented 4 years ago

And now I see https://github.com/kyren/webrtc-unreliable/issues/5 which is related. Doh

chpio commented 3 years ago

> That way, normally you're blocking on a future that is doing background processing, and when some other process determines that you need to send a packet, the select! call will return with the outgoing message; you can then Server::send that message and quickly return to blocking on Server::recv.

Wouldn't that "kill" the Server::recv future (stop polling it & drop it) mid action? Is this safe to do? Or is there a way to resume the recv future with select (i think, this should not be possible because of &mut)?

Since you mention this pattern (and it's the only way to send that is not directly coupled with a previous receive), it has presumably gotten pretty good testing?

(The same goes for the sending future as well, if it's not a very simple mpsc::Receiver.)

https://github.com/rust-lang/wg-async-foundations/issues/65

chpio commented 3 years ago

Hmm, is it possible that a cancellation of the Server::recv future might cause data loss? It seems that the async-io crate reads the data packet on a different thread, then sends it to the current one. If the current read operation is canceled, the data would be lost.

On the other hand, we are in "unreliable" territory and could count such a dropped packet as ordinary packet loss. But in a "contention" situation (receiving and sending a lot of data), might this cause too much data loss?

kyren commented 3 years ago

Wouldn't that "kill" the Server::recv future (stop polling it & drop it) mid action? Is this safe to do? Or is there a way to resume the recv future with select (i think, this should not be possible because of &mut)?

I try to make every future returned by any async API that I make "drop safe" or "cancel safe" in this way (I'm not exactly sure what it's called?) or explicitly note that it is not cancel safe. Up until this point I was under the impression that the whole API was cancel safe, but it's possible that I'm just missing a bug due to assuming unreliability anyway.

> Hmm, is it possible that a cancellation of the Server::recv future might cause data loss? It seems that the async-io crate reads the data packet on a different thread, then sends it to the current one. If the current read operation is canceled, the data would be lost.

Are you sure about this? After looking through async-io, it seemed like only the epoll part was on a different thread, notifying tasks that they are ready to make progress, not actually delivering data? This would be bad; I was under the impression that the async-io UDP async methods were also cancel safe, and it would be pretty weird otherwise.

In any case, if webrtc-unreliable is dropping data on canceling a Server::recv future then this is a bug and I'll treat it as such.

Whether or not dropping data on canceling a Server::send future is a bug is a bit more subtle. I would consider either fully queuing delivery on send or fully not queuing message delivery on send to both be cancel safe; it's a subtle point, but we should probably just document the behavior either way.

Being cancel safe is, in my mind, mostly about remaining fully consistent, not doing anything "surprising", and being able to predict the state of things if a future is canceled, but I realize this is a subjective viewpoint. I would consider dropping received data to be extremely surprising.

In this case, sent data is always queued for sending even when the future is canceled, and will actually be sent by the next call to Server::recv, since it also processes outgoing UDP messages, but that's sort of an implementation detail. You could make the case that it would be better for canceling Server::send to actually not queue the data.
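
As a hypothetical illustration of that behavior (using tokio::time::timeout as the cancellation mechanism; rtc_server, payload, message_type, and remote_addr are placeholders from the earlier sketch):

use std::time::Duration;

// Cancel (drop) an in-flight Server::send future after 1ms.
let send_future = rtc_server.send(&payload, message_type, &remote_addr);
if tokio::time::timeout(Duration::from_millis(1), send_future).await.is_err() {
    // The send future was dropped, but as described above the message may
    // already be queued and will still go out on a later Server::recv call.
}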

chpio commented 3 years ago

> Hmm, is it possible that a cancellation of the Server::recv future might cause data loss? It seems that the async-io crate reads the data packet on a different thread, then sends it to the current one. If the current read operation is canceled, the data would be lost.

> Are you sure about this? After looking through async-io, it seemed like only the epoll part was on a different thread, notifying tasks that they are ready to make progress, not actually delivering data? This would be bad; I was under the impression that the async-io UDP async methods were also cancel safe, and it would be pretty weird otherwise.

No, sorry, I was just "assuming"; I haven't looked into the async-io code.

Thank you for the answer; it's very reassuring to know that everything is (or was designed to be) cancel safe!

wmww commented 3 years ago

If anyone wants to look at a working example in a non-trivial program, I think what I figured out here is about the same as what kyren described: https://github.com/OpenStarscape/starscape-server/blob/484178528752d2ca7ab094244fdcd6304c88f606/src/server/webrtc/webrtc_server.rs#L88. It's a little complicated, as real things tend to get, and I can't guarantee it's correct, but it seems to work and might be helpful. It's all MIT licensed, so take whatever you want.