Open adam0z opened 6 years ago
@adam0z currently the transport event loop is running in a separate thread, and there is no easy way to get a Handle out of it (since it's not Send, it cannot be moved between threads).
Instead you should run your own event loop (Core) and make it accessible inside the RPC method (through Remote, for instance).
Right, so I created a Core and packed it into Remote in main function:
fn main() {
    let mut core = Core::new().unwrap();
    let postgres_loop = Remote::Shared(core.remote());
    let mut io = IoHandler::new();
    io.extend_with(RpcImpl.to_delegate());
    let server = ServerBuilder::new(io)
        .start(&"0.0.0.0:10000".parse().unwrap())
        .expect("Server must start with no issues");
    server.wait();
    postgres_loop.wait();
}
How could I pass the Remote instance to my RPC method?
pub struct RpcImpl;
impl Rpc for RpcImpl {
    fn async_postgres(&self) -> BoxFuture<String, Error> {
        postgres_loop.remote().spawn(move |handle| {
            let done = Connection::connect("postgresql://postgres@localhost:5433",
                                           TlsMode::None,
                                           handle)
            (...)
        }
    }
}
let postgres_loop = Remote::Shared(core.remote());
let mut io = IoHandler::new();
io.extend_with(RpcImpl { postgres_loop }.to_delegate());
and
pub struct RpcImpl {
    postgres_loop: ...
}
should work.
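A minimal, std-only sketch of the same idea, for readers who want to see the pattern in isolation: the handle is stored as a field and reached through `self`. The `Rpc` trait, `LoopHandle`, and `call_once` below are hypothetical stand-ins, not the jsonrpc or tokio_core types, and the method returns a plain `String` instead of a future to keep the sketch dependency-free.

```rust
use std::sync::mpsc::{channel, Sender};

/// Hypothetical stand-in for the `Rpc` trait generated by the JSON-RPC macros.
pub trait Rpc {
    fn async_postgres(&self) -> String;
}

/// Hypothetical stand-in for a handle to the Postgres event loop.
#[derive(Clone)]
pub struct LoopHandle {
    jobs: Sender<String>,
}

/// The implementation owns the handle instead of looking for a variable in `main`.
pub struct RpcImpl {
    postgres_loop: LoopHandle,
}

impl Rpc for RpcImpl {
    fn async_postgres(&self) -> String {
        // The handle is reached through `self`.
        self.postgres_loop
            .jobs
            .send("SELECT 1".to_string())
            .expect("loop is gone");
        "queued".to_string()
    }
}

/// Builds the implementation with its handle, calls the method once,
/// and returns the reply together with the job the "loop" received.
pub fn call_once() -> (String, String) {
    let (jobs, received) = channel();
    let rpc = RpcImpl { postgres_loop: LoopHandle { jobs } };
    let reply = rpc.async_postgres();
    (reply, received.recv().unwrap())
}
```

Calling `call_once()` from `main` exercises the whole path. With the real crates the field would hold the `Remote` value built in `main`, as in the snippet above.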
So easy... Thanks.
I created a channel to send a message from a postgres loop future. But the code blocks on receiving it.
fn async_postgres(&self) -> BoxFuture<String, Error> {
    use jsonrpc_core::futures::sync::oneshot;
    let (tx, rx) = oneshot::channel();
    self.postgres_loop.remote().spawn(move |_handle| {
        tx.send("OK".to_string());
        Ok(())
    });
    rx
        .map_err(|e| Error::new(ErrorCode::ServerError(-32432)))
        .boxed()
}
What am I doing wrong?
Can you post the entire example so that we can try to reproduce that? I don't see a reason why it shouldn't work given that snippet.
I have prepared a minimal example to reproduce the problem: async_postgres.tar.gz
> rustc --version
rustc 1.25.0 (84203cac6 2018-03-25)
I tested it using a Python client, ./client/client.py, that calls call_async (works properly) and async_postgres (hangs).
I do not call the run method of tokio_core::reactor::Core because, as I understand it, there is no place for this.
Ok, so that's definitely the culprit. Either you need to call run with a never-ending future or you need to drive the event loop manually via loop { core.turn(None) }, otherwise any future spawned on that event loop won't run to completion.
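To illustrate why the call hangs, here is a toy, std-only model of an event loop whose tasks are only queued until the loop is driven. `ToyCore`, `ToyRemote`, and `reply_before_and_after_turn` are hypothetical names built on `std::sync::mpsc`; this is not how tokio_core is implemented, only a sketch of the "spawned but never driven" situation.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

// A task is any one-shot closure that can be sent to the loop's thread.
type Task = Box<dyn FnOnce() + Send>;

/// Toy stand-in for an event loop: it owns the receiving end of a task queue.
pub struct ToyCore {
    queue: Receiver<Task>,
    sender: Sender<Task>,
}

/// Toy stand-in for a remote handle: cloneable and sendable across threads.
#[derive(Clone)]
pub struct ToyRemote {
    sender: Sender<Task>,
}

impl ToyCore {
    pub fn new() -> Self {
        let (sender, queue) = channel();
        ToyCore { queue, sender }
    }

    pub fn remote(&self) -> ToyRemote {
        ToyRemote { sender: self.sender.clone() }
    }

    /// Runs every task queued so far and returns how many were executed.
    pub fn turn(&mut self) -> usize {
        let mut ran = 0;
        while let Ok(task) = self.queue.try_recv() {
            task();
            ran += 1;
        }
        ran
    }
}

impl ToyRemote {
    /// Queues a task; nothing executes until the owning core is turned.
    pub fn spawn<F: FnOnce() + Send + 'static>(&self, f: F) {
        self.sender.send(Box::new(f)).expect("core was dropped");
    }
}

/// Returns what the reply channel holds before and after the loop is driven.
pub fn reply_before_and_after_turn() -> (Option<String>, Option<String>) {
    let mut core = ToyCore::new();
    let remote = core.remote();
    let (tx, rx) = channel::<String>();

    remote.spawn(move || {
        let _ = tx.send("OK".to_string());
    });

    let before = rx.try_recv().ok(); // the task is only queued, so no reply yet
    core.turn(); // driving the loop executes the task
    let after = rx.try_recv().ok();
    (before, after)
}
```

In this model the reply is absent until `turn` is called, which mirrors the hanging `async_postgres` call: the receiver waits on a sender that lives inside a task nobody runs.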
Thanks for the quick response!
I added core.run(future::empty::<(), ()>()).unwrap(); before server.wait(); and it works!
So if I needed another event loop (besides the Postgres one, for example for Redis), would I also run some future on it and join it with the Postgres event loop?
It depends on what kind of application you are building, but it should be enough to have one event loop for everything.
You can initialize servers with an existing event loop Remote:
https://docs.rs/jsonrpc-http-server/8.0.0/jsonrpc_http_server/struct.ServerBuilder.html#method.event_loop_remote
If I initialize a server with the Postgres event loop Remote, will new requests not be processed until the database IO operation finishes?
On the other hand, what should I use if I need to run multiple database operations concurrently? I used the tokio_pool crate and passed a tokio_pool::TokioPool instance to the RPC methods instead of a jsonrpc_server_utils::reactor::Remote. Do you know another method?
I'm not sure if the Connicpu/tokio-pool crate is mature and supported. Nevertheless, its next_worker().spawn() method provides the tokio::reactor::Handle needed by tokio_postgres::Connection::connect.
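As a rough picture of what handing work to a "next worker" can look like, here is a std-only sketch of a pool that hands jobs to worker threads in round-robin order. `ToyPool`, its methods, and `run_three_jobs` are hypothetical; the round-robin policy is an assumption made for the sketch, and none of this is the tokio-pool crate's actual implementation.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::{channel, Sender};
use std::thread::{self, JoinHandle};

type Job = Box<dyn FnOnce() + Send>;

/// Toy pool: one queue and one thread per worker.
pub struct ToyPool {
    workers: Vec<Sender<Job>>,
    threads: Vec<JoinHandle<()>>,
    next: AtomicUsize,
}

impl ToyPool {
    pub fn new(size: usize) -> Self {
        assert!(size > 0, "pool needs at least one worker");
        let mut workers = Vec::new();
        let mut threads = Vec::new();
        for _ in 0..size {
            let (tx, rx) = channel::<Job>();
            // Each worker drives its own queue until every sender is dropped.
            threads.push(thread::spawn(move || {
                for job in rx {
                    job();
                }
            }));
            workers.push(tx);
        }
        ToyPool { workers, threads, next: AtomicUsize::new(0) }
    }

    /// Picks workers in round-robin order (an assumed policy).
    pub fn next_worker(&self) -> &Sender<Job> {
        let i = self.next.fetch_add(1, Ordering::Relaxed) % self.workers.len();
        &self.workers[i]
    }

    /// Closes the queues and waits for every worker to finish.
    pub fn join(self) {
        drop(self.workers);
        for t in self.threads {
            t.join().unwrap();
        }
    }
}

/// Spreads three jobs over two workers and returns their sorted results.
pub fn run_three_jobs() -> Vec<u32> {
    let pool = ToyPool::new(2);
    let (tx, rx) = channel();
    for n in 1..=3u32 {
        let tx = tx.clone();
        pool.next_worker()
            .send(Box::new(move || {
                let _ = tx.send(n * 10);
            }))
            .unwrap();
    }
    drop(tx);
    pool.join();
    let mut out: Vec<u32> = rx.iter().collect();
    out.sort(); // completion order across threads is not deterministic
    out
}
```

Jobs sent to different workers can make progress at the same time, which is the property the question asks about. Whether a pool or a single shared event loop fits better depends on the application.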
I would like to start using async postgres with my async RPC method. I need to pass a tokio_core::reactor::Handle to the connect method: https://github.com/sfackler/rust-postgres/blob/master/tokio-postgres/src/lib.rs#L299
In tokio-postgres examples they get a handle from:
I assume that I should not do it that way, but I attached a file containing an async method with this code: rpc_impl.txt
How can I access a handle (I guess to the event loop) inside an RPC method? What should I change in my code?