Open nikomatsakis opened 3 years ago
As many readers already know, for the past (almost) 2 years I've been developing a Rust crate for D-Bus called zbus. With this being my first big Rust project started from scratch, and done almost entirely in my spare time, progress was rather slow for many months. My perfectionism didn't help, and neither did the fact that implementing a Serde API for the D-Bus format was quite challenging. Then along came my old friend and ex-colleague, Marc-André Lureau, who sped up progress tenfold, and soon after we had the 1.0 release of zbus.
While my original plan (perfectionism again) was for the API to be primarily async, with the synchronous API mostly just a wrapper around it, it was Marc-André who ended up doing most of the work and coming up with a nice high-level API. His use case was primarily synchronous, so we decided to go with a synchronous API first. I still believe that was the right thing to do, since neither of us was familiar with async programming in Rust, and going with the original plan would have delayed the first release by at least another half a year.
This may sound very disappointing to readers who come from a glib programming background, but a purely synchronous, blocking API in a Rust app is not nearly as bad as it would be in a glib+C (or even Vala) app. There is a reason why Rust is famous for its fearless concurrency.
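To make the point about blocking APIs concrete, here is a minimal sketch using only the standard library. It assumes a hypothetical `blocking_call` function standing in for a synchronous D-Bus method call (it is not part of zbus); the call runs on a worker thread and the reply comes back over a channel, so the calling thread is never blocked.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Hypothetical stand-in for a blocking D-Bus method call; NOT zbus API.
fn blocking_call(name: &str) -> String {
    thread::sleep(Duration::from_millis(50)); // pretend to wait for a reply
    format!("reply for {name}")
}

/// Runs the blocking call on a worker thread and returns a receiver,
/// so the caller's thread stays free to do other work in the meantime.
fn call_in_background(name: String) -> mpsc::Receiver<String> {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        // `name` and `tx` are moved into the thread; the compiler checks
        // that everything crossing the thread boundary is `Send`.
        let reply = blocking_call(&name);
        // The receiver may already be gone; ignore the send error then.
        let _ = tx.send(reply);
    });
    rx
}
```

A caller can keep doing other work and pick up the reply later with `rx.recv()` (or check for it without blocking via `rx.try_recv()`).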
In 2013, I discovered the Rust programming language and quickly decided to learn it and make it my main programming language.
In 2017, I moved to Berlin and joined Parity as a Rust developer. The task that occupied my first few months was to build rust-libp2p, a peer-to-peer library in asynchronous Rust (~89k lines of code at the moment). Afterwards, I integrated it in Substrate (~400k lines of code), and have since then been the maintainer of the networking part of the code base.
In the light of this recent blog post and this twitter interaction, I thought it could be a good idea to lay down some of the issues I’ve encountered over time through experience.
Please note that I’m not writing this article on behalf of my employer Parity. This is my personal feedback when working on the most asynchronous-heavy parts of Parity-related projects, but I didn’t show it to anyone before publishing and it might differ from what the other developers in the company would have to say.
Most of the web services I've written in Rust have used actix-web. Recently, I needed to write something that will provide some reverse proxy functionality. I'm more familiar with the hyper-powered HTTP client libraries (reqwest in particular). I decided this would be a good time to experiment again with hyper on the server side as well. The theory was that having matching Request and Response types between the client and server would work nicely. And it certainly did.
In the process, I ended up with an interesting example of battling ownership through closures and async blocks. This is a topic I typically mention in my Rust training sessions as the hardest thing I had to learn when learning Rust. So I figure a blog post demonstrating one of these crazy cases would be worthwhile.
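The kind of ownership puzzle meant here can be sketched without any web framework. The example below is an illustration under assumptions, not the code from the post: `Client`, `make_handler` and `poll_ready` are hypothetical names. The handler closure must be callable many times, while each returned future must own everything it uses, so the shared state is cloned inside the closure but outside the `async move` block.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical shared state, standing in for something like an HTTP client.
struct Client {
    base: String,
}

type BoxFuture = Pin<Box<dyn Future<Output = String> + Send + 'static>>;

/// Returns a closure that can be called repeatedly (`Fn`); every call
/// produces a future that owns its own handle to the client.
fn make_handler(client: Arc<Client>) -> impl Fn(String) -> BoxFuture {
    move |path: String| -> BoxFuture {
        // Clone outside the async block: the closure keeps its handle for
        // later calls, and the clone is moved into the future.
        let client = Arc::clone(&client);
        Box::pin(async move { format!("{}{}", client.base, path) })
    }
}

/// Waker that does nothing; enough for futures that finish on the first poll.
struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Polls a future exactly once and returns its output.
fn poll_ready<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    match fut.as_mut().poll(&mut cx) {
        Poll::Ready(out) => out,
        Poll::Pending => panic!("this sketch only handles immediately ready futures"),
    }
}

/// Calls the same handler twice to show that the closure is reusable.
fn demo() -> (String, String) {
    let handler = make_handler(Arc::new(Client {
        base: "http://upstream".to_string(),
    }));
    (
        poll_ready(handler("/a".to_string())),
        poll_ready(handler("/b".to_string())),
    )
}
```

Moving `client` directly into the `async move` block, without the clone, would make the closure `FnOnce` and the compiler would reject it as an `impl Fn`.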
Recently I have been getting into async and tokio. With great success despite the tribulations I brought up here: A year in Rust. And I still can't read the docs!
My latest creation has tokio-tungstenite serving up web sockets to a test client also using tokio-tungstenite.
Having got that basic ws functionality working I proceeded to introduce NATS messaging into the mix using the nats crate. You see, those web socket clients are expecting to be fed data that derives from subscription to our NATS server (among other things). Each ws client's data requirements may require multiple subscriptions to different 'subjects' from NATS.
So here comes the problem:
Ever since it’s illegal for me to leave my house, my weekends have been filled with rewriting Whisperfish. Whisperfish is an app, originally by Andrew E. Bruno, that natively implements Signal for SailfishOS. My goal with the rewrite is to modernize the non-GUI code such that it uses the official libsignal-protocol-c instead of the Go-reimplementation. For this, I would either use C++ or Rust; the title of the post probably spoiled which one I prefer.
I’m imagining two target audiences for this blog post: either you’re a Rustacean, and you’re here for the Tokio and Actix magic, or (and that’s not xor) you’re from the SailfishOS community and you’re wondering what all the Tokio and Actix buzzwords are even about. With that in mind, I’ll give an introduction to both topics, and depending on your background, you can skip either.
We have been using async Rust, first with Future combinators and then with async/await as soon as it landed on nightly. It’s been an extraordinary feature that let us build massively concurrent applications using Tokio. We never had to spend much time making our servers handle more than 10K concurrent connections or implementing back-pressure. However, it can still get better:
- Unlike most parts of Rust, async functions are a bit of a footgun: they look innocuously similar to regular functions, but may not be executed entirely (more precisely, the Future they return may not be polled to completion). This requires extra care with clean-up logic, which cannot currently be asynchronous itself. The ongoing work to support asynchronous drop will hopefully provide a piece of the solution. How to make these problems more visible is still an open question; could an attribute make it clear that a Future is cancelation-safe, and a lint warning tell us otherwise?
- Though it is getting better, the ecosystem has been badly hurt by the split of asynchronous frameworks. The language would greatly benefit from providing the constructs that would allow task scheduling subsystems to be abstracted without overhead, so one may choose to use their favourite executor, and pass task executors down to libraries, or drive the Future themselves.
- The current design of statically initialized task executors makes it easy to run several executors by mistake by simply pulling a dependency. The generalized usage of thread locals makes debugging more difficult.
- Being able to design asynchronous functions in traits without boxing, and to refer to the result type, would definitely be a great improvement in performance-sensitive code.
- We also hope to see the work around io-uring result in great performance improvements without creating further splits in the ecosystem.
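The first point in the list above, that a future may not be polled to completion, can be shown with the standard library alone. This is a sketch under assumptions: `Never`, `Guard`, `work` and `poll_once_then_drop` are hypothetical names, and polling by hand stands in for what an executor does when a task is cancelled.

```rust
use std::cell::Cell;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// A future that is never ready; stands in for I/O that has not completed.
struct Never;
impl Future for Never {
    type Output = ();
    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        Poll::Pending
    }
}

/// Sets a flag when dropped. Synchronous `Drop` is the only clean-up hook
/// that runs when a future is cancelled.
struct Guard<'a>(&'a Cell<bool>);
impl Drop for Guard<'_> {
    fn drop(&mut self) {
        self.0.set(true);
    }
}

async fn work(finished: &Cell<bool>, cleaned_up: &Cell<bool>) {
    let _guard = Guard(cleaned_up);
    Never.await; // suspension point
    finished.set(true); // not reached if the future is dropped while suspended
}

/// Waker that does nothing; the future is only polled by hand here.
struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Polls `work` once, then drops it. Returns `(finished, cleaned_up)`.
fn poll_once_then_drop() -> (bool, bool) {
    let finished = Cell::new(false);
    let cleaned_up = Cell::new(false);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    {
        let mut fut = Box::pin(work(&finished, &cleaned_up));
        assert!(fut.as_mut().poll(&mut cx).is_pending());
    } // `fut` is dropped here, which is what cancellation amounts to
    (finished.get(), cleaned_up.get())
}
```

The function returns `(false, true)`: the line after the `.await` never ran, but the guard's `Drop` did. Any clean-up that itself needs to `.await` has no such hook today.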
In this post we will explore a brief example of asynchronous programming in Rust with the Tokio runtime, demonstrating different execution scenarios. This post is aimed at beginners to asynchronous programming.
In this post we will set up a very simple data ingestion process with Rust and AWS Lambda.
Lesson I: The Rust language does not include any intrinsic high-level means to poll futures. That is left as an “exercise for the reader”.
Lesson II: It is possible for a single dependency of your crate to be so tightly coupled to a future polling runtime that it effectively makes that runtime mandatory for all consumers.
Lesson III: If you drop (or allow to be dropped) the tokio runtime, any pending tasks are cancelled.
Lesson IV: Each runtime has a single blocking entry point, so it can only ever block on a single unit of asynchronous work.
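As an illustration of Lesson I, the "exercise for the reader" can be done in a few lines of standard-library code. The sketch below follows the thread-parking approach shown in the `std::task::Wake` documentation; `ThreadWaker`, `block_on` and `YieldOnce` are names chosen for this example, and a real runtime does far more (I/O, timers, many tasks).

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Waker that unparks the thread that is blocked in `block_on`.
struct ThreadWaker(Thread);
impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Drives a single future to completion on the current thread.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            // Sleep until the future's waker signals that progress is possible.
            Poll::Pending => thread::park(),
        }
    }
}

/// Test future: pending on the first poll, ready on the second.
struct YieldOnce(bool);
impl Future for YieldOnce {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            Poll::Ready(())
        } else {
            self.0 = true;
            cx.waker().wake_by_ref(); // ask to be polled again
            Poll::Pending
        }
    }
}
```

For example, `block_on(async { YieldOnce(false).await; 1 + 2 })` evaluates to `3`. This `block_on` is also a small instance of Lesson IV: it is a single blocking entry point that drives exactly one top-level future.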
This reddit thread has a lot of good material!
thanks to some excellent git bisecting from ceejbot, and a nudge in the right direction from tomaka17’s post “a look back at async rust” (in particular, the “flow control is hard” section), we were able to diagnose a surprising production rustlang crossbeam channel deadlock.
An excellent article from @fasterthanline covering pinning, threads, async tasks.
My colleague Matt Jibson and I recently found ourselves in the unfortunate situation of debugging this hefty async/await-related error from the Rust compiler:
error[E0277]: `(dyn futures::Stream<Item = std::result::Result<std::vec::Vec<repr::Row>, comm::Error>> + std::marker::Send + std::marker::Unpin + 'static)` cannot be shared between threads safely --> src/materialized/src/mux.rs:138:100 | 138 | async fn handle_connection(&self, conn: SniffedStream<TcpStream>) -> Result<(), anyhow::Error> { | ____________________________________________________________________________________________________^ 139 | | self.handle_connection(conn).await 140 | | } | |_____^ `(dyn futures::Stream<Item = std::result::Result<std::vec::Vec<repr::Row>, comm::Error>> + std::marker::Send + std::marker::Unpin + 'static)` cannot be shared between threads safely | = help: the trait `std::marker::Sync` is not implemented for `(dyn futures::Stream<Item = std::result::Result<std::vec::Vec<repr::Row>, comm::Error>> + std::marker::Send + std::marker::Unpin + 'static)` = note: required because of the requirements on the impl of `std::marker::Sync` for `std::ptr::Unique<(dyn futures::Stream<Item = std::result::Result<std::vec::Vec<repr::Row>, comm::Error>> + std::marker::Send + std::marker::Unpin + 'static)>` = note: required because it appears within the type `std::boxed::Box<(dyn futures::Stream<Item = std::result::Result<std::vec::Vec<repr::Row>, comm::Error>> + std::marker::Send + std::marker::Unpin + 'static)>` = note: required because it appears within the type `std::option::Option<std::boxed::Box<(dyn futures::Stream<Item = std::result::Result<std::vec::Vec<repr::Row>, comm::Error>> + std::marker::Send + std::marker::Unpin + 'static)>>` = note: required because it appears within the type `coord::session::Portal` = note: required because of the requirements on the impl of `std::marker::Send` for `&coord::session::Portal` = note: required because it appears within the type `for<'r, 's, 't0, 't1, 't2, 't3, 't4, 't5, 't6, 't7, 't8> {std::future::ResumeTy, &'r mut 
pgwire::protocol::StateMachine<pgwire::server::Conn<ore::netio::SniffedStream<tokio::net::TcpStream>>>, std::string::String, pgwire::protocol::StateMachine<pgwire::server::Conn<ore::netio::SniffedStream<tokio::net::TcpStream>>>, &'s mut coord::SessionClient, coord::SessionClient, &'t0 coord::session::Session, &'t1 mut coord::session::Session, &'t2 str, &'t3 std::string::String, std::option::Option<&'t4 coord::session::Portal>, tokio_postgres::error::sqlstate::SqlState, impl futures::Future, (), &'t7 coord::session::Portal, impl futures::Future}` = note: required because it appears within the type `[static generator@pgwire::protocol::StateMachine::<A>::describe_portal::#0 0:&mut pgwire::protocol::StateMachine<pgwire::server::Conn<ore::netio::SniffedStream<tokio::net::TcpStream>>>, 1:std::string::String for<'r, 's, 't0, 't1, 't2, 't3, 't4, 't5, 't6, 't7, 't8> {std::future::ResumeTy, &'r mut pgwire::protocol::StateMachine<pgwire::server::Conn<ore::netio::SniffedStream<tokio::net::TcpStream>>>, std::string::String, pgwire::protocol::StateMachine<pgwire::server::Conn<ore::netio::SniffedStream<tokio::net::TcpStream>>>, &'s mut coord::SessionClient, coord::SessionClient, &'t0 coord::session::Session, &'t1 mut coord::session::Session, &'t2 str, &'t3 std::string::String, std::option::Option<&'t4 coord::session::Portal>, tokio_postgres::error::sqlstate::SqlState, impl futures::Future, (), &'t7 coord::session::Portal, impl futures::Future}]` = note: required because it appears within the type `std::future::from_generator::GenFuture<[static generator@pgwire::protocol::StateMachine::<A>::describe_portal::#0 0:&mut pgwire::protocol::StateMachine<pgwire::server::Conn<ore::netio::SniffedStream<tokio::net::TcpStream>>>, 1:std::string::String for<'r, 's, 't0, 't1, 't2, 't3, 't4, 't5, 't6, 't7, 't8> {std::future::ResumeTy, &'r mut pgwire::protocol::StateMachine<pgwire::server::Conn<ore::netio::SniffedStream<tokio::net::TcpStream>>>, std::string::String, 
pgwire::protocol::StateMachine<pgwire::server::Conn<ore::netio::SniffedStream<tokio::net::TcpStream>>>, &'s mut coord::SessionClient, coord::SessionClient, &'t0 coord::session::Session, &'t1 mut coord::session::Session, &'t2 str, &'t3 std::string::String, std::option::Option<&'t4 coord::session::Portal>, tokio_postgres::error::sqlstate::SqlState, impl futures::Future, (), &'t7 coord::session::Portal, impl futures::Future}]>` = note: required because it appears within the type `impl futures::Future` = note: required because it appears within the type `impl futures::Future` = note: required because it appears within the type `for<'r, 's, 't0, 't1, 't2, 't3, 't4, 't5, 't6, 't7, 't8, 't9, 't10, 't11> {std::future::ResumeTy, &'r mut pgwire::protocol::StateMachine<pgwire::server::Conn<ore::netio::SniffedStream<tokio::net::TcpStream>>>, pgwire::protocol::StateMachine<pgwire::server::Conn<ore::netio::SniffedStream<tokio::net::TcpStream>>>, &'s mut pgwire::codec::FramedConn<pgwire::server::Conn<ore::netio::SniffedStream<tokio::net::TcpStream>>>, pgwire::codec::FramedConn<pgwire::server::Conn<ore::netio::SniffedStream<tokio::net::TcpStream>>>, impl futures::Future, (), std::option::Option<pgwire::message::FrontendMessage>, std::time::Instant, &'t1 str, std::string::String, impl futures::Future, std::vec::Vec<u32>, impl futures::Future, std::vec::Vec<pgrepr::format::Format>, std::vec::Vec<std::option::Option<std::vec::Vec<u8>>>, impl futures::Future, i32, usize, impl futures::Future, impl futures::Future, impl futures::Future, impl futures::Future, impl futures::Future, impl futures::Future, impl futures::Future}` = note: required because it appears within the type `[static generator@pgwire::protocol::StateMachine::<A>::advance_ready::#0 0:&mut pgwire::protocol::StateMachine<pgwire::server::Conn<ore::netio::SniffedStream<tokio::net::TcpStream>>> for<'r, 's, 't0, 't1, 't2, 't3, 't4, 't5, 't6, 't7, 't8, 't9, 't10, 't11> {std::future::ResumeTy, &'r mut 
pgwire::protocol::StateMachine<pgwire::server::Conn<ore::netio::SniffedStream<tokio::net::TcpStream>>>, pgwire::protocol::StateMachine<pgwire::server::Conn<ore::netio::SniffedStream<tokio::net::TcpStream>>>, &'s mut pgwire::codec::FramedConn<pgwire::server::Conn<ore::netio::SniffedStream<tokio::net::TcpStream>>>, pgwire::codec::FramedConn<pgwire::server::Conn<ore::netio::SniffedStream<tokio::net::TcpStream>>>, impl futures::Future, (), std::option::Option<pgwire::message::FrontendMessage>, std::time::Instant, &'t1 str, std::string::String, impl futures::Future, std::vec::Vec<u32>, impl futures::Future, std::vec::Vec<pgrepr::format::Format>, std::vec::Vec<std::option::Option<std::vec::Vec<u8>>>, impl futures::Future, i32, usize, impl futures::Future, impl futures::Future, impl futures::Future, impl futures::Future, impl futures::Future, impl futures::Future, impl futures::Future}]` = note: required because it appears within the type `std::future::from_generator::GenFuture<[static generator@pgwire::protocol::StateMachine::<A>::advance_ready::#0 0:&mut pgwire::protocol::StateMachine<pgwire::server::Conn<ore::netio::SniffedStream<tokio::net::TcpStream>>> for<'r, 's, 't0, 't1, 't2, 't3, 't4, 't5, 't6, 't7, 't8, 't9, 't10, 't11> {std::future::ResumeTy, &'r mut pgwire::protocol::StateMachine<pgwire::server::Conn<ore::netio::SniffedStream<tokio::net::TcpStream>>>, pgwire::protocol::StateMachine<pgwire::server::Conn<ore::netio::SniffedStream<tokio::net::TcpStream>>>, &'s mut pgwire::codec::FramedConn<pgwire::server::Conn<ore::netio::SniffedStream<tokio::net::TcpStream>>>, pgwire::codec::FramedConn<pgwire::server::Conn<ore::netio::SniffedStream<tokio::net::TcpStream>>>, impl futures::Future, (), std::option::Option<pgwire::message::FrontendMessage>, std::time::Instant, &'t1 str, std::string::String, impl futures::Future, std::vec::Vec<u32>, impl futures::Future, std::vec::Vec<pgrepr::format::Format>, std::vec::Vec<std::option::Option<std::vec::Vec<u8>>>, impl 
futures::Future, i32, usize, impl futures::Future, impl futures::Future, impl futures::Future, impl futures::Future, impl futures::Future, impl futures::Future, impl futures::Future}]>` = note: required because it appears within the type `impl futures::Future` = note: required because it appears within the type `impl futures::Future` = note: required because it appears within the type `for<'r, 's, 't0, 't1> {std::future::ResumeTy, pgwire::protocol::StateMachine<pgwire::server::Conn<ore::netio::SniffedStream<tokio::net::TcpStream>>>, i32, std::vec::Vec<(std::string::String, std::string::String)>, &'r mut pgwire::protocol::StateMachine<pgwire::server::Conn<ore::netio::SniffedStream<tokio::net::TcpStream>>>, impl futures::Future, (), pgwire::protocol::State, impl futures::Future, impl futures::Future, coord::SessionClient, impl futures::Future}` = note: required because it appears within the type `[static generator@pgwire::protocol::StateMachine::<A>::run::#0 0:pgwire::protocol::StateMachine<pgwire::server::Conn<ore::netio::SniffedStream<tokio::net::TcpStream>>>, 1:i32, 2:std::vec::Vec<(std::string::String, std::string::String)> for<'r, 's, 't0, 't1> {std::future::ResumeTy, pgwire::protocol::StateMachine<pgwire::server::Conn<ore::netio::SniffedStream<tokio::net::TcpStream>>>, i32, std::vec::Vec<(std::string::String, std::string::String)>, &'r mut pgwire::protocol::StateMachine<pgwire::server::Conn<ore::netio::SniffedStream<tokio::net::TcpStream>>>, impl futures::Future, (), pgwire::protocol::State, impl futures::Future, impl futures::Future, coord::SessionClient, impl futures::Future}]` = note: required because it appears within the type `std::future::from_generator::GenFuture<[static generator@pgwire::protocol::StateMachine::<A>::run::#0 0:pgwire::protocol::StateMachine<pgwire::server::Conn<ore::netio::SniffedStream<tokio::net::TcpStream>>>, 1:i32, 2:std::vec::Vec<(std::string::String, std::string::String)> for<'r, 's, 't0, 't1> {std::future::ResumeTy, 
pgwire::protocol::StateMachine<pgwire::server::Conn<ore::netio::SniffedStream<tokio::net::TcpStream>>>, i32, std::vec::Vec<(std::string::String, std::string::String)>, &'r mut pgwire::protocol::StateMachine<pgwire::server::Conn<ore::netio::SniffedStream<tokio::net::TcpStream>>>, impl futures::Future, (), pgwire::protocol::State, impl futures::Future, impl futures::Future, coord::SessionClient, impl futures::Future}]>` = note: required because it appears within the type `impl futures::Future` = note: required because it appears within the type `impl futures::Future` = note: required because it appears within the type `for<'r, 's, 't0, 't1, 't2, 't3, 't4, 't5, 't6, 't7, 't8, 't9, 't10> {std::future::ResumeTy, &'r pgwire::Server, ore::netio::SniffedStream<tokio::net::TcpStream>, pgwire::server::Conn<ore::netio::SniffedStream<tokio::net::TcpStream>>, &'s mut pgwire::server::Conn<ore::netio::SniffedStream<tokio::net::TcpStream>>, impl futures::Future, (), std::result::Result<pgwire::message::FrontendStartupMessage, std::io::Error>, pgwire::message::FrontendStartupMessage, i32, std::vec::Vec<(std::string::String, std::string::String)>, u32, coord::SessionClient, pgwire::protocol::StateMachine<pgwire::server::Conn<ore::netio::SniffedStream<tokio::net::TcpStream>>>, impl futures::Future, bool, pgwire::Server, &'t1 coord::Client, coord::Client, &'t2 mut coord::Client, impl futures::Future, &'t4 mut ore::netio::SniffedStream<tokio::net::TcpStream>, u8, [u8; 1], &'t5 [u8], &'t6 [u8; 1], tokio::io::util::write_all::WriteAll<'t7, ore::netio::SniffedStream<tokio::net::TcpStream>>, &'t8 openssl::ssl::SslAcceptor, impl futures::Future, tokio::io::util::write_all::WriteAll<'t10, pgwire::server::Conn<ore::netio::SniffedStream<tokio::net::TcpStream>>>}` = note: required because it appears within the type `[static generator@pgwire::Server::handle_connection::#0 0:&pgwire::Server, 1:ore::netio::SniffedStream<tokio::net::TcpStream> for<'r, 's, 't0, 't1, 't2, 't3, 't4, 't5, 't6, 't7, 
't8, 't9, 't10> {std::future::ResumeTy, &'r pgwire::Server, ore::netio::SniffedStream<tokio::net::TcpStream>, pgwire::server::Conn<ore::netio::SniffedStream<tokio::net::TcpStream>>, &'s mut pgwire::server::Conn<ore::netio::SniffedStream<tokio::net::TcpStream>>, impl futures::Future, (), std::result::Result<pgwire::message::FrontendStartupMessage, std::io::Error>, pgwire::message::FrontendStartupMessage, i32, std::vec::Vec<(std::string::String, std::string::String)>, u32, coord::SessionClient, pgwire::protocol::StateMachine<pgwire::server::Conn<ore::netio::SniffedStream<tokio::net::TcpStream>>>, impl futures::Future, bool, pgwire::Server, &'t1 coord::Client, coord::Client, &'t2 mut coord::Client, impl futures::Future, &'t4 mut ore::netio::SniffedStream<tokio::net::TcpStream>, u8, [u8; 1], &'t5 [u8], &'t6 [u8; 1], tokio::io::util::write_all::WriteAll<'t7, ore::netio::SniffedStream<tokio::net::TcpStream>>, &'t8 openssl::ssl::SslAcceptor, impl futures::Future, tokio::io::util::write_all::WriteAll<'t10, pgwire::server::Conn<ore::netio::SniffedStream<tokio::net::TcpStream>>>}]` = note: required because it appears within the type `std::future::from_generator::GenFuture<[static generator@pgwire::Server::handle_connection::#0 0:&pgwire::Server, 1:ore::netio::SniffedStream<tokio::net::TcpStream> for<'r, 's, 't0, 't1, 't2, 't3, 't4, 't5, 't6, 't7, 't8, 't9, 't10> {std::future::ResumeTy, &'r pgwire::Server, ore::netio::SniffedStream<tokio::net::TcpStream>, pgwire::server::Conn<ore::netio::SniffedStream<tokio::net::TcpStream>>, &'s mut pgwire::server::Conn<ore::netio::SniffedStream<tokio::net::TcpStream>>, impl futures::Future, (), std::result::Result<pgwire::message::FrontendStartupMessage, std::io::Error>, pgwire::message::FrontendStartupMessage, i32, std::vec::Vec<(std::string::String, std::string::String)>, u32, coord::SessionClient, pgwire::protocol::StateMachine<pgwire::server::Conn<ore::netio::SniffedStream<tokio::net::TcpStream>>>, impl futures::Future, bool, 
pgwire::Server, &'t1 coord::Client, coord::Client, &'t2 mut coord::Client, impl futures::Future, &'t4 mut ore::netio::SniffedStream<tokio::net::TcpStream>, u8, [u8; 1], &'t5 [u8], &'t6 [u8; 1], tokio::io::util::write_all::WriteAll<'t7, ore::netio::SniffedStream<tokio::net::TcpStream>>, &'t8 openssl::ssl::SslAcceptor, impl futures::Future, tokio::io::util::write_all::WriteAll<'t10, pgwire::server::Conn<ore::netio::SniffedStream<tokio::net::TcpStream>>>}]>` = note: required because it appears within the type `impl futures::Future` = note: required because it appears within the type `impl futures::Future` = note: required because it appears within the type `for<'r, 's> {std::future::ResumeTy, &'r pgwire::Server, ore::netio::SniffedStream<tokio::net::TcpStream>, impl futures::Future, ()}` = note: required because it appears within the type `[static generator@src/materialized/src/mux.rs:138:100: 140:6 _self:&pgwire::Server, conn:ore::netio::SniffedStream<tokio::net::TcpStream> for<'r, 's> {std::future::ResumeTy, &'r pgwire::Server, ore::netio::SniffedStream<tokio::net::TcpStream>, impl futures::Future, ()}]` = note: required because it appears within the type `std::future::from_generator::GenFuture<[static generator@src/materialized/src/mux.rs:138:100: 140:6 _self:&pgwire::Server, conn:ore::netio::SniffedStream<tokio::net::TcpStream> for<'r, 's> {std::future::ResumeTy, &'r pgwire::Server, ore::netio::SniffedStream<tokio::net::TcpStream>, impl futures::Future, ()}]>` = note: required because it appears within the type `impl futures::Future` = note: required because it appears within the type `impl futures::Future` = note: required for the cast to the object type `dyn futures::Future<Output = std::result::Result<(), anyhow::Error>> + std::marker::Send`
Good luck with that, eh?
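The core of that error fits in a much smaller program. The sketch below is a reduction under assumptions, not the project's code: `Rows` is a hypothetical stand-in for the `dyn Stream` trait object, and `Portal` only mimics the shape of `coord::session::Portal`. A `&Portal` stored in a future is `Send` only if `Portal` is `Sync`, and a `Box<dyn Rows + Send>` field is not `Sync`.

```rust
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for the `dyn Stream` trait object in the error.
#[allow(dead_code)]
trait Rows {
    fn next_row(&mut self) -> Option<u32>;
}

/// Holds a boxed trait object that is `Send` but not `Sync`.
#[allow(dead_code)]
struct Portal {
    name: String,
    rows: Option<Box<dyn Rows + Send>>,
}

/// Compile-time check: only accepts futures that may move between threads.
fn require_send<F: Future + Send>(fut: F) -> F {
    fut
}

async fn tick() {}

/// Rejected by `require_send`: the `&Portal` argument is stored inside the
/// future, and `&Portal: Send` needs `Portal: Sync`.
#[allow(dead_code)]
async fn name_len_not_send(portal: &Portal) -> usize {
    tick().await;
    portal.name.len()
}

/// Accepted: the reference is used before the async block is created, so
/// the future only owns a `usize`.
fn name_len_send(portal: &Portal) -> impl Future<Output = usize> + Send + 'static {
    let len = portal.name.len();
    async move {
        tick().await;
        len
    }
}

/// Waker that does nothing; the future below is ready on its first poll.
struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

fn demo() -> usize {
    let portal = Portal { name: "p1".to_string(), rows: None };
    // `require_send(name_len_not_send(&portal))` would fail to compile here.
    let mut fut = Box::pin(require_send(name_len_send(&portal)));
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    match fut.as_mut().poll(&mut cx) {
        Poll::Ready(len) => len,
        Poll::Pending => unreachable!("`tick` completes immediately"),
    }
}
```

Another way out, if the trait object can support it, is to declare the field as `Box<dyn Rows + Send + Sync>`, which makes `Portal` itself `Sync`. Which of the two fits a given code base is a design decision this sketch does not settle.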
This issue exists to collect links to blog posts that seem like status quo story ideas. Please provide the link and whatever other details you can that might help people to know what it's about.
This issue is not an end point! It's more of a "work queue". The idea is for folks to read the blog posts and identify potential status quo user stories and then open up fresh issues with those stories.
Please check the box next to a blog post link (and leave a comment) if you have done that!
Suggested format for each comment: