tokio-rs / tokio

A runtime for writing reliable asynchronous applications with Rust. Provides I/O, networking, scheduling, timers, ...
https://tokio.rs
MIT License
25.44k stars 2.3k forks source link

`tokio_util::io::StreamReader` uses incorrect generic parameter for `Sink` impl #6642

Closed eric-seppanen closed 2 weeks ago

eric-seppanen commented 3 weeks ago

Version

tokio v1.38.0
tokio-stream v0.1.15
tokio-util v0.7.11

Platform

Linux 6.2.0-39-generic #40~22.04.1-Ubuntu SMP PREEMPT_DYNAMIC Thu Nov 16 10:53:04 UTC 2 x86_64 x86_64 x86_64 GNU/Linux

Description

I think that the generic parameters are used incorrectly where tokio-util's StreamReader implements Sink:

https://github.com/tokio-rs/tokio/blob/8480a180e6ffdd0ec0ec213a9bebcda2445fc541/tokio-util/src/io/stream_reader.rs#L329-L330

It is written as though StreamReader's second parameter is its error type, which is not correct-- the second parameter is its buffer type.

This breaks the ability to use StreamReader and SinkWriter together. In this example the error message makes it clear that the types have become confused:

use std::io::{self, Cursor};

use futures_sink::Sink;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_stream::Stream;
use tokio_util::io::{SinkWriter, StreamReader};

pub struct Foo;

impl Foo {
    pub fn into_async_rw(self) -> impl AsyncRead + AsyncWrite {
        // This is broken because StreamReader doesn't correctly pass through the Sink impl types.
        SinkWriter::new(StreamReader::new(self))
    }
}

impl Stream for Foo {
    type Item = io::Result<Cursor<Vec<u8>>>;

    fn poll_next(
        self: std::pin::Pin<&mut Self>,
        _cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        todo!()
    }
}

impl<'a> Sink<&'a [u8]> for Foo {
    type Error = io::Error;

    fn poll_ready(
        self: std::pin::Pin<&mut Self>,
        _cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Result<(), Self::Error>> {
        todo!()
    }

    fn start_send(self: std::pin::Pin<&mut Self>, _item: &'a [u8]) -> Result<(), Self::Error> {
        todo!()
    }

    fn poll_flush(
        self: std::pin::Pin<&mut Self>,
        _cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Result<(), Self::Error>> {
        todo!()
    }

    fn poll_close(
        self: std::pin::Pin<&mut Self>,
        _cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Result<(), Self::Error>> {
        todo!()
    }
}
error[E0271]: type mismatch resolving `<Foo as Sink<&[u8]>>::Error == Cursor<Vec<u8>>`
  --> src/lib.rs:11:35
   |
11 |     pub fn into_async_rw(self) -> impl AsyncRead + AsyncWrite {
   |                                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^ type mismatch resolving `<Foo as Sink<&[u8]>>::Error == Cursor<Vec<u8>>`
   |
note: expected this to be `std::io::Error`
  --> src/lib.rs:28:18
   |
28 |     type Error = io::Error;
   |                  ^^^^^^^^^
   = note: expected struct `std::io::Error`
              found struct `std::io::Cursor<Vec<u8>>`
   = note: required for `StreamReader<Foo, std::io::Cursor<Vec<u8>>>` to implement `for<'a> futures_sink::Sink<&'a [u8]>`
   = note: required for `SinkWriter<StreamReader<Foo, std::io::Cursor<Vec<u8>>>>` to implement `AsyncWrite`

error[E0277]: the trait bound `std::io::Error: From<std::io::Cursor<Vec<u8>>>` is not satisfied
  --> src/lib.rs:11:35
   |
11 |     pub fn into_async_rw(self) -> impl AsyncRead + AsyncWrite {
   |                                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^ the trait `From<std::io::Cursor<Vec<u8>>>` is not implemented for `std::io::Error`, which is required by `SinkWriter<StreamReader<Foo, std::io::Cursor<Vec<u8>>>>: AsyncWrite`
   |
   = help: the following other types implement trait `From<T>`:
             <std::io::Error as From<tokio_stream::Elapsed>>
             <std::io::Error as From<tokio::time::error::Elapsed>>
             <std::io::Error as From<TryReserveError>>
             <std::io::Error as From<NulError>>
             <std::io::Error as From<IntoInnerError<W>>>
             <std::io::Error as From<ErrorKind>>
   = note: required for `std::io::Cursor<Vec<u8>>` to implement `Into<std::io::Error>`
   = note: required for `SinkWriter<StreamReader<Foo, std::io::Cursor<Vec<u8>>>>` to implement `AsyncWrite`