hyperium / hyper

An HTTP library for Rust
https://hyper.rs
MIT License

The design of the Futures-based Body Stream #953

Closed seanmonstar closed 7 years ago

seanmonstar commented 7 years ago

The body currently exists as a futures::Stream of http::Chunk. The idea of Chunk is to keep the internals private, to allow for optimizations to happen without breaking changes, and then add From implementations that users can take advantage of.

Besides Vec<u8>, it will also likely have a form of AppendBuf, which will allow sending slices of the same buffer when reading in a message body, instead of allocating an arbitrarily sized Vec, reading into it, and then shrinking it.

It may also need some way to prevent double copies, as much as possible, when a user is creating Chunks.

/cc #934


Thanks to the discussion below, here are the action items to complete this:

pimeys commented 7 years ago

Could you explain a bit how this is supposed to work currently? I'm trying to switch to the hyper HEAD and am having a problem reading the body with this code:

let body_f = response.body().fold(body_vec, |mut acc, chunk| {
    acc.extend_from_slice(chunk.as_ref());
    Ok(acc)
}).map_err(|_| { response::FcmError::ServerError(None) });

The compiler gives me:

error[E0283]: type annotations required: cannot resolve `hyper::Error: std::convert::From<_>`
   |
82 |             let body_f = response.body().fold(body_vec, |mut acc, chunk| {
   |                                          ^^^^

Is there a recommended way to read the body into a vector without blocking?

seanmonstar commented 7 years ago

I have hit this exact same error when I was writing tests. I'll check in with the tokio team about the error.

I also hope to write some guides explaining how to stream a body in hyper.

alexcrichton commented 7 years ago

@pimeys I think the error you're seeing there is just due to type inference in the compiler and the Stream trait. Right now fold is generic over the error returned, working with From to switch error types if necessary.

The error in your example I believe is that Ok(acc) doesn't have a known error type and the compiler is unable to infer what it should be. You can fix this with Ok::<_, hyper::Error>(acc) (eew) for now and we may want to consider making fold less generic in the future.
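To make the inference problem concrete, here is a std-only sketch. It does not use futures or hyper: `fold_chunks` and `StreamError` are hypothetical stand-ins for `Stream::fold` and `hyper::Error`, and an iterator of `Result`s plays the role of the body stream. As in the description above, the closure's error type `E` is tied to the stream's error only through a `From` bound, so the sketch names it explicitly on `Ok`.

```rust
/// Hypothetical stand-in for the stream's error type (`hyper::Error` in the thread).
#[derive(Debug, PartialEq)]
pub struct StreamError(pub String);

/// A fold that, like the `fold` described above, is generic over the closure's
/// error type `E` and only requires that `E` converts into the stream's error.
pub fn fold_chunks<I, T, E, F>(chunks: I, init: T, mut f: F) -> Result<T, StreamError>
where
    I: IntoIterator<Item = Result<Vec<u8>, StreamError>>,
    F: FnMut(T, Vec<u8>) -> Result<T, E>,
    StreamError: From<E>,
{
    let mut acc = init;
    for chunk in chunks {
        // An error from the "stream" itself ends the fold.
        let chunk = match chunk {
            Ok(c) => c,
            Err(e) => return Err(e),
        };
        // An error from the closure is converted through `From`.
        acc = match f(acc, chunk) {
            Ok(next) => next,
            Err(e) => return Err(StreamError::from(e)),
        };
    }
    Ok(acc)
}

/// Concatenates every chunk into one `Vec<u8>`.
pub fn collect_body<I>(chunks: I) -> Result<Vec<u8>, StreamError>
where
    I: IntoIterator<Item = Result<Vec<u8>, StreamError>>,
{
    fold_chunks(chunks, Vec::new(), |mut acc, chunk| {
        acc.extend_from_slice(&chunk);
        // A bare `Ok(acc)` would leave `E` to inference; the report above shows
        // that failing with E0283 for the real `fold`. The turbofish names `E`.
        Ok::<_, StreamError>(acc)
    })
}
```

Calling `collect_body` with two `Ok` chunks yields their concatenation, and the first `Err` in the input is returned unchanged.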

pimeys commented 7 years ago

Pretty much got it working and got my first proper use case with hyper-tokio client to compile:

https://github.com/pimeys/fcm-rust/blob/tokio/src/client/mod.rs#L82

I'm trying to write an asynchronous Google push notification client with Tokio. I needed to modify Hyper a bit to be able to modify headers and have access to some internal things which I think I should be able to refactor at some point.

But god damn, it works. Now I need to get this working with HEAD, get it fast and get rid of that Handle passing in the constructor, as noted here https://github.com/hyperium/hyper/issues/1002.

But first, I want to have this working on our use case, and we will have a handle to use :)

seanmonstar commented 7 years ago

@pimeys great! I'm curious about what modifications you needed to make. Could you open separate issues about them, so we can keep the focus here on the design of the Body stream (and Chunk)?

seanmonstar commented 7 years ago

So, the Chunk has had 2 variants added to it internally.

Does it make sense to also integrate with tokio's EasyBuf? Maybe that would replace the Arc<Vec<u8>> variant of point 2? And it might make it easier to integrate with other tokio components?

Note that tokio's EasyBuf currently isn't suitable for internal use in hyper, but maybe future updates will let it be so.

scottlamb commented 7 years ago

This API will already make it possible to avoid copying in several cases, but I'd prefer it use the owning_ref crate. That would let callers supply anything that can give them a &[u8] owned in any of the usual ways. That is, replace this:

enum Inner {
    Owned(Vec<u8>),
    Referenced(Arc<Vec<u8>>),
    Mem(MemSlice),
    Static(&'static [u8]),
    // ...possibly EasyBuf as mentioned in the previous comment...
}

with this:

enum Inner {
    Static(&'static [u8]),
    Vec(owning_ref::VecRef<[u8]>),
    Box(owning_ref::BoxRef<[u8]>),
    Arc(owning_ref::ArcRef<[u8]>),
    // ...I also mentioned owning_ref::RcRef in #934 but now think it wouldn't actually work...
}

and add From implementations that accept the respective owning_refs. All of the existing Froms could be matched with the new Inner.

Here are a couple examples of how I'd use the extra flexibility in a real program (which does HTTP range serving of .mp4 files with the actual frames of video taken from files on disk and everything else from a SQLite database by dividing up into a bunch of "slices" of the file's byte positions generated in various ways):

(I think my use case stresses this API more than most; I'd be happy to tell you more about it if you're interested.)

The only other improvement I can think of would be if the stuff that needs to reference a per-request object could avoid reference counting by the chunks having some per-request lifetime bound. But I haven't done any benchmarking to see if an atomic reference count on each chunk is a noticeable performance impact in my program (or any realistic scenario, for that matter), and I'm not even sure what I'm vaguely describing is possible in safe Rust code.

seanmonstar commented 7 years ago

@scottlamb I tried looking through the owning_ref crate, and had a hard time understanding its purpose.

As for mmapped slices, that sounds like the possibility of a bad time, if the memory is on disk and not in RAM when hyper tries to deref the slice to write into the socket. That blocking file IO could really hurt the performance of the event loop. I'm also not sure how to ask for an async deref, such that the loading could be done on another thread. It sounds like it would complicate a lot of things.

scottlamb commented 7 years ago

As for mmapped slices, that sounds like the possibility of a bad time, if the memory is on disk and not in RAM when hyper tries to deref the slice to write into the socket. That blocking file IO could really hurt the performance of the event loop. I'm also not sure how to ask for an async deref, such that the loading could be done on another thread. It sounds like it would complicate a lot of things.

Okay. I'm not too surprised to hear you say that. I was thinking about it; instead of controlling which thread accesses the chunk, maybe I can get the same effect by calling mlock from a thread I choose before giving you the chunk. I probably want to use smaller chunks then (so I can send the first byte of the file before the last byte is read), and I have to be more careful about how much I have in flight to avoid running out of memory, so it's more complexity at the application layer, but I think it's possible and maybe even simpler overall.

Anyway, if the mlock approach doesn't work out and I end up wanting help from hyper with controlling the thread that accesses the memory, I'll open a separate issue as it's clearly a whole other thing to consider.

I tried looking through the owning_ref crate, and had a hard time understanding its purpose.

I just worked through an example which I think will make this clearer to both of us. (It showed me that I had the wrong Inner definition above which probably wasn't helping.)

Here I'm using it for the mmap case. I'm starting with a Box<memmap::Mmap>, turning that into a BoxRef<memmap::Mmap, memmap::Mmap>, using the map member function to turn that into a BoxRef<memmap::Mmap, [u8]>, and then using the erase_owner member function to turn that into an ErasedBoxRef<[u8]>. The Chunk can keep around an ErasedBoxRef<[u8]> and know that it has a [u8] it can keep as long as it wants. When it drops that reference, the Box is dropped, and the munmap call happens along the way. (I verified with the strace utility that the munmap call actually happens as hoped.)

extern crate owning_ref;
extern crate libc;
extern crate memmap;

use std::io;
use std::fs::File;

pub struct Chunk(Inner);

enum Inner {
    Static(&'static [u8]),
    Vec(owning_ref::VecRef<u8, [u8]>),
    Box(owning_ref::ErasedBoxRef<[u8]>),
    Arc(owning_ref::ErasedArcRef<[u8]>),
}

impl<T: 'static> From<owning_ref::BoxRef<T, [u8]>> for Chunk {
    #[inline]
    fn from(r: owning_ref::BoxRef<T, [u8]>) -> Chunk {
        Chunk(Inner::Box(r.erase_owner()))
    }
}

fn main() {
    let f = File::open("f").unwrap();
    let mmap = Box::new(
        memmap::Mmap::open(&f, memmap::Protection::Read).unwrap());
    if unsafe { libc::mlock(mmap.ptr() as *const libc::c_void, mmap.len()) } < 0 {
        panic!("{}", io::Error::last_os_error());
    }
    let mmap = owning_ref::BoxRef::new(mmap);
    let mmap = mmap.map(|m| unsafe { m.as_slice() });
    let chunk: Chunk = mmap.into();
}

Likewise, when I want to trim a vector before sending it out on the wire, I can stuff it into a VecRef<u8, [u8]> and then use map to get a subslice to put into the chunk. The chunk owns the whole vector but only sees part of its contents.
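The "owns the whole buffer but only sees part of it" idea can be sketched with the standard library alone. This is not how owning_ref is implemented: `SliceChunk` is a hypothetical type that boxes any owner able to produce a `&[u8]` (erasing its concrete type) and remembers which byte range is visible. The `Send` bound is an assumption added here so the chunk could be handed to another thread.

```rust
use std::ops::Range;

/// Hypothetical chunk: a type-erased owner plus the byte range that is visible.
pub struct SliceChunk {
    owner: Box<dyn AsRef<[u8]> + Send>,
    range: Range<usize>,
}

impl SliceChunk {
    /// Takes ownership of `owner` and exposes only `range` of its bytes.
    /// Returns `None` if the range does not fit inside the owner's bytes.
    pub fn new<O>(owner: O, range: Range<usize>) -> Option<SliceChunk>
    where
        O: AsRef<[u8]> + Send + 'static,
    {
        if range.start > range.end || range.end > owner.as_ref().len() {
            return None;
        }
        Some(SliceChunk {
            owner: Box::new(owner),
            range,
        })
    }
}

impl AsRef<[u8]> for SliceChunk {
    fn as_ref(&self) -> &[u8] {
        // Ask the erased owner for its bytes, then narrow to the visible range.
        // The range was validated in `new`, so this slice cannot panic as long
        // as the owner keeps returning the same bytes.
        let bytes: &[u8] = (*self.owner).as_ref();
        &bytes[self.range.clone()]
    }
}
```

For example, `SliceChunk::new(b"Hello World".to_vec(), 6..11)` keeps the whole vector alive but `as_ref()` returns only `World`; the owner (and anything its `Drop` does) is released when the chunk is dropped.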

scottlamb commented 7 years ago

I have a dumb question orthogonal to our owning_ref discussion: how do I send more than one Chunk? Is there an example somewhere?

In particular, a Service returns a Future<Item=hyper::server::Response, Error=hyper::Error>. If I have just one Chunk, I can call set_body(chunk) on the Response before returning it. But if I have more than one Chunk, that deadlocks. (The TokioBody has a mpsc::channel(0), which only allows the one Sender to have one value in flight.) So I need to send the chunks after the future is resolved. So I need to call ::tokio_core::Handle::spawn myself with the future, I think? But how do I get access to the Handle from my Service? My best guess so far is to stash away server.handle().remote() into a lazy_static from main, then use Remote::spawn from my Service to get a handle to launch futures. That doesn't seem ideal...

seanmonstar commented 7 years ago

I think I understand now how owning_ref works. Essentially, having b"Hello World".to_vec(), but wanting to send a chunk that only references the bytes of b"World", right?

I get the appeal of that, and actually find that pretty neat. At the same time, I'm wary of linking hyper's public API to owning_ref, if for no other reason than I feel like owning_ref is an obscure crate. Maybe that feeling is baseless, or maybe there's another way to get a similar effect without relying on that crate?


Regarding sending multiple chunks, it assumes you don't have the multiple chunks ready yet to send (I would expect that normally someone has the bytes bundled into a single chunk). So, you must spawn some sort of other task that will try to send more chunks into the body as they become available, and the body has room. This could be a CPU task from futures-cpupool, or some other task in tokio (such as another network request).
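As a rough model of that pattern using only the standard library: a zero-capacity `sync_channel` stands in for the zero-capacity futures channel behind the body, and a plain thread stands in for the spawned task. Both substitutions are assumptions made for illustration; `streaming_body` and `drain` are hypothetical names, not hyper or tokio APIs.

```rust
use std::sync::mpsc::{sync_channel, Receiver};
use std::thread;

/// Returns the receiving half of a body right away and feeds it from a
/// separate thread, one chunk at a time.
pub fn streaming_body(chunks: Vec<Vec<u8>>) -> Receiver<Vec<u8>> {
    // Capacity 0: every `send` waits until the consumer takes the chunk, so
    // sending from the same thread that later consumes would never finish.
    let (tx, rx) = sync_channel::<Vec<u8>>(0);
    thread::spawn(move || {
        for chunk in chunks {
            // Stop producing if the consumer has gone away.
            if tx.send(chunk).is_err() {
                break;
            }
        }
        // Dropping `tx` here signals the end of the body.
    });
    rx
}

/// Stand-in for the side that writes the body out: drains chunks until the
/// producer hangs up.
pub fn drain(body: Receiver<Vec<u8>>) -> Vec<u8> {
    let mut out = Vec::new();
    for chunk in body {
        out.extend_from_slice(&chunk);
    }
    out
}
```

The handler's job in this model is just `streaming_body(...)`: it returns immediately, and the chunks flow as the consumer makes room for them.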

If doing that feels bad, we should definitely try to find how to make that better. That'd partly be up to the design of tokio itself, and so possibly people would have input on the tokio-proto repo or the tokio gitter room, but it's also something that we can explore (and push the feedback in to tokio). We can ping Alex or Carl if we have specific feedback, too.

scottlamb commented 7 years ago

I think I understand now how owning_ref works. Essentially, having b"Hello World".to_vec(), but wanting to send a chunk that only references the bytes of b"World", right?

Yeah, that's among the things you can do with it, via something like this:

let v = owning_ref::VecRef::new(b"Hello World".to_vec()).map(|v| &v[6..]);
let chunk: Chunk = v.into();

At the same time, I'm wary of linking hyper's public API to owning_ref, if for no other reason than I feel like owning_ref is an obscure crate. Maybe that feeling is baseless, or maybe there's another way to get a similar effect without relying on that crate?

I dunno what to say about that feeling—on the one hand, it has an order of magnitude fewer downloads than hyper, on the other it's a relatively small amount of code and has been maintained for the past 18 months.

For the b"Hello World" -> b"World" case, it looks like there's a ::tokio_core::io::Window that does something similar. But I don't think it does the type erasure thing, so I don't know how I'd use it for the mmap stuff. And I think you'd need basically a non-window + window version of each of the current enum variants.

So, you must spawn some sort of other task that will try to send more chunks into the body as they become available, and the body has room.

How do I do that? I think I need a ::tokio_core::Handle and don't know where to get one easily. Maybe I'm just asking for it to be given as a parameter to NewService::new_service. Or maybe what I want is already easy and I'm missing it; I'm totally new to tokio...

theduke commented 7 years ago

Body should really have a convenience method, maybe fold_to_vec or collect_to_vec that yields a Future<Item=Vec<u8>, Error=hyper::Error>.

scottlamb commented 7 years ago

fwiw, I think that's a one-liner already (untested): body.fold(Vec::new(), |mut a, b| { a.extend_from_slice(&b[..]); futures::ok(a) })

pimeys commented 7 years ago

I've tested the folding and it works. It was not very obvious, though; good documentation would help here.

abonander commented 7 years ago

@seanmonstar

I'm working out a plan on how to build a fork of multipart on top of the async API once the dust settles. For the server-side API, I'm thinking of having a Multipart master struct which then becomes a stream of MultipartField, then each MultipartField is a stream of Chunk.

Of course, the main problem that multipart solves is delimiting on the multipart boundary. What I want to do is yield Chunks for each field until the boundary is hit, and then yield the data in that Chunk up to the boundary. If the byte subsequence turns out not to be the multipart boundary (such as if it was split between chunks), it needs to be yielded as well. I would prefer to do this without copying the data in the chunks and creating new ones, if possible.
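A small std-only sketch of the scanning step, to show where the "split between chunks" case comes from. The names `Scan` and `scan` are hypothetical and this is not multipart's actual code; it only borrows from the input slice, so nothing is copied. When no complete boundary is present, the longest tail that could still be the start of a boundary is held back rather than yielded.

```rust
/// Result of scanning one chunk's bytes for the boundary.
#[derive(Debug, PartialEq)]
pub enum Scan<'a> {
    /// The boundary was found: the bytes before it and the bytes after it.
    Found { before: &'a [u8], after: &'a [u8] },
    /// No complete boundary: `emit` can be yielded now, while `held` might be
    /// the start of a boundary that continues in the next chunk.
    NotFound { emit: &'a [u8], held: &'a [u8] },
}

/// Scans `data` for `boundary` without copying; all results borrow from `data`.
pub fn scan<'a>(data: &'a [u8], boundary: &[u8]) -> Scan<'a> {
    if boundary.is_empty() {
        // Nothing to look for; everything can be emitted.
        return Scan::NotFound { emit: data, held: &data[data.len()..] };
    }
    if let Some(pos) = data.windows(boundary.len()).position(|w| w == boundary) {
        return Scan::Found {
            before: &data[..pos],
            after: &data[pos + boundary.len()..],
        };
    }
    // Longest suffix of `data` that is a proper prefix of `boundary`.
    let max = data.len().min(boundary.len() - 1);
    let keep = (1..=max)
        .rev()
        .find(|&n| data[data.len() - n..] == boundary[..n])
        .unwrap_or(0);
    let (emit, held) = data.split_at(data.len() - keep);
    Scan::NotFound { emit, held }
}
```

A caller would yield `emit` immediately and re-examine `held` together with the next chunk; if the following bytes do not complete the boundary, the held bytes are yielded as ordinary data.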

I see two potential solutions:

seanmonstar commented 7 years ago

So, something that I think solves both @abonander and @scottlamb's cases, plus various others, is if the Chunk were basically Bytes. It has several benefits:

A few questions remain regarding using Bytes in hyper:

scottlamb commented 7 years ago

Hmm, I think that would give me the ability to subset a Chunk, but not to have it backed by something more complex than a Vec<u8> as in the mmap example I gave above.

seanmonstar commented 7 years ago

@scottlamb you're right, I forgot about mmap. Asking in the tokio room, the suggested advice was basically "don't do that", because of the pauses in the event loop that can occur from loading the memory from disk. With those warnings, do you still find it compelling to try to do it anyways?

scottlamb commented 7 years ago

mlock from a worker thread seems like it would address the pause concern. It requires a thread handoff for each chunk, but given that no one actually uses aio_read, so does any approach to filesystem I/O other than doing writes from non-reactor threads or having enough reactor threads (with a shared event loop and/or work stealing) to not care if one blocks on disk, and I gather you and the tokio folks are looking to avoid those.

I should answer your question about if this is worthwhile with a benchmark. I'll try to work one up, but I apologize in advance for slowness: I have a tiny amount of time for my Rust projects every week and have been spreading that kinda thin, so nothing I do goes fast. I did look at it for a moment tonight and realized a problem with using owning_ref::ErasedBoxRef<[u8]>: that type isn't Send. D'oh.

Intuitively, it seems a little silly IMHO to go to the effort of the switch from sync to async (with all the pain of a non-intuitive programming model, the danger of accidentally doing slow/blocking stuff on the wrong thread, etc), in the name of performance (I think that was the main goal?) and then have to do context switches between every read and write and do extra copies. But maybe it comes down to a choice about what's important to optimize: small requests or large requests, serving from the filesystem or from elsewhere, etc. When I mention using mmap and avoiding copies, I'm thinking about filesystem-based responses much larger than the CPU cache.

abonander commented 7 years ago

What about posix_madvise and PrefetchVirtualMemory to advise the OS to page-in the mmap'ed region? You could issue that call and then schedule an event for next idle to actually attempt the access, which will hopefully give the OS enough time to perform the operation, or cause the least disruption in case a page fault still happens. The latter call is restricted to Windows 8 or later; I don't know if you'd want to do the threading thing for previous versions or just dereference at idle (or immediately) and eat the cost.

abonander commented 7 years ago

I just realized we're talking about serving files. You can also use posix_fadvise() or set the FILE_FLAG_SEQUENTIAL_SCAN flag when opening the file on Windows, which will advise the OS to pre-cache the file. You could again try to read only on idle so as to limit disruption if the OS can't read ahead fast enough. Reading in chunks the size of a block in the filesystem would also help.

Addendum: Linux also has readahead(), which immediately initiates a read-ahead into cache (though probably equivalent to posix_fadvise() with POSIX_FADV_WILLNEED for the same arguments); however, the docs say that it may block to read file metadata. If the file descriptor was recently opened, though, the metadata should still be in cache anyway.

scottlamb commented 7 years ago

I brought up mlock because it makes a guarantee: on successful return, the memory is paged in and will stay that way until munlock or munmap is called. So if you call it from a worker thread and then pass the reference to the reactor thread, you can be absolutely confident accessing that memory won't cause the reactor to pause.

posix_fadvise, posix_madvise, and readahead don't do that. They might do nothing. Even if they read the file into RAM, they don't tell you when it's done, and they don't guarantee it won't be immediately undone. Calls to read (for traditional IO) or memory accesses (for mmap) can still pause.

scottlamb commented 7 years ago

I've done my homework now, so I can answer @seanmonstar's question:

@scottlamb you're right, I forgot about mmap. Asking in the tokio room, the suggested advice was basically "don't do that", because of the pauses in the event loop that can occur from loading the memory from disk. With those warnings, do you still find it compelling to try to do it anyways?

Yes. It's compelling.

I hacked together a benchmark on this branch which serves a file in eight modes:

I created a 1 MiB file (dd if=/dev/zero of=f bs=1M count=1) and added the line * - memlock unlimited to /etc/security/limits.conf so mlock would work.

Then I ran it, restricting to a single core:

$ taskset --cpu-list 0 cargo run --release --example server

and used wrk to benchmark, giving it the other cores:

$ for i in {inline,threaded}-{direct,copy}-{read,memmap}; do taskset --cpu-list 1-3 wrk http://localhost:1337/$i; done

(I also ran each in a longer mode, used top to verify it was CPU-bound, and glanced at a few CPU profiles gathered with perf record -e cycles -p $(pidof server) --callgraph=dwarf, viewed via perf report -g --children, and didn't see any huge surprises.)

Here are the results on my laptop:

If I were being rigorous, I would run it several times and calculate error bounds, and I would do it directly on hardware (I'm using Ubuntu 15.04.1 LTS on VMware Fusion on a new Macbook Pro now), but it's not even close. Direct mmap is much faster than with a copy or using read.

(Also, avoiding thread handoffs seems to be a noticeable difference. I'm a little surprised; I thought they'd be insignificant with such large responses.)

On this machine, one core can easily saturate 1000Mbit/sec Ethernet in the slowest mode so there's no problem anyway, but the software I'm writing is intended to run on cheap ARM hardware where it matters more.

seanmonstar commented 7 years ago

@scottlamb That is some really impressive homework, thank you so much!

It definitely looks like mmap has some things going for it. I'm sure at some point in the wild you may see adverse effects, especially as more variables come into play: larger files, many different files, data not in the disk cache, the disk having to seek, or the disk just hanging for a long time. But I'm certainly not wanting to prevent someone from trying to make that work.

I realized that I had a misunderstanding on the user-supplied stream type that was required by tokio. I assumed it had to be tokio's Body type. But it can be generic over Stream. And if doing that, it may as well also be considered to allow the chunk portion of the stream to be generic as well.

So, this would mean changes like this to hyper:

struct Response<T> {
    body: T,
    // ...
}

impl<T> Response<T> where T: Stream, T::Item: AsRef<[u8]> {
    pub fn set_body(&mut self, body: T) {
      // ...
    }
}

And then, for @scottlamb's case:

struct YourService;

impl Service for YourService {
    type Response = Response<Body<MmapSlice>>;
    // other types
    fn call(&self, req: Self::Request) -> Self::Future {
        ok(Response::new().with_body(Body::from(make_mmap_slice())))
    }
}

You would just implement AsRef<[u8]> for your type, and then you wouldn't need to box and erase specifically, maybe you'd need an Arc or something, to send the pointer and some indices, really anything you wanted.

With the Body being a generic Stream, you also don't need to use the futures::sync::mpsc channel that backs Body; you could use something else, like futures::stream::Once (though the Body hyper uses does have an optimization for only 1 item).

The only concern that I haven't thought through all the way is that this may cause more work for people who don't need it. It should still be really easy to just send data, like a Vec<u8>, &'static [u8], or Bytes. Maybe this is done by defaulting the generic, like Response<T = Body<Chunk>>. The trade-off there, of course, is that people can very easily end up not being generic by accident, by writing libraries that accept Response instead of Response<T>...
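A std-only sketch of that shape, for illustration. It is not hyper's API: `Iterator` stands in for `Stream`, `Vec<Vec<u8>>` stands in for the default `Body<Chunk>`, and `write_to`, `plain`, and `Window` are hypothetical names. The point is that any chunk type implementing `AsRef<[u8]>` works, while code that writes plain `Response` silently pins the default.

```rust
use std::io::{self, Write};
use std::sync::Arc;

/// Response that is generic over its body, with a default body type.
pub struct Response<B = Vec<Vec<u8>>> {
    body: B,
}

impl<B> Response<B>
where
    B: IntoIterator,
    B::Item: AsRef<[u8]>,
{
    pub fn with_body(body: B) -> Response<B> {
        Response { body }
    }

    /// Writes every chunk to `out` and returns the number of bytes written.
    pub fn write_to<W: Write>(self, mut out: W) -> io::Result<usize> {
        let mut written = 0;
        for chunk in self.body {
            let bytes = chunk.as_ref();
            out.write_all(bytes)?;
            written += bytes.len();
        }
        Ok(written)
    }
}

/// The simple case: plain `Response` here means the default body type.
pub fn plain(data: Vec<u8>) -> Response {
    Response::with_body(vec![data])
}

/// Hypothetical application-defined chunk: a shared buffer plus a window.
pub struct Window {
    buf: Arc<Vec<u8>>,
    start: usize,
    end: usize,
}

impl Window {
    /// Returns `None` if the window does not fit inside the buffer.
    pub fn new(buf: Arc<Vec<u8>>, start: usize, end: usize) -> Option<Window> {
        if start <= end && end <= buf.len() {
            Some(Window { buf, start, end })
        } else {
            None
        }
    }
}

impl AsRef<[u8]> for Window {
    fn as_ref(&self) -> &[u8] {
        &self.buf[self.start..self.end]
    }
}
```

A function declared as `fn f(r: Response)` only accepts the default body, which is the accidental loss of genericity mentioned above; `fn f<B>(r: Response<B>)` keeps it open.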

scottlamb commented 7 years ago

Thanks. Glad you liked the benchmark.

Your newest proposal sounds great to me—I'd have all the flexibility I was looking for with owning_ref, without hyper needing to depend on that crate, and potentially without some overhead of boxing and match or trait dispatch.

I agree some libraries probably will accidentally take away that flexibility, but I don't know if any of the other ways we discussed would prevent that anyway. With hyper 0.10.x, Rocket decided to design its API to require a Read instead of matching hyper's Write, so it requires an extra io::copy in between. I don't know what hyper could have done to anticipate or avoid that. I think there's no answer but to be careful at every added layer...

I also agree there are caveats to mmap, btw. A big one is that if the file gets truncated while you're reading it, your program will crash with SIGBUS. It's not a concern for my application (which runs as its own user and is the only thing that can read or write the files it's serving) but it means read is a better default.

ghost commented 7 years ago

I'd love an option to get the raw socket behind it, as there are times where you want to avoid userland completely, and just use a splice() or a sendfile().

Right now, this doesn't seem doable, or did I miss something during my experiments? (Granted, this is not really asynchronous, but this is still a very real use-case)

seanmonstar commented 7 years ago

@michael-zapata There is no option to do that for now. The ability to borrow the socket would probably require some coordination with tokio, since there are tokio-proto types in between the user and the socket.

It may be slightly easier to instead allow passing some struct Sendfile { path: PathBuf } on the Body stream, and having hyper figure out how to do that internally, but I'm also quite wary of that. sendfile can block the reactor thread when reading the file, and it is incompatible with HTTPS, and more of a pain if trying to work with HTTP2 streams. I'm not absolutely against it, but it'd require a bit more exploration and a prototype, and so something that feels out of scope for the next release.

seanmonstar commented 7 years ago

I've worked on making the user-supplied body stream generic, and it exists for both the server and client in this branch: https://github.com/hyperium/hyper/compare/outgoing-generic

A few things I either need to fiddle with, or would welcome feedback on, are:

abonander commented 7 years ago

I don't think it's worth worrying about library writers shooting themselves in the foot by falling back on the default type param.

It's pretty clear to me that Client::get() should be stuck in its own impl block with the B param as a concrete type:

impl<C> Client<C, hyper::Body> where C: Connect {
    /// Send a GET Request using this Client.
    #[inline]
    pub fn get(&self, url: Url) -> FutureResponse {
        self.request(Request::new(Method::Get, url))
    }
}

(Or alternately, futures::stream::Empty<[u8; 0], ::Error> as a true no-op stream).

I doubt you'll get many users who are too lazy to write client.request(Request::new(Method::Get, url)) but still want to use their own body stream for the same Client.

However, this leads into a separate concern I have, but one that I don't think will be easy to resolve because it also digs into Tokio a good bit: I don't think Client should be parametric over the body stream, because that prohibits using different stream types with the same Client. If possible, the B parameter should be lowered to the request() method. I do understand that the parts of Tokio that Client interacts with are dependent on the specific stream type, which does make for a bit of a conundrum. I can't help the feeling that this requirement could be lifted somewhat with some more tinkering in Tokio.

scottlamb commented 7 years ago

@seanmonstar wrote:

I haven't explored it yet, but it might be nice if the supplied body stream didn't need to actually emit hyper::Error, but just E: Into<hyper::Error>. That'd mean that io::Error would just work...

Maybe a dumb question, but what is this error used for? I think it's basically just for hyper to know that it should drop the connection to indicate error to the HTTP peer? I think even Error = () would suffice for that. Given that applications/libraries are the ones producing the error, they can do logging and such themselves in whatever way they desire.

hyper::Error is kind of weird for this in that its documentation states that it's "A set of errors that can occur parsing HTTP streams.", which is way more specific than "any sort of error an application could encounter that prevents it from fully generating an HTTP stream".

(By the way, there probably should be some way for hyper::server to tell the application if the whole response including the last chunk was fully written or not, but I think that's a separate concern from the body stream.)

@abonander wrote:

I don't think Client should be parametric over the body stream, because that prohibits using different stream types with the same Client.

I think the server side is actually similar: there's only one Service, and so only one definition of B. If frameworks (such as Iron, Rocket, etc) want to define any higher-level stuff (such as having an option to build simple synchronous handlers that use io::Write as in hyper 0.10.x), they'll end up having to state an opinion about what B should be. (Hopefully they'll leave an escape hatch such as an enum variant for custom stuff like what I want to do with mmap.) It'd be nice if it were possible to avoid forcing them to make this choice, but I think it's still significantly better than hyper being too rigid. It seems like the Rust web ecosystem has almost everything based on hyper, and then various frameworks on top of that. This hyper interface gives them the flexibility to experiment, and low-level performance-oriented code can avoid using a framework entirely and still not have to reinvent hyper.

abonander commented 7 years ago

This kinda goes for Service as well, I think the trait shouldn't be tied to the specific type of the byte stream, but I don't really have a good idea on how to fix that. I just know that different endpoints are likely going to want to return responses in different ways and the current trait design is going to require some kind of dynamic workaround at the user level, either boxing and type erasure or the use of enum, both of which have their associated costs.
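The two user-level workarounds mentioned here can be sketched with the standard library, using `Iterator` as a stand-in for the body stream. `EitherBody`, `BoxBody`, and `boxed` are hypothetical names for illustration, not types from hyper or tokio. The enum dispatches with a `match` on every poll and fixes the set of body types up front; the boxed version accepts any body type at the cost of an allocation and dynamic dispatch.

```rust
/// Enum workaround: one concrete type that can hold either of two body types.
pub enum EitherBody<A, B> {
    Left(A),
    Right(B),
}

impl<A, B> Iterator for EitherBody<A, B>
where
    A: Iterator,
    B: Iterator<Item = A::Item>,
{
    type Item = A::Item;

    fn next(&mut self) -> Option<A::Item> {
        // Static dispatch: a branch per variant on each call.
        match self {
            EitherBody::Left(a) => a.next(),
            EitherBody::Right(b) => b.next(),
        }
    }
}

/// Type-erasure workaround: any body of `Vec<u8>` chunks behind one pointer.
pub type BoxBody = Box<dyn Iterator<Item = Vec<u8>> + Send>;

/// Boxes an arbitrary body so different endpoints can share one return type.
pub fn boxed<I>(body: I) -> BoxBody
where
    I: Iterator<Item = Vec<u8>> + Send + 'static,
{
    Box::new(body)
}
```

With either approach, two endpoints that build their bodies differently can still return the single type the service is declared with.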

seanmonstar commented 7 years ago

Maybe a dumb question, but what is this error used for?

Huh. I traced through the code in tokio and played with some examples, and found that a user will never receive the hyper::Error. If there is an error parsing the head, the error passed back to tokio will make tokio just kill the stream. An error can occur streaming the body, but realistically it'd only ever be an io::Error, until HTTP2 is added, in which case HTTP2 frame errors are also possible.

The purpose of requiring it on the Response side is currently a limitation in tokio: you can only define 1 error type to be used throughout.

I know from conversations that the tokio team wants to improve error handling in newer versions, so maybe some of these points go away. For the time being, though, I wonder if it makes sense for it to just be an io::Error. Or really, some opaque error type that can be built with From::from(io::Error), so someone can get the description of the error message. There isn't anything else you can do once the Request stream has sent an error; it will be shut down regardless.
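One possible shape for such an opaque error, sketched with the standard library only. `BodyError` and `read_chunk` are hypothetical names, not hyper types; the sketch just shows that a `From<io::Error>` impl is enough for `?` to work on I/O code while the description stays available through `Display` and `source`.

```rust
use std::error::Error;
use std::fmt;
use std::io;

/// Hypothetical opaque error for body streams: callers can print it or walk
/// its source, but cannot match on its contents.
#[derive(Debug)]
pub struct BodyError {
    inner: Box<dyn Error + Send + Sync>,
}

impl From<io::Error> for BodyError {
    fn from(err: io::Error) -> BodyError {
        BodyError { inner: Box::new(err) }
    }
}

impl fmt::Display for BodyError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "body stream error: {}", self.inner)
    }
}

impl Error for BodyError {
    fn source(&self) -> Option<&(dyn Error + 'static)> {
        let source: &(dyn Error + 'static) = &*self.inner;
        Some(source)
    }
}

/// Example producer: reads exactly `len` bytes; `?` converts any `io::Error`
/// into `BodyError` through the `From` impl above.
pub fn read_chunk<R: io::Read>(mut src: R, len: usize) -> Result<Vec<u8>, BodyError> {
    let mut buf = vec![0u8; len];
    src.read_exact(&mut buf)?;
    Ok(buf)
}
```

Keeping the inner error private leaves room to add other sources later (HTTP2 frame errors are mentioned above as one candidate) without changing the public type.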


The points about the Service trait are very interesting! I'd recommend either opening an issue on the tokio-service repo, or discussing them out in the tokio gitter room.

seanmonstar commented 7 years ago

Actually, thanks to another conversation in the tokio room, I realized that it is possible for users to make use of the hyper::Error, such as knowing when there was a parse error before receiving a Request.

It is possible to wrap the Http protocol with another ServerProto impl. It could probably even be a generic wrapper. It would find each Frame::Error and translate it into a Frame::Message with a Result<Request, Error>. You could then use a new Service to handle that, like so:

struct Example;

impl Service for Example {
    type Request = Result<hyper::server::Request, hyper::Error>;
    // other types are the same
    fn call(&self, incoming: Self::Request) -> Self::Future {
        match incoming {
            Ok(req) => {
                // handle the parsed request as usual
                unimplemented!()
            },
            Err(parse_error) => {
                // you could check for some error types;
                // for Error::TooLarge, you could respond with a 414
                unimplemented!()
            },
        }
    }
}

It's possibly worth wondering if hyper::server::Http should do this itself, and the wrapper could choose to ignore parse errors, or the other way around. Probably also worth being in a separate issue.

abonander commented 7 years ago

@seanmonstar I mean, I guess if there's no type erasure going on in a deeper abstraction layer that this could be rolled into somehow then the point is moot.

alexcrichton commented 7 years ago

cc https://github.com/alexcrichton/futures-rs/issues/390, a possible extension that came up during integration in https://github.com/mozilla/sccache/pull/70

mjc-gh commented 7 years ago

I'm in the process of blogging about futures and the changes to hyper for my company's blog. I have some questions about streaming responses with hyper. I put together a small example that just creates an infinite stream and sends out a Chunk periodically via an Interval from tokio-timer. The source for this is available here: https://github.com/Sigient/hyper_examples/blob/future-streaming/src/main.rs

Firstly, is this the proper way to handle this sort of streaming? I want to eventually rewrite esper and will be keeping open a connection for an Event Source stream so I can send data as events are received from other clients. Is there a better way to send a stream of Chunks or, more generally, tie an arbitrary Stream to a response body? I'm thinking of implementing the Stream trait for some message queue type in esper and defining a poll function which knows when there are new messages available. I'm still trying to wrap my head around futures before I attempt implementing something this complex though.

Secondly, when I run this example and connect a client to the stream, I see the CPU usage for the process spikes to 100%. I think this is likely because I am doing something wrong with the Interval type but I'm not really sure...

seanmonstar commented 7 years ago

@mikeycgto The link to the example doesn't resolve, so I'm unable to comment on most of your questions...

mjc-gh commented 7 years ago

Sorry about that @seanmonstar! The repo was private and I have now made it public. Thanks!

seanmonstar commented 7 years ago

@mikeycgto:

Firstly, is this the proper way to handle this sort of streaming?

That's certainly a way to do it.

Is there a better way to send a stream of Chunks or, more generally, tie an arbitrary Stream to a response body?

I don't know about a 'better' way, but the outgoing (user-supplied) body stream is generic, so you can certainly create your own Stream type. It looks like this example is already making use of that, since before you had to send a hyper::Body body, but this is sending a Box<Stream>.

I see the CPU usage for the process spikes to 100%. I think this is likely because I am doing something wrong with the Interval type but I'm not really sure...

I'm not familiar at all with how the tokio-timer crate works. To try to locate where the CPU usage is coming from, you could compare with a different kind of infinite stream. A naive implementation could spawn a thread, use let (tx, body) = hyper::Body::pair(), and then in the thread loop { tx.send(chunk); sleep_thread_for_2_seconds(); }.
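The shape of that naive producer can be sketched with the standard library alone. Note the assumptions: std::sync::mpsc stands in for hyper::Body::pair() here, the stream is bounded by a count so the sketch terminates, and spawn_chunk_producer is a hypothetical name.

```rust
use std::sync::mpsc::{channel, Receiver};
use std::thread;
use std::time::Duration;

/// Spawn a thread that sends `count` chunks, sleeping `pause` after each one.
/// The std channel is only a stand-in for the (tx, body) pair from hyper.
pub fn spawn_chunk_producer(count: usize, pause: Duration) -> Receiver<Vec<u8>> {
    let (tx, rx) = channel();
    thread::spawn(move || {
        for i in 0..count {
            let chunk = format!("chunk {}\n", i).into_bytes();
            // Stop producing if the receiving side has gone away.
            if tx.send(chunk).is_err() {
                break;
            }
            thread::sleep(pause);
        }
        // `tx` is dropped here, which ends the receiver's iteration.
    });
    rx
}
```

If a producer like this does not drive CPU usage up while the timer-based stream does, that would point at the timer side rather than at the body handling.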

scottlamb commented 7 years ago

@mikeycgto, I wonder if your busy-looping is a bug in tokio or mio. I'm also seeing terrible performance with somewhat different code (a CpuPool sending chunks over a futures::sync::mpsc::channel), and just filed a bug about it in tokio:

https://github.com/tokio-rs/tokio-core/issues/177

I don't understand the problem well enough to know if it's related to what you're seeing or not.

seanmonstar commented 7 years ago

@scottlamb @mikeycgto the busy-looping may have been in part due to code in hyper, which was adjusted in #1074. Still seems to me that tokio shouldn't try to flush if it didn't write anything, but oh well.

mjc-gh commented 7 years ago

No longer seeing the busy looping. Thanks Sean!

seanmonstar commented 7 years ago

hyper now uses the bytes crate, and so there is an impl From<Bytes> for Chunk, and related shortcuts for set_body. That might be the end of this issue...

I do wonder if the Chunk type has any value anymore, or if the default body stream should just use Bytes directly.

seanmonstar commented 7 years ago

I'm leaning towards closing this as fixed, unless there's an issue that hasn't been addressed yet.

abonander commented 7 years ago

Kind of a naive idea, but I was wondering if there could be some way to have a return stream of Bytes that have been completely read from so the same handles could be reused. This could be done entirely in client code by cloning each Bytes handle yielded and trying to convert them to BytesMut later, but that seems more complex and less efficient.

seanmonstar commented 7 years ago

@abonander If you keep hold of a BytesMut, you can try to reclaim it later by calling reserve on it. If there are no more existing references (Bytes) alive, it will just reset on the same buffer. It might be a nice feature for the bytes crate to allow some way of using a memory pool, or something.
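The underlying idea (reuse the allocation only when no other handles are alive) can be illustrated with a standard-library analogue. This is not how the bytes crate is implemented and it does not use its API; Arc<Vec<u8>> and the helper name reclaim_or_allocate are assumptions made purely for the sketch.

```rust
use std::sync::Arc;

/// Try to reuse the buffer behind `buf`.
/// Returns true if no other handles existed and the buffer was reset in place;
/// returns false if readers were still alive and a fresh buffer was allocated.
pub fn reclaim_or_allocate(buf: &mut Arc<Vec<u8>>, capacity: usize) -> bool {
    if let Some(vec) = Arc::get_mut(buf) {
        // Unique owner: clear the contents but keep the allocation.
        vec.clear();
        vec.reserve(capacity);
        true
    } else {
        // Other handles still point at the old data, so leave it untouched.
        *buf = Arc::new(Vec::with_capacity(capacity));
        false
    }
}
```

The same check is what makes the reuse safe: readers holding an older handle never observe the buffer being overwritten.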

In that case though, it's probably better for this stuff to make use of Drop and such (so implemented outside of hyper).

dekellum commented 6 years ago

For anyone arriving here by searching for hyper and mmap, the body_image master branch (candidate for the body_image 0.4.0 release) now has zero-copy/async support for memory-mapped HTTP bodies using (glibc) madvise(SEQUENTIAL) (for aggressive OS read-ahead), and the tokio-threadpool blocking annotation.

Thanks for publishing your bench results, @scottlamb. I have some non-hyper-specific tokio benchmarks included in body_image as well (see CHANGELOG.md for those results). An mlock strategy could also be added there in the future. Another item that is currently lacking is MAP_HUGE(TLB|_2MB|_1GB), for lack of support in the memmap-rs crate, which is ripe for forking since it's passively (or is it passive-aggressively?) maintained.