FlorianUekermann / rustls-acme

Apache License 2.0

Example for tokio/axum #27

Closed nicolaspernoud closed 2 years ago

nicolaspernoud commented 2 years ago

Hello,

I have been trying to adapt the tokio/axum TLS example here: https://github.com/tokio-rs/axum/blob/main/examples/low-level-rustls/src/main.rs to use rustls-acme, but I am stuck because the acceptor does not accept a stream, and since tokio does not have the incoming method on the TCP listener, I cannot extract the TCP object as in https://github.com/FlorianUekermann/rustls-acme/blob/8673b2ceeacc02dd8d0a4eea7420e25f08b76102/examples/server_low_level.rs#L62.

Could you provide an example for tokio-based frameworks, ideally axum? (It could be useful with actix too.)

Thanks. Best regards.

FlorianUekermann commented 2 years ago

I tried to stay compatible with tokio, but if it turns out that I failed, I would be very much open to a change of the high- or low-level api of rustls-acme. I'm not familiar with axum, so I don't quite follow where the problem is yet. I'll be traveling for most of the next two weeks, so I'm not sure when I'll get around to testing this or providing an example.

A few hints that may help you:

  1. If you want to use the low level api, calling the accept() method is roughly equivalent to my use of incoming().next().
  2. The tokio docs on TcpListener link to a stream wrapper TcpListenerStream, which should be compatible with the simpler high level api.

I'll happily accept PRs with examples for other runtimes and frameworks, preferably using the high-level api. If you can't get it to work with the hints above or if it is very awkward, please post your progress and I'll try to understand in more detail as soon as I have the time to sit down with the problem for a few hours.
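To make hint 1 concrete, here is a minimal synchronous sketch using only the standard library, whose TcpListener happens to offer both accept() and incoming(). It is an analogy for the tokio accept() versus smol incoming().next() relationship, not rustls-acme or tokio code, and the helper names (accept_one_with_accept, accept_one_with_incoming, round_trip) are made up for illustration.

```rust
use std::io::{Read, Write};
use std::net::{TcpListener, TcpStream};
use std::thread;

/// Accepts one connection by calling `accept()` directly.
fn accept_one_with_accept(listener: &TcpListener) -> std::io::Result<TcpStream> {
    let (stream, _peer_addr) = listener.accept()?;
    Ok(stream)
}

/// Accepts one connection by pulling the next item from `incoming()`.
fn accept_one_with_incoming(listener: &TcpListener) -> std::io::Result<TcpStream> {
    // `incoming()` never yields `None`; each `next()` blocks like `accept()`.
    listener
        .incoming()
        .next()
        .expect("incoming() is an infinite iterator")
}

/// Connects a client to a fresh loopback listener, sends `msg`, and returns
/// the bytes the server side read from the accepted connection.
fn round_trip(
    accept: fn(&TcpListener) -> std::io::Result<TcpStream>,
    msg: &'static [u8],
) -> Vec<u8> {
    // Port 0 asks the OS for any free port.
    let listener = TcpListener::bind("127.0.0.1:0").expect("bind");
    let addr = listener.local_addr().expect("local_addr");
    let client = thread::spawn(move || {
        let mut conn = TcpStream::connect(addr).expect("connect");
        conn.write_all(msg).expect("write");
        // `conn` is dropped here, which closes the socket and ends the read below.
    });
    let mut server_side = accept(&listener).expect("accept");
    let mut received = Vec::new();
    server_side.read_to_end(&mut received).expect("read");
    client.join().expect("client thread");
    received
}
```

Calling round_trip with either helper should behave the same way, which is the sense in which the two styles are "roughly equivalent". The async versions differ mainly in that each step is awaited instead of blocking.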

nicolaspernoud commented 2 years ago

Hello,

Thanks for taking the time to consider the question. I tried to use the high level API with TcpListenerStream, but it does not implement the AsyncRead and AsyncWrite traits, so that does not work. I will ask on the hyper channel in the tokio Discord server, since the question is mostly tokio/hyper related, as was advised in the following discussion: https://github.com/tokio-rs/axum/discussions/495.

If I find anything that works, I would happily propose a pull request with the example. If not, I would gladly accept any future insights from your part, when you can find the time.

FlorianUekermann commented 2 years ago

TcpListenerStream doesn't need to implement AsyncRead+AsyncWrite. It is the tokio equivalent of Incoming (from the smol::net::TcpListener::incoming(&self) method). It is a stream of Item = Result<TcpStream, Error>. The TcpStream in the item does implement AsyncRead+AsyncWrite, so an instance of TcpListenerStream should be accepted by the high level api of rustls-acme in place of smol::net::TcpListener::incoming(&self): https://github.com/FlorianUekermann/rustls-acme/blob/8673b2ceeacc02dd8d0a4eea7420e25f08b76102/examples/server_simple.rs#L45
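The point about where the trait bounds sit can be sketched with synchronous standard-library traits. This is an illustration under assumptions, not the rustls-acme signature: greet_all and fake_listener are hypothetical names, Iterator stands in for Stream, and Read + Write stand in for AsyncRead + AsyncWrite.

```rust
use std::io::{self, Cursor, Read, Write};

/// Writes a greeting to every successfully accepted connection and returns
/// the connections. The I/O bounds sit on the item type `S`, not on `I`.
fn greet_all<I, S>(incoming: I) -> io::Result<Vec<S>>
where
    I: Iterator<Item = io::Result<S>>, // the "listener stream": only yields items
    S: Read + Write,                   // the connection: this is what must do I/O
{
    let mut served = Vec::new();
    for conn in incoming {
        let mut conn = conn?; // propagate accept errors
        conn.write_all(b"Hello!")?;
        served.push(conn);
    }
    Ok(served)
}

/// Hypothetical stand-in for a listener: yields two in-memory "connections".
fn fake_listener() -> impl Iterator<Item = io::Result<Cursor<Vec<u8>>>> {
    (0..2).map(|_| Ok(Cursor::new(Vec::new())))
}
```

Here fake_listener() itself implements neither Read nor Write, yet greet_all(fake_listener()) type-checks because only the yielded Cursor values need those traits.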

nicolaspernoud commented 2 years ago

Hello,

I must be missing something. Starting from your repository, I made the following changes.

In Cargo.toml:

tokio = { version= "1.19.2", features = ["full"]}
tokio-stream = { version= "0.1.9", features = ["net"] }

And in server_simple.rs:

use clap::Parser;
use futures::AsyncWriteExt;
use futures::StreamExt;
use rustls_acme::caches::DirCache;
use rustls_acme::AcmeConfig;
use std::path::PathBuf;
use tokio::net::TcpListener;
use tokio_stream::wrappers::TcpListenerStream;

#[derive(Parser, Debug)]
struct Args {
    /// Domains
    #[clap(short, required = true)]
    domains: Vec<String>,

    /// Contact info
    #[clap(short)]
    email: Vec<String>,

    /// Cache directory
    #[clap(short, parse(from_os_str))]
    cache: Option<PathBuf>,

    /// Use Let's Encrypt production environment
    /// (see https://letsencrypt.org/docs/staging-environment/)
    #[clap(long)]
    prod: Option<bool>,

    #[clap(short, long, default_value = "443")]
    port: u16,
}

#[tokio::main]
async fn main() {
    simple_logger::init_with_level(log::Level::Info).unwrap();
    let args = Args::parse();

    let tcp_listener = TcpListener::bind(format!("[::]:{}", args.port))
        .await
        .unwrap();

    let tcp_listener = TcpListenerStream::new(tcp_listener);

    let mut tls_incoming = AcmeConfig::new(args.domains)
        .contact(args.email.iter().map(|e| format!("mailto:{}", e)))
        .cache_option(args.cache.clone().map(DirCache::new))
        .incoming(tcp_listener);

    while let Some(tls) = tls_incoming.next().await {
        let mut tls = tls.unwrap();
        tokio::spawn(async move {
            tls.write_all(HELLO).await.unwrap();
            tls.close().await.unwrap();
        });
    }
    unreachable!()
}

const HELLO: &'static [u8] = br#"HTTP/1.1 200 OK
Content-Length: 10
Content-Type: text/plain; charset=utf-8

Hello Tls!"#;

I get the following error with cargo run --example server_simple :

error[E0277]: the trait bound `tokio::net::TcpStream: futures::AsyncRead` is not satisfied
   --> examples/server_simple.rs:47:10
    |
47  |         .incoming(tcp_listener);
    |          ^^^^^^^^ the trait `futures::AsyncRead` is not implemented for `tokio::net::TcpStream`
    |
note: required by a bound in `AcmeConfig::<EC, EA>::incoming`
   --> /home/nicolas/dev/rustls-acme/src/config.rs:122:14
    |
122 |         TCP: AsyncRead + AsyncWrite + Unpin,
    |              ^^^^^^^^^ required by this bound in `AcmeConfig::<EC, EA>::incoming`

error[E0277]: the trait bound `tokio::net::TcpStream: futures::AsyncWrite` is not satisfied
   --> examples/server_simple.rs:47:10
    |
47  |         .incoming(tcp_listener);
    |          ^^^^^^^^ the trait `futures::AsyncWrite` is not implemented for `tokio::net::TcpStream`
    |
note: required by a bound in `AcmeConfig::<EC, EA>::incoming`
   --> /home/nicolas/dev/rustls-acme/src/config.rs:122:26
    |
122 |         TCP: AsyncRead + AsyncWrite + Unpin,
    |                          ^^^^^^^^^^ required by this bound in `AcmeConfig::<EC, EA>::incoming`

error[E0599]: the method `next` exists for struct `rustls_acme::Incoming<tokio::net::TcpStream, std::io::Error, TcpListenerStream, std::io::Error, std::io::Error>`, but its trait bounds were not satisfied
  --> examples/server_simple.rs:49:40
   |
49 |       while let Some(tls) = tls_incoming.next().await {
   |                                          ^^^^ method cannot be called on `rustls_acme::Incoming<tokio::net::TcpStream, std::io::Error, TcpListenerStream, std::io::Error, std::io::Error>` due to unsatisfied trait bounds

FlorianUekermann commented 2 years ago

Ah... right. The code looks good. I forgot that tokio has its own definition of AsyncRead. Very annoying. I don't remember how to deal with this, but I'll take a look as soon as I'm in front of a computer (tomorrow at the earliest).

nicolaspernoud commented 2 years ago

Ok, tomorrow or whenever convenient is perfectly fine : thanks a lot !

FlorianUekermann commented 2 years ago

I added a low level api example for tokio: https://github.com/FlorianUekermann/rustls-acme/blob/main/examples/tokio_server_low_level.rs

I can't properly test this, because I'm not on a machine with a public IP. Please let me know if you have trouble with it. Unfortunately the high level API is a little more tricky, because you need to call compat() on the item to implement the normal (non-tokio) AsyncRead+AsyncWrite, but the item is acquired under the hood inside rustls-acme.

There are a couple of possible solutions to this, but I'll need a little more time to think about how I want to do that.
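The underlying mismatch, and what compat() does about it, can be shown with a small standard-library-only sketch. Everything here is a hypothetical stand-in: FuturesStyleWrite and TokioStyleWrite play the roles of futures::AsyncWrite and tokio::io::AsyncWrite (made synchronous for brevity), and Compat mimics the idea behind the adapter in tokio_util::compat rather than its real implementation.

```rust
/// Hypothetical stand-in for `futures::AsyncWrite` (synchronous, for brevity).
trait FuturesStyleWrite {
    fn write_bytes(&mut self, buf: &[u8]) -> usize;
}

/// Hypothetical stand-in for `tokio::io::AsyncWrite`: same job, distinct trait.
trait TokioStyleWrite {
    fn put(&mut self, buf: &[u8]) -> usize;
}

/// A "tokio" connection: implements only the tokio-style trait.
#[derive(Default)]
struct TokioConn {
    sent: Vec<u8>,
}

impl TokioStyleWrite for TokioConn {
    fn put(&mut self, buf: &[u8]) -> usize {
        self.sent.extend_from_slice(buf);
        buf.len()
    }
}

/// Adapter in the spirit of a compat wrapper: it owns the inner value and
/// implements the other trait by forwarding to it.
struct Compat<T>(T);

impl<T: TokioStyleWrite> FuturesStyleWrite for Compat<T> {
    fn write_bytes(&mut self, buf: &[u8]) -> usize {
        self.0.put(buf)
    }
}

/// A library function bounded on the futures-style trait.
fn send_hello<W: FuturesStyleWrite>(conn: &mut W) -> usize {
    conn.write_bytes(b"Hello Tls!")
}

/// Wraps a tokio-style connection so the library function accepts it.
fn demo() -> Vec<u8> {
    // send_hello(&mut TokioConn::default()); // would fail with E0277
    let mut wrapped = Compat(TokioConn::default());
    send_hello(&mut wrapped);
    wrapped.0.sent
}
```

The commented-out call fails for the same reason as the error output earlier in the thread: the type implements a trait with the same purpose, but not the specific trait named in the bound.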

FlorianUekermann commented 2 years ago

The normal futures crate has some really nice methods on streams for mapping their items, so I was able to add a simple example without api changes: https://github.com/FlorianUekermann/rustls-acme/blob/main/examples/tokio_server_simple.rs

Again, this isn't properly tested, so please report any problems you have with it.

The TLS streams you get out in the examples are normal AsyncRead+AsyncWrite streams, so if you need the tokio version you'll need to call the compat() method on them.
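The "map the items before handing the stream over" idea can be sketched with a plain iterator. This is an assumption-laden analogy, not the code from the linked example: Compat is a hypothetical marker wrapper, wrap_ok_items is a made-up helper, and Iterator stands in for Stream. With the futures crate, the comparable shape would be map_ok from TryStreamExt.

```rust
use std::io;

/// Hypothetical marker wrapper standing in for a compat adapter.
#[derive(Debug, PartialEq)]
struct Compat<T>(T);

/// Wraps every successfully accepted item and leaves errors untouched, so the
/// consumer sees a sequence of `Result<Compat<T>, io::Error>` instead.
fn wrap_ok_items<I, T>(incoming: I) -> impl Iterator<Item = io::Result<Compat<T>>>
where
    I: Iterator<Item = io::Result<T>>,
{
    // `Result::map` only touches the `Ok` variant.
    incoming.map(|item| item.map(Compat))
}
```

Because the wrapping happens on the caller's side, before the sequence is passed on, the consuming API does not need to change.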

nicolaspernoud commented 2 years ago

Hello,

The good news is that it is working with both examples. I am still struggling with hyper/axum though : the app.make_service function that can be found in the example here wants an &AddrStream and not the Compat we have. I will try again tomorrow, to see if something can be worked out. Anyway, thanks a lot for that precious first step.

Best regards.

nicolaspernoud commented 2 years ago

Hello again,

That works! I eventually managed to get it working with axum. It may be a little too specific to serve as an example, but for future reference, the working code is below. Thanks a lot for your help, and your amazing crate!

In Cargo.toml

axum = "0.5"
futures-util = "0.3"
hyper = { version = "0.14", features = ["full"] }
tower = { version = "0.4", features = ["make"] }

In axum_server_low_level.rs

use async_rustls::rustls::Session;
use async_rustls::TlsAcceptor;
use axum::extract::ConnectInfo;
use axum::routing::get;
use axum::Router;
use clap::Parser;
use futures_util::future::poll_fn;
use hyper::server::accept::Accept;
use hyper::server::conn::AddrIncoming;
use hyper::server::conn::Http;
use rustls_acme::acme::ACME_TLS_ALPN_NAME;
use rustls_acme::caches::DirCache;
use rustls_acme::AcmeConfig;
use std::net::SocketAddr;
use std::path::PathBuf;
use std::pin::Pin;
use tokio_stream::StreamExt;
use tokio_util::compat::FuturesAsyncReadCompatExt;
use tokio_util::compat::TokioAsyncReadCompatExt;
use tower::MakeService;

#[derive(Parser, Debug)]
struct Args {
    /// Domains
    #[clap(short, required = true)]
    domains: Vec<String>,

    /// Contact info
    #[clap(short)]
    email: Vec<String>,

    /// Cache directory
    #[clap(short, parse(from_os_str))]
    cache: Option<PathBuf>,

    /// Use Let's Encrypt production environment
    /// (see https://letsencrypt.org/docs/staging-environment/)
    #[clap(long)]
    prod: Option<bool>,

    #[clap(short, long, default_value = "443")]
    port: u16,
}

#[tokio::main]
async fn main() {
    simple_logger::init_with_level(log::Level::Info).unwrap();
    let args = Args::parse();

    let mut state = AcmeConfig::new(args.domains)
        .contact(args.email.iter().map(|e| format!("mailto:{}", e)))
        .cache_option(args.cache.clone().map(DirCache::new))
        .state();
    let acceptor = state.acceptor();

    tokio::spawn(async move {
        loop {
            match state.next().await.unwrap() {
                Ok(ok) => log::info!("event: {:?}", ok),
                Err(err) => log::error!("error: {:?}", err),
            }
        }
    });
    serve(acceptor, args.port).await;
}

async fn serve(acceptor: TlsAcceptor, port: u16) {
    let listener = tokio::net::TcpListener::bind(format!("[::]:{}", port))
        .await
        .unwrap();
    let mut addr_incoming = AddrIncoming::from_listener(listener).unwrap();

    // Create axum application
    let mut app = Router::new()
        .route("/", get(handler))
        .into_make_service_with_connect_info::<SocketAddr>();

    loop {
        let stream = poll_fn(|cx| Pin::new(&mut addr_incoming).poll_accept(cx))
            .await
            .unwrap()
            .unwrap();
        let acceptor = acceptor.clone();

        let app = app.make_service(&stream).await.unwrap();

        tokio::spawn(async move {
            let tls = acceptor.accept(stream.compat()).await.unwrap().compat();
            match tls.get_ref().get_ref().1.get_alpn_protocol() {
                Some(ACME_TLS_ALPN_NAME) => log::info!("received TLS-ALPN-01 validation request"),
                _ => Http::new().serve_connection(tls, app).await.unwrap(),
            }
        });
    }
}

async fn handler(ConnectInfo(addr): ConnectInfo<SocketAddr>) -> String {
    format!("Hello Tls!\nYour address is: {addr}")
}
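
The ALPN check inside the spawned task above is what separates certificate validation handshakes from ordinary traffic. A minimal standalone sketch of that dispatch follows. The Route enum and route_for function are hypothetical names for illustration, and the constant is redefined locally with the protocol ID that RFC 8737 assigns to TLS-ALPN-01 ("acme-tls/1") so the sketch compiles without rustls-acme.

```rust
/// ALPN protocol ID for TLS-ALPN-01 challenges (RFC 8737). Redefined here so
/// the sketch is self-contained; the thread's code imports it from rustls-acme.
const ACME_TLS_ALPN_NAME: &[u8] = b"acme-tls/1";

/// Hypothetical classification of an accepted TLS connection.
#[derive(Debug, PartialEq)]
enum Route {
    /// The handshake itself was the validation; there is no request to serve.
    AcmeValidation,
    /// Regular traffic that should be handed to the HTTP server.
    Application,
}

/// Decides what to do with a connection based on its negotiated ALPN protocol.
fn route_for(negotiated_alpn: Option<&[u8]>) -> Route {
    match negotiated_alpn {
        Some(ACME_TLS_ALPN_NAME) => Route::AcmeValidation,
        // Any other protocol, or no ALPN at all, is treated as application traffic.
        _ => Route::Application,
    }
}
```

In the code above, the Application branch corresponds to calling serve_connection, and the AcmeValidation branch only logs.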

FlorianUekermann commented 2 years ago

I struggle a bit to follow the problem. Looking at the axum and tower docs, it is unclear to me why an AddrStream is needed. But the type parameter bounds are a bit complex here, so I'm sure I just missed it.

On a more fundamental level, I am a bit confused by the strict requirement for the underlying transport to be a TCP transport implied by the need for AddrStream. Surely there is a nice way to use some kind of TLS stream from some other crate with axum, tower and hyper, right? Would I need to implement axum::extract::connect_info::Connected to extract ConnectInfo for the returned transport stream or what's missing here to make things less awkward?

nicolaspernoud commented 2 years ago

Well, I do not know. I don't think the problem is with ConnectInfo but more with the api of make_service.

I asked the axum creator here: https://github.com/tokio-rs/axum/discussions/495#discussioncomment-3107551; maybe he can propose something.

nicolaspernoud commented 2 years ago

By the way, if you were to implement something axum related, the best would be something like this: https://github.com/tokio-rs/axum/blob/bc55362ba8b19e0c1ad5260a68a250d09aeb8281/examples/tls-rustls/src/main.rs#L21

casey commented 2 years ago

I refactored the example into a TlsAcceptor struct that can be passed to axum_server::Server::acceptor:

use async_rustls::rustls::Session;
use clap::ArgGroup;
use futures::future::BoxFuture;
use futures::future::FutureExt;
use futures::future::TryFutureExt;
use rustls_acme::{acme::ACME_TLS_ALPN_NAME, caches::DirCache, AcmeConfig};
use std::marker::Unpin;
use tls_acceptor::TlsAcceptor;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_stream::StreamExt;
use tokio_util::compat::Compat;
use tokio_util::compat::FuturesAsyncReadCompatExt;
use tokio_util::compat::TokioAsyncReadCompatExt;

use super::*;

#[derive(Clone)]
pub(crate) struct TlsAcceptor(pub(crate) async_rustls::TlsAcceptor);

impl<I: AsyncRead + AsyncWrite + Unpin + Send + 'static, S: Send + 'static>
  axum_server::accept::Accept<I, S> for TlsAcceptor
{
  type Stream = Compat<async_rustls::server::TlsStream<Compat<I>>>;
  type Service = S;
  type Future = BoxFuture<'static, io::Result<(Self::Stream, Self::Service)>>;

  fn accept(&self, stream: I, service: S) -> Self::Future {
    self
      .0
      .accept(stream.compat())
      .map_ok(move |tls| {
        let tls = tls.compat();
        if let Some(ACME_TLS_ALPN_NAME) = tls.get_ref().get_ref().1.get_alpn_protocol() {
          log::info!("received TLS-ALPN-01 validation request");
        }
        (tls, service)
      })
      .boxed()
  }
}

The real challenge is wrapping things in compat adapters.

FlorianUekermann commented 2 years ago

Maybe we should add an axum_acceptor method to AcmeState that constructs this. That gets us one step closer to an axum compatible variant of the simple API. We just have to figure out how to poll the AcmeState stream then.

casey commented 2 years ago

I think that would be a great idea. This code is pretty short, but it was extremely hard to get the types right, so it would be pretty helpful to provide it.

nicolaspernoud commented 2 years ago

That would be great !

FlorianUekermann commented 2 years ago

I'm working on using rustls 0.20 via futures-rustls instead of rustls 0.19 via async-rustls. This requires some changes to futures-rustls, so I'll have to make those first. I'm reopening this issue, because I would like to add a proper axum example afterwards.

nicolaspernoud commented 2 years ago

Very nice, thanks a lot for your dedication. That would be great to ensure that rustls-acme and axum work together beautifully !

FlorianUekermann commented 2 years ago

I pushed what will be 0.4.0-beta1. Publication of the beta on crates.io is blocked by a pending PR in futures-rustls. rustls-acme = { git="https://github.com/FlorianUekermann/rustls-acme.git" } in Cargo.toml should do the trick. There is an axum example and feature included.

Feedback would be much appreciated, that's why I'm doing the beta first.

casey commented 2 years ago

Nice! I just opened a draft PR that replaces the acceptor I wrote with the one in 0.4.0-beta1. Looks like it works! I deployed it to a test server and it looks like it's working there too. If you have a moment to check out the PR, I'd appreciate it. I've never constructed a rustls::ServerConfig, so I want to make sure I'm not doing something wrong there.

FlorianUekermann commented 2 years ago

0.4.0-beta2 is on crates.io now, because futures-rustls merged my PR and published a new version as well.

@casey: The PR looks good to me. The only tricky thing about server side rustls configs is the resolver, which rustls-acme provides here. Thanks for testing. I would recommend waiting for the non-beta release (in a few days). I'll try to get some more feedback and testing in before that, so the API may change (although I'm like 90% certain it won't).

nicolaspernoud commented 2 years ago

I tested it in my use case as well (https://github.com/nicolaspernoud/vestirust/commit/1538a0fe476164a1950df7e2cf2f251447adeb4f), and it works very well. It allows using the axum-server handle for graceful shutdown, which is a lot cleaner than what I did before. Thanks a lot for your very useful crate, and moreover for your helpfulness in making it easy to use with axum!

casey commented 2 years ago

I think this can be closed, since axum is directly supported using the axum feature.

nicolaspernoud commented 2 years ago

Yes, thanks a lot @FlorianUekermann for your very useful (and now easy to use with axum 😉 ) crate !