kyren / turbulence

Networking library for games, multiplex reliable and unreliable streams over unreliable datagrams.
Apache License 2.0
262 stars 14 forks source link

Questions about API usage #6

Open AngelOfSol opened 3 years ago

AngelOfSol commented 3 years ago

So I've been trying to leverage turbulence for a game I'm writing (a networked fighting game), and I've been trying to make a lobby system to enable a better networked place experience. So far, I've been running into inconveniences while using the API, but I'm not sure if that's me not approaching the problem the correct way or not understanding the API. Here is my actual code that I've been working on for context.

My two main pain points are as follows, hopefully you have ideas on how I can address them:

1) MessageChannels doesn't differentiate between connections (i.e. different people connecting to the same client). I've been approaching this by spawning a new MessageChannels for each connection, which seems to work just fine, but makes it a little painful when trying to read from the socket, because I have to check to make sure that the datagram I'm receiving is from the right address. It also leads me to having to setup a new task to create connections whenever a new address sends data. 2) Not great compatibility with what I've been trying to do as an async workflow. Here is the code for my message loop:

async fn messages_loop_t(channels: ChannelList, mut messages: MessageChannels) {
    let messages = &mut messages;
    loop {
        // forwards incoming requests out
        if let Ok(value) = channels.join_request.incoming.try_recv() {
            messages.async_send(value).await.unwrap();
            messages.flush::<JoinRequest>();
        }
        if let Ok(value) = channels.join_response.incoming.try_recv() {
            messages.async_send(value).await.unwrap();
            messages.flush::<JoinResponse>();
        }

        // checks received data, and forwards it
        if let Some(value) = messages.recv::<JoinRequest>() {
            channels.join_request.outgoing.send(value).await.unwrap()
        }
        if let Some(value) = messages.recv::<JoinResponse>() {
            channels.join_response.outgoing.send(value).await.unwrap()
        }

        yield_now().await
    }
}

As you can see, I call the sync nonblocking methods of the messages object to check if theres any data to receiver, and yield_await at the end of the loop. It seems like a better approach would be selecting on the message type OR spawning a new task for each message, but neither of these are viable given that recv/send require an &mut MessageChannels. Additionally each recv Future is tied to the lifetime of the borrow, which means I can't return the future to be held around somewhere else. This code forwards each recieved data to a smol::channel, which lets me clone consumers allowing me to engage in behavior like the following:

    async fn request_join(&mut self, lobby: Self::LobbyId) -> Result<Self::LobbyId, JoinError> {
        let handle = self.handle.clone();
        let (incoming, outgoing) = {
            let mut inner = self.handle.write().await;
            let inner = inner.deref_mut();
            let connection = inner.connections.get_or_create_connection(
                lobby,
                inner.socket.clone(),
                inner.runtime.clone(),
            );
            (
                connection.channels.join_response.incoming.clone(),
                connection.send_request(JoinRequest { addr: lobby }),
            )
        };

        outgoing.await.unwrap();

        let response = incoming.recv().await.unwrap();

        match response {
            JoinResponse::Denied => Err(JoinError::Denied),
            JoinResponse::Accepted { self_addr } => {
                let mut lock = handle.write().await;
                lock.mode = Mode::Client(lobby);
                lock.self_addr = self_addr;
                Ok(lobby)
            }
        }
    }

Are there any solutions you would suggest to my issues/should I attempt to PR a design to address them? Thank you for making this really cool library either way!

kyren commented 3 years ago

Hey, sorry I've been behind on issues and PRs, I'm slowly going back through my backlog to catch up.

MessageChannels doesn't differentiate between connections (i.e. different people connecting to the same client). I've been approaching this by spawning a new MessageChannels for each connection, which seems to work just fine, but makes it a little painful when trying to read from the socket, because I have to check to make sure that the datagram I'm receiving is from the right address. It also leads me to having to setup a new task to create connections whenever a new address sends data.

This is correct, turbulence takes an unreliable stream of packets and turns it into multiplexed streams of optionally reliable messages. How to organize these into different logical connections is out of scope for turbulence. Generally what's expected is that you would do exactly what you figured out, each connection (if this is going over UDP, this would be identified by the source address and source port) would get its own individual PacketMultiplexer and MessageChannels instance. Additionally, how to establish new connections and how to prevent your server from being vulnerable to being tricked into creating lots of server side state on every single new UDP packet is again, out of scope for turbulence.

As you can see, I call the sync nonblocking methods of the messages object to check if theres any data to receiver, and yield_await at the end of the loop. It seems like a better approach would be selecting on the message type OR spawning a new task for each message, but neither of these are viable given that recv/send require an &mut MessageChannels. Additionally each recv Future is tied to the lifetime of the borrow, which means I can't return the future to be held around somewhere else. This code forwards each recieved data to a smol::channel, which lets me clone consumers allowing me to engage in behavior like the following:

My brain is a little fried at the moment so I'm having trouble thinking through the specifics of this sort of use case, but mostly this library is intended for networked realtime games. How I personally use the library is that messages are sent and received inside the main game loop and are ALWAYS sync nonblocking. The reason for this is that if you do something like messages.async_send(value).await.unwrap(); and the message type is reliable, think about what this does: it allows a game client to indefinitely block the game loop until the value can be sent.

It's possible that you're adding another layer that handles this nonblocking behavior in a loop that's in front of MessageChannels, but the original design goal was that the actual, real main game loop (which is not async, after all it has to run at 30hz, 60hz, 120hz whatever) would interact with the MessageChannels for each client directly. If there is no room in a particular channel.. something must happen, either the messages are dropped or the client is prevented from generating new data that it needs or is frozen in place or just dropped or something other than the server creating arbitrary amounts of unsent data and filling RAM. What specifically to do when messages are unsent would be specific to each message type.

The async API is mostly intended for startup / shutdown procedures where the client is not yet itself doing a similar 60hz loop.

However, in either case, there is a potentially better channels implementation that would allow MessageChannels methods to all be &self which I was considering anyway, and that might help the use case you have in mind?