bastion-rs / bastion

Highly-available Distributed Fault-tolerant Runtime
https://www.bastion-rs.com
Apache License 2.0

Having Tasks Pinned to Thread #249

Open anlumo opened 4 years ago

anlumo commented 4 years ago

Is your feature request related to a problem? Please describe.

I want to combine bastion with winit for a desktop application that uses wgpu rendering to display content.

For this, we want a thread dedicated to wgpu (at least for the main render loop; it might spawn more children for specific tasks). It should be able to send and receive messages via bastion to and from other parts of the application (like the UI). This thread might own local data that is not Send.

Describe the solution you'd like

For the function passed to children.with_exec, I tried the following code:

    async fn exec(ctx: BastionContext) -> Result<(), ()> {
        let task = blocking! {
            run!(async move {
                loop {
                    msg! { ctx.recv().await?,
                        msg: Message => { /* … */ };
                        // …
                    }
                }
                Ok(())
            })
        };
        task.await.unwrap()
    }

In theory, this should spawn a new task on a thread designated for blocking tasks, which then executes ctx.recv() in a loop without ever switching threads.

However, what actually happens is that ctx.recv().await never returns; it just hangs there.

You can see the full example here.

Describe alternatives you've considered

@o0Ignition0o suggested wrapping the task in the code above in a run! instead of awaiting it, which does work. However, that blocks a thread that isn't supposed to be blocked: the one calling the exec function.

Alternatively, @o0Ignition0o suggested adding a feature to bastion like create_named, which specifically spawns an OS thread just for this one async closure. This is what this ticket is about.

I'm currently trying to use tokio's functionality to achieve this, but it's hard to find a way to wrap a non-Send Future so that I can .await it in the calling function without blocking that thread.
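To make the request concrete, here is a minimal std-only sketch of what such a feature could look like. It is not Bastion or tokio API: `spawn_pinned`, `PinnedHandle`, and the tiny `block_on` are hypothetical names introduced purely for illustration. The future is constructed on the new thread by a closure, so it may hold non-Send data; only its output has to be Send. The caller gets back a Send future that it can await without blocking its own thread. One stated limitation: if the pinned thread panics, the handle in this sketch never resolves.

```rust
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Wakes a parked thread; enough for an executor that drives one future.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Minimal std-only `block_on` (illustrative): polls `fut` on the current thread.
pub fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            // Spurious wake-ups are fine: we simply poll again.
            Poll::Pending => thread::park(),
        }
    }
}

/// State shared between the pinned thread and the awaiting task.
struct Shared<T> {
    result: Option<T>,
    waker: Option<Waker>,
}

/// Hypothetical handle: a `Send` future resolving to the pinned task's output.
pub struct PinnedHandle<T>(Arc<Mutex<Shared<T>>>);

impl<T> Future for PinnedHandle<T> {
    type Output = T;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
        let mut shared = self.0.lock().unwrap();
        match shared.result.take() {
            Some(value) => Poll::Ready(value),
            None => {
                // Remember who to wake once the pinned thread finishes.
                shared.waker = Some(cx.waker().clone());
                Poll::Pending
            }
        }
    }
}

/// Hypothetical helper: builds the future on a new named OS thread and runs
/// it there to completion, so the future itself never has to be `Send`.
pub fn spawn_pinned<F, Fut, T>(name: &str, make_future: F) -> PinnedHandle<T>
where
    F: FnOnce() -> Fut + Send + 'static,
    Fut: Future<Output = T> + 'static,
    T: Send + 'static,
{
    let shared = Arc::new(Mutex::new(Shared { result: None, waker: None }));
    let thread_shared = Arc::clone(&shared);
    thread::Builder::new()
        .name(name.to_owned())
        .spawn(move || {
            let value = block_on(make_future());
            // Store the result and take the waker under one lock acquisition,
            // then wake outside the lock.
            let waker = {
                let mut guard = thread_shared.lock().unwrap();
                guard.result = Some(value);
                guard.waker.take()
            };
            if let Some(waker) = waker {
                waker.wake();
            }
        })
        .expect("failed to spawn pinned thread");
    PinnedHandle(shared)
}

/// Usage: the future owns an `Rc` (not `Send`), yet the caller holds a `Send` handle.
pub fn demo_pinned() -> i32 {
    let handle = spawn_pinned("render", || async {
        let local = Rc::new(41);
        *local + 1
    });
    block_on(handle) // an async caller would write `handle.await` instead
}
```

Inside an exec function the handle would be awaited rather than passed to `block_on`. Whether a BastionContext moved into such a thread would actually receive messages there is a separate question that this sketch does not answer.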

anlumo commented 4 years ago

Another solution I tried that didn't work, using the futures 0.3 crate:

    pub async fn exec(ctx: BastionContext) -> Result<(), ()> {
        let (sender, receiver) = futures::channel::oneshot::channel::<i32>();
        std::thread::spawn(|| {
            futures::executor::block_on(async move {
                loop {
                    msg! { ctx.recv().await.unwrap(),
                        msg: RenderMessage => {
                            log::debug!("render msg: {:?}", msg);
                        };
                        ref msg: super::QuitMessage => {
                            log::info!("Renderer shutting down.");
                            break;
                        };
                        _: _ => panic!("Unknown message received");
                    }
                }
                sender.send(0).unwrap();
            });
        });
        receiver.await.ok();

        Ok(())
    }

anlumo commented 4 years ago

This does work, kinda:

    pub async fn exec(ctx: BastionContext) -> Result<(), ()> {
        log::debug!("Starting renderer...");
        let (sender, mut receiver) = futures::channel::mpsc::unbounded::<RenderMessage>();
        let render_thread = std::thread::spawn(|| {
            futures::executor::block_on(async move {
                while let (Some(msg), new_receiver) = receiver.into_future().await {
                    match msg {
                        RenderMessage::Quit => break,
                        RenderMessage::Foo => log::debug!("Render thread: Foo"),
                        RenderMessage::Bar => log::debug!("Render thread: Bar"),
                    }
                    receiver = new_receiver;
                }
            });
        });

        loop {
            log::debug!("Before msg!");
            let msg = ctx.recv().await;
            log::debug!("Received message {:?}", msg);
            msg! { msg.unwrap(),
                msg: RenderMessage => {
                    log::debug!("render msg: {:?}", msg);
                    sender.unbounded_send(msg).unwrap();
                };
                ref msg: super::QuitMessage => {
                    log::info!("Renderer shutting down.");
                    sender.unbounded_send(RenderMessage::Quit).unwrap();
                    render_thread.join().unwrap();
                    break;
                };
                _: _ => panic!("Unknown message received");
            }
        }

        Ok(())
    }

It's awfully inconvenient, though. It has a dispatch task that only forwards messages to the actual thread, and I need to implement my own two-way communication protocol for this.
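For reference, the two-way protocol mentioned above can be kept fairly small by letting each request carry its own reply channel. The following is a std-only sketch under assumptions: the `Draw` and `FrameCount` variants, `spawn_renderer`, and `demo_renderer` are hypothetical and not taken from the code above, and std's blocking `mpsc` stands in for the async channels. In the async dispatch task the reply side would be something like `futures::channel::oneshot`, so that waiting for the answer does not block.

```rust
use std::rc::Rc;
use std::sync::mpsc::{self, Sender};
use std::thread::{self, JoinHandle};

/// Hypothetical message set: a request that needs an answer carries the
/// sending half of a reply channel.
pub enum RenderMessage {
    Draw(String),
    FrameCount(Sender<u64>),
    Quit,
}

/// Spawns the dedicated render thread and returns the channel into it,
/// plus the join handle for a clean shutdown.
pub fn spawn_renderer() -> (Sender<RenderMessage>, JoinHandle<()>) {
    let (tx, rx) = mpsc::channel::<RenderMessage>();
    let handle = thread::spawn(move || {
        // Non-Send state is created on this thread and never leaves it.
        let label: Rc<str> = Rc::from("renderer");
        let mut frames: u64 = 0;
        // The loop ends on `Quit` or when every sender has been dropped.
        for msg in rx {
            match msg {
                RenderMessage::Draw(what) => {
                    frames += 1;
                    println!("{}: drawing {}", label, what);
                }
                RenderMessage::FrameCount(reply) => {
                    // Ignore a send error: the requester may have stopped waiting.
                    let _ = reply.send(frames);
                }
                RenderMessage::Quit => break,
            }
        }
    });
    (tx, handle)
}

/// Usage: send two draw requests, ask for the frame count, then shut down.
pub fn demo_renderer() -> u64 {
    let (tx, handle) = spawn_renderer();
    tx.send(RenderMessage::Draw("triangle".to_owned())).unwrap();
    tx.send(RenderMessage::Draw("quad".to_owned())).unwrap();

    let (reply_tx, reply_rx) = mpsc::channel();
    tx.send(RenderMessage::FrameCount(reply_tx)).unwrap();
    let frames = reply_rx.recv().unwrap();

    tx.send(RenderMessage::Quit).unwrap();
    handle.join().unwrap();
    frames
}
```

This still needs the forwarding task, so it only reduces the boilerplate of the workaround; it does not remove the need for a pinned-task feature.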