tokio-rs / tokio-core

I/O primitives and event loop for async I/O in Rust
Apache License 2.0
634 stars 116 forks source link

Example for more complicated UdpServer Sink Handling #287

Open dl00 opened 6 years ago

dl00 commented 6 years ago

I am attempting to write a UdpServer which responds to packets received through UdpFramed. however I run into difficulties when using Sink because it is consumed upon a call by Stream::forward when making a reply. I have tried placing the future returned by Stream::forward to an instance variable, but it is not possible to clone (reasonably) when a call to Handler::spawn is required as well as a new saving of the feature to the instance variable. I just want the frames to be sent and flushed at the earliest time (order is not required), but I am no longer sure what the intended way to do this is.

This problem would not be an issue on a TCP connection since each client utilizes a separate Sink, and therefore it is sensible to send the reply before pulling more data.

Let me know if I should post a Rust playground or something. An example of the intended way to run a server like this would be helpful. I have looked at the examples, but they do not really concern my problem much at all.

alexcrichton commented 6 years ago

Thanks for the report! Have you tried using split perhaps to get two distinctly owned objects?

dl00 commented 6 years ago

yes, that is what I have already been doing. However, every time I try to call send, the Sender gets consumed, meaning that it technically cannot be accessed by another asynchronous part of the program without calling wait(), blocking the event loop.

I am using Core and assigning jobs through handlers, so since some of those handler tasks will end up sending packets, I have found it to be a tricky issue.

alexcrichton commented 6 years ago

Yes while one sink is being sent on you can't simultaneously send another, but you in general don't need to block the event loop but instead would rather plumb around the future to ensure that it only sits on the one Core running the program.

dl00 commented 6 years ago

What would be the best way to do that (hence the example)? I have tried using a shared Future<Item=Sink<>, Error=()> to handle this async, but this does not work very well when it comes to applying the send to a task.

alexcrichton commented 6 years ago

Oh we definitely need better examples here, we just need to figure out what to write down! If you've got a relatively self-contained gist or such it'd probably be best to go from there.

dl00 commented 6 years ago

yea I will prepare a gist hopefully within the next 24 hours and get back to you with it. I will get it as close as I can to being what I think you would want to show but it will not work fully. Hopefully that will show what I mean.

dl00 commented 6 years ago

here, this should be helpful: https://gist.github.com/anonymous/1d591a67ae5cddcc118aa42eef04af39

alexcrichton commented 6 years ago

Thanks for the gist @dl00! So I think the problem here can be boiled down to the fact that you've got a number of clients that are all trying to send a message to the same sink. For a situation like that if this were synchronous you might use a mutex, but we unfortunately don't have a futures-powered mutex right now.

Instead though what I might recommend is something like an mpsc channel. If you don't want to worry about backpressure you can use mpsc::unbounded. That way whenever you send a message you'd just place it in the queue, and then the queue, in a separate task, would be continuously processed to actually write to the udp socket.

Would something like that work for you?

dl00 commented 6 years ago

Yea, a mutex would not make any sense anyway since the whole thing is single threaded.

You are right, a channel may make the most sense. I was just trying to be overly careful since I was not sure if using an unbounded sink would make sense for this kind of system (since packets could potentially take long to send on a slow connection). I have no problem sticking with that if that is what you would recommend as optimal, however.

alexcrichton commented 6 years ago

Oh well the other option is a mspc bounded channel perhaps. Would that work for you?

dl00 commented 6 years ago

How would that work? Would I have to clone the sender or something?

alexcrichton commented 6 years ago

Indeed! You'd create a sender per task in that case.

dl00 commented 6 years ago

But then how do you pipe the mpsc recipients all to the Udp sink? The sink gets consumed when you call forward or send_all I thought. That was the problem that I ran into with that, unfortunately.

alexcrichton commented 6 years ago

Yes in this case you'd have a separate task that used the mpsc receiver to pipe that into a UDP socket, and that'd just be running continuously in the background sorta.

dl00 commented 6 years ago

Are you planning to make an example for this or should the issue just be closed? That sounds like a good solution.