Bug with response using struct implementing bytes::Buf #3650

Open tom-kuchler opened 2 months ago

tom-kuchler commented 2 months ago

Version hyper: 1.3.1, hyper-util: 0.1.3, bytes: 1.6.0, tokio: 1.37.0

Platform Linux LAPTOP-CSN0P74T

Summary For a project I'm building, I wanted to serialize some internal structures and send them out as responses. To do this, I wrote a struct that implements hyper::body::Body and implemented bytes::Buf for my internal structure. When sending the structure, I found that parts of the Buf had been replaced by different data: not all parts of the Buf were read, and different data was inserted in their place.

Minimal Viable Example Here is a minimal example that implements the Buf interface over a Vec but only returns single byte slices.

use std::{cmp::min, convert::Infallible, net::SocketAddr};

use hyper::{body::Frame, server::conn::http1, service::service_fn, Error, Request, Response};
use tokio::{net::TcpListener, spawn};

const TEST_MESSAGE: &str =
    "test message that needs to be a few bytes long for the sake of demonstration";

// A byte buffer with a read cursor.
struct MinimalBytes {
    current: usize,
    data: Vec<u8>,
}

impl bytes::Buf for MinimalBytes {
    fn remaining(&self) -> usize {
        return self.data.len() - self.current;
    }

    fn advance(&mut self, cnt: usize) {
        if self.current + cnt > self.data.len() {
            panic!("Attempted to advance past end of MinimalBytes");
        }
        self.current += cnt;
    }

    // Deliberately returns at most one byte, even when more bytes remain.
    fn chunk(&self) -> &[u8] {
        let end = min(self.current + 1, self.data.len());
        return &self.data[self.current..end];
    }
}

struct MinimalBody {
    frame: Option<MinimalBytes>,
}

impl hyper::body::Body for MinimalBody {
    type Data = MinimalBytes;
    type Error = Error;
    fn poll_frame(
        self: std::pin::Pin<&mut Self>,
        _cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Result<hyper::body::Frame<Self::Data>, Self::Error>>> {
        // Yield the single data frame once, then signal end of body with None.
        std::task::Poll::Ready(
            self.get_mut()
                .frame
                .take()
                .and_then(|min_bytes| Some(Ok(Frame::data(min_bytes)))),
        )
    }
}

async fn test_service(
    _: Request<hyper::body::Incoming>,
) -> Result<Response<MinimalBody>, Infallible> {
    return Ok(Response::new(MinimalBody {
        frame: Some(MinimalBytes {
            current: 0,
            data: String::from(TEST_MESSAGE).into_bytes(),
        }),
    }));
}

#[tokio::main]
async fn main() {
    // start server
    let addr = SocketAddr::from(([0, 0, 0, 0], 8080));
    let listener = TcpListener::bind(addr).await.unwrap();
    loop {
        let (stream, _) = listener.accept().await.unwrap();

        let io = hyper_util::rt::TokioIo::new(stream);

        // serve each connection on its own task
        spawn(async move {
            if let Err(err) = http1::Builder::new()
                .serve_connection(io, service_fn(test_service))
                .await
            {
                eprintln!("Error serving connection: {:?}", err);
            }
        });
    }
}

What I would expect to happen is a response that contains the test message. What happens instead is that after every byte of that message it inserts the 7 bytes \r\n0\r\n\r\n and then advances past 7 bytes in the actual buffer without ever asking for those chunks. I have checked it with a few different step sizes, and it always seems to insert these specific 7 bytes if a chunk is returned whose length is not equal to remaining. If the chunk length is equal to remaining, then everything works.
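To make the expectation concrete, here is a minimal std-only sketch of the property I am relying on: draining a buffer through chunk() and advance() should give back the whole message no matter how small each chunk is. The names ChunkSource, SteppedBytes and drain are made up for this illustration and only mirror the three required Buf methods; they are not part of bytes or hyper.

```rust
/// Hypothetical stand-in for the three required `Buf`-style methods.
trait ChunkSource {
    fn remaining(&self) -> usize;
    fn chunk(&self) -> &[u8];
    fn advance(&mut self, cnt: usize);
}

/// Like `MinimalBytes`, but with a configurable chunk size (`step`).
struct SteppedBytes {
    current: usize,
    step: usize,
    data: Vec<u8>,
}

impl ChunkSource for SteppedBytes {
    fn remaining(&self) -> usize {
        self.data.len() - self.current
    }

    fn chunk(&self) -> &[u8] {
        // Never hand out more than `step` bytes at a time.
        let end = self.current.saturating_add(self.step).min(self.data.len());
        &self.data[self.current..end]
    }

    fn advance(&mut self, cnt: usize) {
        assert!(cnt <= self.remaining(), "advance past end");
        self.current += cnt;
    }
}

/// Reads everything by repeatedly taking the current chunk and advancing
/// by exactly the number of bytes that were consumed from it.
fn drain<S: ChunkSource>(src: &mut S) -> Vec<u8> {
    let mut out = Vec::with_capacity(src.remaining());
    while src.remaining() > 0 {
        let n = {
            let chunk = src.chunk();
            out.extend_from_slice(chunk);
            chunk.len()
        };
        if n == 0 {
            break; // defensive: an empty chunk would otherwise loop forever
        }
        src.advance(n);
    }
    out
}
```

With this consumer, a step of 1 and a step equal to the full length both produce the same bytes, which is the behaviour I expected from the response body as well.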

Let me know if there is some additional information I could provide or something I could easily look into to help fix this.

seanmonstar commented 2 months ago

Ummm yea, you're right. This seems busted. I'm working on a unit test case for this, and will then track down the cause. Thanks for reporting!

seanmonstar commented 2 months ago

OK OK, I see what's going on. It's the Buf::chunks_vectored() method.

I need to think through the problem a bit more, but I'll likely need to take this as an issue to the bytes repo.
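A rough std-only model of how this kind of interleaving can arise, as a sketch under assumptions rather than the actual hyper or bytes code: MiniBuf, Stepped, Chain and write_vectored_all are hypothetical stand-ins. The model assumes a default vectored method that exposes only the current chunk, and a chain that asks each half for its slices in turn.

```rust
use std::io::IoSlice;

/// Hypothetical stand-in for a `Buf`-like trait (not the real `bytes::Buf`).
trait MiniBuf {
    fn remaining(&self) -> usize;
    fn chunk(&self) -> &[u8];
    fn advance(&mut self, cnt: usize);

    /// Assumed default: expose only the current chunk as a single slice.
    fn chunks_vectored<'a>(&'a self, dst: &mut [IoSlice<'a>]) -> usize {
        if dst.is_empty() || self.remaining() == 0 {
            return 0;
        }
        dst[0] = IoSlice::new(self.chunk());
        1
    }
}

/// Buffer whose `chunk()` returns at most `step` bytes.
struct Stepped {
    current: usize,
    step: usize,
    data: Vec<u8>,
}

impl Stepped {
    fn new(data: Vec<u8>, step: usize) -> Self {
        Stepped { current: 0, step, data }
    }
}

impl MiniBuf for Stepped {
    fn remaining(&self) -> usize {
        self.data.len() - self.current
    }
    fn chunk(&self) -> &[u8] {
        let end = self.current.saturating_add(self.step).min(self.data.len());
        &self.data[self.current..end]
    }
    fn advance(&mut self, cnt: usize) {
        assert!(cnt <= self.remaining(), "advance past end");
        self.current += cnt;
    }
}

/// Two buffers read back to back.
struct Chain<A, B> {
    a: A,
    b: B,
}

impl<A: MiniBuf, B: MiniBuf> MiniBuf for Chain<A, B> {
    fn remaining(&self) -> usize {
        self.a.remaining() + self.b.remaining()
    }
    fn chunk(&self) -> &[u8] {
        if self.a.remaining() > 0 { self.a.chunk() } else { self.b.chunk() }
    }
    fn advance(&mut self, cnt: usize) {
        // Consume from `a` first, then spill the rest into `b`.
        let from_a = cnt.min(self.a.remaining());
        self.a.advance(from_a);
        self.b.advance(cnt - from_a);
    }
    fn chunks_vectored<'a>(&'a self, dst: &mut [IoSlice<'a>]) -> usize {
        // `b`'s slices follow whatever `a` reported, even if `a` has more left.
        let n = self.a.chunks_vectored(dst);
        n + self.b.chunks_vectored(&mut dst[n..])
    }
}

/// Models a vectored write: emit all gathered slices, then advance by the total.
fn write_vectored_all<B: MiniBuf>(buf: &mut B) -> Vec<u8> {
    let mut out = Vec::new();
    while buf.remaining() > 0 {
        let written = {
            let mut slices = [IoSlice::new(&[]); 8];
            let n = buf.chunks_vectored(&mut slices);
            let mut written = 0;
            for s in &slices[..n] {
                out.extend_from_slice(s);
                written += s.len();
            }
            written
        };
        if written == 0 {
            break;
        }
        buf.advance(written);
    }
    out
}
```

In this model, chaining a one-byte-step body with a 7-byte trailer writes one body byte followed by the trailer, then advances 8 bytes through the body, so 7 body bytes are skipped without ever being read. When the body's chunk covers everything that remains, the same writer emits the body followed by a single trailer.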

seanmonstar commented 2 months ago

Issue filed upstream: https://github.com/tokio-rs/bytes/issues/701

tom-kuchler commented 1 month ago

Alright, then I'll follow that. Thanks a lot for the swift response.