softprops / hyperlocal

🔌 ✨rustlang hyper bindings for local unix domain sockets
MIT License
229 stars 46 forks source link

cannot use with tokio::run() #20

Closed njaard closed 1 month ago

njaard commented 5 years ago

There's no way to get Futures from incoming connections. Plain old hyper uses this kind of code:

let s = Server::bind(&addr.parse().unwrap())
  .serve(new_service)
  .map_err(|e| eprintln!("server error: {}", e));
hyper::rt::run(s);

But hyperlocal doesn't allow it. Even if you do into_future() (after mapping the outputs to ()), then it doesn't work because it never actually processes requests.

This feature is necessary because hyperlocal's event loop doesn't work at all with threadpooling and the tokio_threadpool::blocking function.

njaard commented 5 years ago

Here's a test program that doesn't respond to incoming requests.

You can connect to it like so: curl --unix-socket socket 'http://localhost:3000/'

extern crate hyper;
extern crate hyperlocal;
extern crate tokio_threadpool;
extern crate futures;

use hyper::{Body};
use std::sync::Arc;

type Response = hyper::Response<Body>;
type Request = hyper::Request<Body>;

use futures::future::{Future, lazy};
use tokio_threadpool::blocking;
use hyper::rt::poll_fn;
use futures::Stream;

fn main()
{
  let addr = "socket";
  let new_service =
    move ||
    {
      let pool = Arc::new(tokio_threadpool::ThreadPool::new());
      hyper::service::service_fn(
        move |_req: Request|
        {
          pool.spawn_handle(
            lazy(move ||
              poll_fn( move || blocking(
                || Response::new(Body::from("hello"))
              ))
            )
          )
        }
      )
    };

  let s = hyperlocal::server::Http::new().serve_path(
    addr, new_service
  ).unwrap().into_future().map(|_| ()).map_err(|e| panic!("{:?}" ,e.0));

  hyper::rt::run(s);
}
njaard commented 5 years ago

And here's a program that doesn't use a threadpool properly.

You can test this program with: for i in $(seq 1 20); do { curl --unix-socket socket 'http://localhost:3000/' & } ; done; time for i in $(seq 1 20); do wait; done

Expected behavior: this command completes in about 1s. Actual behavior: this command completes in about 20s.

extern crate hyper;
extern crate hyperlocal;
extern crate tokio_threadpool;
extern crate futures;

use hyper::{Body};
use std::sync::Arc;

type Response = hyper::Response<Body>;
type Request = hyper::Request<Body>;

use futures::future::{lazy};
use tokio_threadpool::blocking;
use hyper::rt::poll_fn;

fn main()
{
  let addr = "socket";
  let new_service =
    move ||
    {
      let pool = Arc::new(tokio_threadpool::ThreadPool::new());
      hyper::service::service_fn(
        move |_req: Request|
        {
          pool.spawn_handle(
            lazy(move ||
              poll_fn( move || blocking(
                ||
                {
                  std::thread::sleep_ms(1000);
                  Response::new(Body::from("hello"))
                }
              ))
            )
          )
        }
      )
    };

  let s = hyperlocal::server::Server::bind(addr, new_service)
    .unwrap();

  s.run().unwrap();
}
softprops commented 5 years ago

Thanks for reporting the issue @njaard. I'm open to pull requests if you have ideas for the api you'd like to see.

njaard commented 5 years ago

There's no way I could write you a patch, because my understanding of tokio is too poor.

I guess there should be an Api that provides a Future<(),()> for incoming connection, as that's what hyper::rt::run() wants.

softprops commented 5 years ago

That's cool. I can dig into this. Thanks for the sample code. I'll try an use this example code under the examples dir when I figure this out

softprops commented 1 month ago

many things have changed in between this report and today. feel free to reopen if this is still and issue