google / tarpc

An RPC framework for Rust with a focus on ease of use.
MIT License

SyncClient with event loop driven by current thread #89

Closed jonhoo closed 7 years ago

jonhoo commented 7 years ago

Following the observations in #88, I wanted to modify the code so that the thread making an RPC call is the one that drives the event loop. However, as far as I can tell, there doesn't seem to be any way to do this with SyncClient. When you call the function representing an RPC method, you have no way to also make that thread drive the core until the call completes. Of course, with a FutureClient you could implement this yourself pretty trivially, but having this be a supported mode of operation for SyncClient would be really nice.

tikue commented 7 years ago

Yeah, I've run into this issue myself! I was kind of thinking of just disallowing specifying the handle for sync services and clients, and having them run on an arbitrary background reactor driven by a future that never completes. The logic is that users of Sync* don't want to think about the async details at all. This is obviously less flexible, but I kind of feel like that's a feature, in a way. What do you think?

cc @shaladdle

tikue commented 7 years ago

Thanks for documenting these issues, by the way! I'm very glad to have others thinking about these things.

tikue commented 7 years ago

My recommendation as a current workaround is to create a reactor running continuously on a background thread, something like the following:

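use std::sync::mpsc;
use std::thread;
use tokio_core::reactor;

fn spawn_core() -> reactor::Remote {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        let mut core = reactor::Core::new().unwrap();
        // Hand a Send-able handle to the reactor back to the spawning thread.
        tx.send(core.handle().remote().clone()).unwrap();

        // Run forever: the empty future never completes.
        // (`!` as the error type requires nightly with `#![feature(never_type)]`.)
        core.run(futures::empty::<(), !>()).unwrap();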
    });
    rx.recv().unwrap()
}

jonhoo commented 7 years ago

Hmm, if I understand you correctly, that would mean having a separate dedicated thread for each SyncClient that just drives the reactor. Wouldn't it be better if, when you try to make an RPC, the current thread just drives the reactor as far as it needs to?

Something along the lines of:

pub struct SyncClient {
    rpc: ext::FutureClient,
    core: reactor::Core,
}

// safe because the core will remain on the same thread as the client
unsafe impl Send for SyncClient {}

impl SyncClient {
    fn new(addr: &str) -> Self {
        let mut core = reactor::Core::new().unwrap();
        let options = tarpc::client::Options::default().handle(core.handle());
        let client = core.run(ext::FutureClient::connect(addr.first_socket_addr(), options)).unwrap();
        SyncClient {
            rpc: client,
            core: core,
        }
    }

    fn foo(&mut self, arg: i64) -> Result<i64> {
        self.core.run(self.rpc.foo(arg))
    }
}

jonhoo commented 7 years ago

Part of the motivation for this is that I'm writing servers where I'm already spawning a non-trivial number of cores, and where performance is a concern, so spawning extra threads and adding extra cross-thread traffic is not ideal, especially since SyncClient doesn't really need to do that.

tikue commented 7 years ago

I was thinking of having a pool of reactors or something, so rather than spawning a thread per client it would just randomly choose one of the existing reactors, maybe using some heuristics to determine which ones are less busy.
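The pool idea above could be sketched roughly as follows. This is only an illustration using the standard library: plain worker threads stand in for reactors, and `Pool`, `Worker`, `least_busy`, and `submit` are hypothetical names, not part of tarpc. The "heuristic" assumed here is simply the number of jobs a worker has accepted but not yet finished.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::{self, Sender};
use std::sync::Arc;
use std::thread;

type Job = Box<dyn FnOnce() + Send + 'static>;

// Hypothetical stand-in for one background reactor thread.
struct Worker {
    jobs: Sender<Job>,
    // Jobs submitted to this worker that have not finished yet.
    pending: Arc<AtomicUsize>,
}

pub struct Pool {
    workers: Vec<Worker>,
}

impl Pool {
    pub fn new(size: usize) -> Pool {
        assert!(size > 0, "pool needs at least one worker");
        let workers = (0..size)
            .map(|_| {
                let (tx, rx) = mpsc::channel::<Job>();
                let pending = Arc::new(AtomicUsize::new(0));
                let counter = Arc::clone(&pending);
                // The thread exits once the pool (and thus the sender) is dropped.
                thread::spawn(move || {
                    for job in rx {
                        job();
                        counter.fetch_sub(1, Ordering::SeqCst);
                    }
                });
                Worker { jobs: tx, pending }
            })
            .collect();
        Pool { workers }
    }

    /// Index of the worker with the fewest unfinished jobs (first one on ties).
    pub fn least_busy(&self) -> usize {
        self.workers
            .iter()
            .enumerate()
            .min_by_key(|(_, w)| w.pending.load(Ordering::SeqCst))
            .map(|(i, _)| i)
            .expect("pool is never empty")
    }

    /// Hand a job to the least busy worker and report which one was chosen.
    pub fn submit<F: FnOnce() + Send + 'static>(&self, f: F) -> usize {
        let i = self.least_busy();
        let worker = &self.workers[i];
        worker.pending.fetch_add(1, Ordering::SeqCst);
        worker.jobs.send(Box::new(f)).expect("worker thread exited");
        i
    }
}
```

A real version would hand out reactor handles rather than run closures, and the load counter is racy when several threads submit at once, which is probably acceptable for a heuristic.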

jonhoo commented 7 years ago

Why do you think it's important to have a separate thread drive the reactors? The thread doing the RPC is blocked waiting on the result anyway..?
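To illustrate the point with something self-contained: below is a minimal sketch of a blocking call that polls a future on the calling thread until it completes. It uses today's `std::future` machinery rather than the futures 0.1 / tokio-core API discussed in this thread, so it is an analogy for `core.run(fut)`, not tarpc code. `ThreadWaker`, `block_on`, and `fake_rpc` are hypothetical names.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

// Waker that unparks the thread blocked in `block_on`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Drive `fut` to completion on the current thread; no helper thread is spawned.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            // Sleep until something wakes us, then poll again.
            Poll::Pending => thread::park(),
        }
    }
}

// Hypothetical stand-in for an RPC future such as `self.rpc.foo(arg)`.
async fn fake_rpc(arg: i64) -> i64 {
    arg + 1
}
```

Calling `block_on(fake_rpc(41))` blocks the caller only for as long as the future needs, which is the behavior being asked for here.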

tikue commented 7 years ago

My main concern is that if you allow driving services and clients from the thread they're created on, then you can run into weird deadlocks. Granted, this is probably only a problem for test code where a user starts a service listening on the same thread from which the client connects... but I prefer removing as many obstacles as possible for new users.

jonhoo commented 7 years ago

Hmm, I don't think it'd be possible to construct such code using SyncClient? If your server is SyncService, then obviously you can't. If your server is a FutureService, then true, you could, although your code would very clearly be trying to drive the service forward (core.run) after doing a blocking function call, which is also a pretty strange thing to try to do.

tikue commented 7 years ago

Why can't this happen with SyncService? At least in master right now, the following code deadlocks:

let addr = "localhost:10000";
let core = reactor::Core::new().unwrap();
HelloServer.listen(addr, server::Options::default().handle(core.handle())).unwrap();
let client = SyncClient::connect(addr, client::Options::default().handle(core.handle())).unwrap();
println!("{}", client.hello("Mom".to_string()).unwrap());

(by the way, feel free to hop in our gitter channel; we might be able to discuss this more quickly synchronously (pun intended))
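The shape of that deadlock can be reproduced without tarpc. In the sketch below, a toy event loop runs queued work only when `turn` is called, much as a `Core` presumably makes progress only while something calls `run` on it. `ToyLoop`, `queue_hello`, and the two `call_*` functions are hypothetical, the reply format is an assumption, and a timeout stands in for blocking forever.

```rust
use std::collections::VecDeque;
use std::sync::mpsc::{self, Receiver, RecvTimeoutError};
use std::time::Duration;

/// Toy stand-in for a reactor: queued tasks run only inside `turn`.
struct ToyLoop {
    tasks: VecDeque<Box<dyn FnOnce()>>,
}

impl ToyLoop {
    fn new() -> Self {
        ToyLoop { tasks: VecDeque::new() }
    }

    fn spawn<F: FnOnce() + 'static>(&mut self, f: F) {
        self.tasks.push_back(Box::new(f));
    }

    /// Run every queued task on the calling thread.
    fn turn(&mut self) {
        while let Some(task) = self.tasks.pop_front() {
            task();
        }
    }
}

/// Queue a "server" task that answers one request; return where the reply lands.
fn queue_hello(lp: &mut ToyLoop, name: &str) -> Receiver<String> {
    let (tx, rx) = mpsc::channel();
    let name = name.to_string();
    lp.spawn(move || {
        // Ignore the error if the caller has already given up.
        let _ = tx.send(format!("Hello, {}!", name));
    });
    rx
}

/// Blocks on the reply while nothing drives the loop: the reply never arrives.
fn call_without_driving(lp: &mut ToyLoop) -> Result<String, RecvTimeoutError> {
    let reply = queue_hello(lp, "Mom");
    reply.recv_timeout(Duration::from_millis(50))
}

/// Drives the loop on the current thread first, so the reply is ready.
fn call_driving(lp: &mut ToyLoop) -> Result<String, RecvTimeoutError> {
    let reply = queue_hello(lp, "Mom");
    lp.turn();
    reply.recv_timeout(Duration::from_millis(50))
}
```

With a real blocking `recv` in place of `recv_timeout`, `call_without_driving` would hang, which mirrors the snippet above: the thread that owns the loop is the same thread that is waiting for the answer.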

tikue commented 7 years ago

Similar conversations are occurring in hyper.

tikue commented 7 years ago

Fixed in #92