apache / arrow-rs

Official Rust implementation of Apache Arrow
https://arrow.apache.org/
Apache License 2.0

Wrap all `tokio::io::AsyncWrite` types in a dedicated type that implements `AsyncFileWriter` #6327

Open ethe opened 1 month ago

ethe commented 1 month ago

Currently, parquet implements AsyncFileWriter for every type that implements AsyncWrite:

impl<T: AsyncWrite + Unpin + Send> AsyncFileWriter for T {
    ... ...
}

which conflicts with the following impl:

impl<T: AsyncFileWriter + ?Sized + Unpin + Send> AsyncFileWriter for &mut T {
    ... ...
}

This impl does not exist yet, but it would be helpful in more general cases, and many similar traits choose to provide it, such as:

Describe the solution you'd like

Wrap every tokio::io::AsyncWrite in a proxy struct; then we can implement AsyncFileWriter for &mut T where T: AsyncFileWriter:

pub struct TokioFileWriter<F: AsyncWrite> {
    file: F,
}

impl<F: AsyncWrite> From<F> for TokioFileWriter<F> {
    fn from(file: F) -> Self {
        Self { file }
    }
}

impl<F: AsyncWrite + Unpin + Send> AsyncFileWriter for TokioFileWriter<F> {
    ... ...
}
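To illustrate why the proxy removes the conflict, here is a minimal synchronous, std-only model rather than parquet's real API. `ByteSink` is a hypothetical stand-in for `tokio::io::AsyncWrite` and `FileSink` a hypothetical stand-in for `AsyncFileWriter`; the async machinery (`BoxFuture`, `Bytes`) is deliberately left out. Because nothing implements `FileSink` for every `ByteSink`, the compiler accepts the forwarding impl for `&mut T`.

```rust
use std::io;

/// Hypothetical synchronous stand-in for `tokio::io::AsyncWrite`.
trait ByteSink {
    fn write_all_bytes(&mut self, buf: &[u8]) -> io::Result<()>;
}

/// Hypothetical synchronous stand-in for parquet's `AsyncFileWriter`.
trait FileSink {
    fn write(&mut self, bs: Vec<u8>) -> io::Result<()>;
    fn complete(&mut self) -> io::Result<()>;
}

/// The proposed proxy: one wrapper type instead of a blanket impl over every `ByteSink`.
struct TokioFileWriter<F: ByteSink> {
    file: F,
}

impl<F: ByteSink> From<F> for TokioFileWriter<F> {
    fn from(file: F) -> Self {
        Self { file }
    }
}

impl<F: ByteSink> FileSink for TokioFileWriter<F> {
    fn write(&mut self, bs: Vec<u8>) -> io::Result<()> {
        self.file.write_all_bytes(&bs)
    }

    fn complete(&mut self) -> io::Result<()> {
        Ok(()) // a real implementation would flush and shut down here
    }
}

/// Accepted because no blanket `impl<T: ByteSink> FileSink for T` exists.
impl<T: FileSink + ?Sized> FileSink for &mut T {
    fn write(&mut self, bs: Vec<u8>) -> io::Result<()> {
        (**self).write(bs)
    }

    fn complete(&mut self) -> io::Result<()> {
        (**self).complete()
    }
}

/// An in-memory byte sink, used only for the demo.
impl ByteSink for Vec<u8> {
    fn write_all_bytes(&mut self, buf: &[u8]) -> io::Result<()> {
        self.extend_from_slice(buf);
        Ok(())
    }
}

/// Takes any `FileSink` by value, the way `AsyncArrowWriter::try_new` takes its writer.
fn write_head_and_tail<W: FileSink>(mut w: W) -> io::Result<()> {
    w.write(b"head".to_vec())?;
    w.write(b"tail".to_vec())?;
    w.complete()
}

fn main() -> io::Result<()> {
    let mut writer = TokioFileWriter::from(Vec::<u8>::new());
    // Lend the wrapper instead of moving it.
    write_head_and_tail(&mut writer)?;
    println!("{}", String::from_utf8_lossy(&writer.file));
    Ok(())
}
```

The cost of this design is that tokio users would write `TokioFileWriter::from(file)` where they previously passed `file` directly, which is why the issue calls it a breaking change.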

Describe alternatives you've considered

I don't think there is an alternative.

Additional context

This is a breaking change, but it is worth considering.

Xuanwo commented 1 month ago

Hi, could you share the use case for &mut AsyncFileWriter? Please note that AsyncFileWriter is not intended to be used alone; most users will utilize it through AsyncArrowWriter.

ethe commented 1 month ago

But an AsyncArrowWriter is created from an AsyncFileWriter, and creating an AsyncArrowWriter from a &mut AsyncFileWriter is exactly what I want:

use arrow_schema::{Schema, SchemaRef};
use futures::future::BoxFuture;
use parquet::arrow::async_writer::{AsyncArrowWriter, AsyncFileWriter};

struct CustomFile {
    // ...
}

impl AsyncFileWriter for CustomFile {
    fn write(&mut self, bs: bytes::Bytes) -> BoxFuture<'_, parquet::errors::Result<()>> {
        todo!()
    }

    fn complete(&mut self) -> BoxFuture<'_, parquet::errors::Result<()>> {
        todo!()
    }
}

async fn test() {
    let mut file = CustomFile {};
    {
        // Requires `&mut CustomFile: AsyncFileWriter`, which parquet does not provide today.
        let mut writer =
            AsyncArrowWriter::try_new(&mut file, SchemaRef::from(Schema::empty()), None).unwrap();
        // write record batch
        writer.flush().await.unwrap();
    }
    // do something with file after writes
}

This works for a tokio async file, because &mut tokio::fs::File is itself an AsyncWrite and therefore an AsyncFileWriter, but it does not work for other implementations.
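A minimal std-only model of the requested behaviour, offered as a sketch: `FileSink` is a hypothetical synchronous stand-in for `AsyncFileWriter` (only `write` is modelled), and `ArrowWriterModel` is a hypothetical stand-in for `AsyncArrowWriter` that forwards one buffer. With a forwarding impl for `&mut T`, a custom, non-tokio file can be lent to the writer and used again once the writer is dropped.

```rust
use std::io;

/// Hypothetical synchronous stand-in for parquet's `AsyncFileWriter`.
trait FileSink {
    fn write(&mut self, bs: Vec<u8>) -> io::Result<()>;
}

/// The forwarding impl requested in this issue.
impl<T: FileSink + ?Sized> FileSink for &mut T {
    fn write(&mut self, bs: Vec<u8>) -> io::Result<()> {
        (**self).write(bs)
    }
}

/// A non-tokio writer in the spirit of `CustomFile` above.
struct CustomFile {
    data: Vec<u8>,
}

impl FileSink for CustomFile {
    fn write(&mut self, bs: Vec<u8>) -> io::Result<()> {
        self.data.extend_from_slice(&bs);
        Ok(())
    }
}

/// Hypothetical stand-in for `AsyncArrowWriter`: it owns whatever `FileSink` it is given.
struct ArrowWriterModel<W: FileSink> {
    sink: W,
}

impl<W: FileSink> ArrowWriterModel<W> {
    fn new(sink: W) -> Self {
        Self { sink }
    }

    fn flush(&mut self, row_group: &[u8]) -> io::Result<()> {
        self.sink.write(row_group.to_vec())
    }
}

fn main() -> io::Result<()> {
    let mut file = CustomFile { data: Vec::new() };
    {
        // `W` is inferred as `&mut CustomFile`, which type-checks only because of the forwarding impl.
        let mut writer = ArrowWriterModel::new(&mut file);
        writer.flush(b"row group")?;
    } // the writer, and with it the borrow, ends here

    // The same `file` is usable again without reopening anything.
    println!("{} bytes written", file.data.len());
    Ok(())
}
```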

ethe commented 1 month ago

It would also make the boxed AsyncFileWriter impl more general. Compared with the current impl AsyncFileWriter for Box<dyn AsyncFileWriter>, it could be:

impl<T: AsyncFileWriter + ?Sized + Unpin + Send> AsyncFileWriter for Box<T> {
    fn write(&mut self, bs: Bytes) -> BoxFuture<'_, Result<()>> {
        self.as_mut().write(bs)
    }

    fn complete(&mut self) -> BoxFuture<'_, Result<()>> {
        self.as_mut().complete()
    }
}

This also conflicts with impl<T: tokio::io::AsyncWrite + Unpin + Send> AsyncFileWriter for T.
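The following std-only sketch shows what the `?Sized` bound buys. `FileSink` is a hypothetical synchronous stand-in for `AsyncFileWriter` and `Memory` is a made-up sink. A single impl over `Box<T>` covers both a boxed trait object and a boxed concrete type, whereas an impl written only for `Box<dyn ...>` covers just the former.

```rust
use std::io;

/// Hypothetical synchronous stand-in for parquet's `AsyncFileWriter` (only `write` is modelled).
trait FileSink {
    fn write(&mut self, bs: Vec<u8>) -> io::Result<()>;
}

/// One generic impl; `?Sized` lets `T` be a trait object such as `dyn FileSink`.
impl<T: FileSink + ?Sized> FileSink for Box<T> {
    fn write(&mut self, bs: Vec<u8>) -> io::Result<()> {
        (**self).write(bs) // deref the Box and call the inner impl
    }
}

/// A made-up in-memory sink for the demo.
struct Memory(Vec<u8>);

impl FileSink for Memory {
    fn write(&mut self, bs: Vec<u8>) -> io::Result<()> {
        self.0.extend_from_slice(&bs);
        Ok(())
    }
}

fn write_hello<W: FileSink>(w: &mut W) -> io::Result<()> {
    w.write(b"hello".to_vec())
}

fn main() -> io::Result<()> {
    // A boxed concrete type: an impl written only for `Box<dyn FileSink>` would not cover this.
    let mut concrete: Box<Memory> = Box::new(Memory(Vec::new()));
    write_hello(&mut concrete)?;

    // A boxed trait object: covered by the same impl.
    let mut erased: Box<dyn FileSink> = Box::new(Memory(Vec::new()));
    write_hello(&mut erased)?;

    println!("{}", String::from_utf8_lossy(&concrete.0));
    Ok(())
}
```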

Xuanwo commented 1 month ago

> // do something with file after writes

From my current understanding, you should not do anything to this file after closing it. Can you provide a detailed list of actions you plan to take?

ethe commented 1 month ago

That was my mistake. What I actually want is to flush the AsyncArrowWriter, then seek and read the file, or get its metadata, as a native file rather than as an AsyncFileWriter, without reopening a new file descriptor (which has some overhead). I have fixed the example code. Please also consider the boxed AsyncFileWriter case.

Xuanwo commented 1 month ago

Understood. I believe you need AsyncArrowWriter::into_inner.


First of all, allowing users to manipulate the writer after AsyncArrowWriter has written the header and data could be error-prone.

Additionally, it doesn't make sense given our context could involve S3, Azure Blob, or GCS storage services. In these contexts, the file doesn't exist before close has been called, so you can't perform seek/read or other actions.

If you simply want to use this API in the local filesystem context, would you consider using try_clone instead?
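A sketch of the try_clone route, written against the synchronous `std::fs::File::try_clone` as an analogue of tokio's `File::try_clone`; the helper `write_then_inspect` and the scratch file name are hypothetical. One handle plays the writer that is moved away and consumed, while the clone stays with the caller. The std documentation notes that both handles share the same cursor, so the clone has to seek back before reading.

```rust
use std::fs::{self, OpenOptions};
use std::io::{self, Read, Seek, SeekFrom, Write};
use std::path::Path;

/// Hypothetical helper: writes `payload` through one handle, then reads it back through a clone.
fn write_then_inspect(path: &Path, payload: &[u8]) -> io::Result<(Vec<u8>, u64)> {
    let file = OpenOptions::new()
        .create(true)
        .read(true)
        .write(true)
        .truncate(true)
        .open(path)?;

    // Duplicate the existing handle instead of reopening the path.
    let mut reader = file.try_clone()?;

    {
        // This handle plays the writer that gets moved into the Arrow writer and consumed.
        let mut writer = file;
        writer.write_all(payload)?;
        writer.flush()?;
    } // `writer` is dropped here; `reader` keeps the file open

    // Both handles share one cursor, which now sits at the end, so rewind first.
    reader.seek(SeekFrom::Start(0))?;
    let mut contents = Vec::new();
    reader.read_to_end(&mut contents)?;
    let len = reader.metadata()?.len();
    Ok((contents, len))
}

fn main() -> io::Result<()> {
    // Hypothetical scratch location for the demo.
    let path = std::env::temp_dir().join("try_clone_example.bin");
    let (contents, len) = write_then_inspect(&path, b"example bytes")?;
    println!("read {} bytes, metadata reports {}", contents.len(), len);
    fs::remove_file(&path)?;
    Ok(())
}
```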

ethe commented 1 month ago

> Additionally, it doesn't make sense given our context could involve S3, Azure Blob, or GCS storage services.

It does make sense for local filesystems, whose files can be used as an AsyncFileWriter. Don't give up eating for fear of choking.

> First of all, allowing users to manipulate the writer after AsyncArrowWriter has written the header and data could be error-prone.

However, a &mut reference to a tokio::io::AsyncWrite is currently a valid AsyncFileWriter. If you think parquet should not allow a &mut AsyncFileWriter to be an AsyncFileWriter, then I think the &mut tokio::io::AsyncWrite case should be fixed as well. Whatever happens, the behavior of tokio and non-tokio writers should be aligned.

Xuanwo commented 1 month ago

> It does make sense for local filesystems, whose files can be used as an AsyncFileWriter. Don't give up eating for fear of choking.

I agree that it is useful in the local filesystem context. Therefore, I propose a new API called AsyncArrowWriter::into_inner. I believe this can address your needs in a backward-compatible manner.

ethe commented 1 month ago

> Understood. I believe you need AsyncArrowWriter::into_inner.

I think into_inner behaves differently and has different overhead: it moves the AsyncFileWriter out, whereas a &mut AsyncFileWriter works in place. Still, it would be nice to have an AsyncArrowWriter::into_inner method.
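To make the ownership difference concrete, here is a std-only sketch. `FileSink`, `ArrowWriterModel`, and its `into_inner` are hypothetical stand-ins, not parquet's actual types or signatures. Here `into_inner` consumes the writer and moves the sink back out, whereas the `&mut` approach lends the file and leaves ownership with the caller throughout.

```rust
use std::io;

/// Hypothetical synchronous stand-in for parquet's `AsyncFileWriter`.
trait FileSink {
    fn write(&mut self, bs: &[u8]) -> io::Result<()>;
}

/// Hypothetical stand-in for `AsyncArrowWriter`; not parquet's real type or signatures.
struct ArrowWriterModel<W: FileSink> {
    sink: W,
}

impl<W: FileSink> ArrowWriterModel<W> {
    fn new(sink: W) -> Self {
        Self { sink }
    }

    fn flush(&mut self, row_group: &[u8]) -> io::Result<()> {
        self.sink.write(row_group)
    }

    /// Consumes the writer and moves the sink back out to the caller.
    fn into_inner(self) -> W {
        self.sink
    }
}

struct CustomFile {
    data: Vec<u8>,
}

impl FileSink for CustomFile {
    fn write(&mut self, bs: &[u8]) -> io::Result<()> {
        self.data.extend_from_slice(bs);
        Ok(())
    }
}

fn main() -> io::Result<()> {
    // Ownership of the file moves into the writer...
    let mut writer = ArrowWriterModel::new(CustomFile { data: Vec::new() });
    writer.flush(b"row group")?;

    // ...and comes back only once the writer is given up.
    let file = writer.into_inner();
    println!("{} bytes back with the caller", file.data.len());
    Ok(())
}
```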