apache / arrow-rs

Official Rust implementation of Apache Arrow
https://arrow.apache.org/
Apache License 2.0

Object Store API improvements #6460

Open fsdvh opened 3 days ago

fsdvh commented 3 days ago

Describe the bug

So recently we started seeing two issues:

Multiple Shutdown

First of all, I want to note that multiple shutdown calls on the same writer are an issue in themselves, but I think we can make the situation better with minimal effort.

Here is the code:

                BufWriterState::Write(x) => {
                    let upload = x.take().ok_or_else(|| {
                        std::io::Error::new(
                            ErrorKind::InvalidInput,
                            "Cannot shutdown a writer that has already been shut down",
                        )
                    })?;
                    self.state = BufWriterState::Flush(
                        async move {
                            upload.finish().await?;
                            Ok(())
                        }
                        .boxed(),
                    )
                }

I think we can change it to something more friendly like this:

                BufWriterState::Write(x) => {
                    if let Some(upload) = x.take() {
                        self.state = BufWriterState::Flush(
                            async move { upload.finish().await.map(|_| ()) }.boxed(),
                        )
                    } else {
                        return Poll::Ready(Ok(()));
                    }
                }

This way, on a second shutdown call, we just immediately return Ok(()).
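For illustration, a rough sketch of the calling pattern in question, using the in-memory store as a stand-in (the store, path, and payload here are placeholders, not our actual code):

    // Caller-side sketch of the double-shutdown pattern discussed above.
    use object_store::{buffered::BufWriter, memory::InMemory, path::Path, ObjectStore};
    use std::sync::Arc;
    use tokio::io::AsyncWriteExt;

    #[tokio::main]
    async fn main() -> std::io::Result<()> {
        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        let mut writer = BufWriter::new(store, Path::from("example/data.bin"));

        writer.write_all(b"some payload").await?;
        writer.shutdown().await?; // first call: finalizes the upload

        // Calling shutdown again is a caller-side bug today (it errors or
        // panics depending on the writer's state); with the proposed change
        // this second call would immediately return Ok(()).
        writer.shutdown().await?;
        Ok(())
    }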

Upload part size issue

If the leftover data uploaded during shutdown completes before the previously submitted part uploads, we get:

Your proposed upload is smaller than the minimum allowed size

To mitigate this issue we should probably wait for all previous part uploads to complete and only then upload the final part, which may be smaller than the minimum part size.

Here is the original code I propose to change:

    pub async fn finish(mut self) -> Result<PutResult> {
        if !self.buffer.is_empty() {
            let part = std::mem::take(&mut self.buffer);
            self.put_part(part.into())
        }

        self.wait_for_capacity(0).await?;

        match self.upload.complete().await {
            Err(e) => {
                self.tasks.shutdown().await;
                self.upload.abort().await?;
                Err(e)
            }
            Ok(result) => Ok(result),
        }
    }

By injecting self.wait_for_capacity(0).await? before actually putting the last chunk, we can mitigate this issue:

    pub async fn finish(mut self) -> Result<PutResult> {
        if !self.buffer.is_empty() {
            self.wait_for_capacity(0).await?;  // here

            let part = std::mem::take(&mut self.buffer);
            self.put_part(part.into())
        }

        self.wait_for_capacity(0).await?;

        match self.upload.complete().await {
            Err(e) => {
                self.tasks.shutdown().await;
                self.upload.abort().await?;
                Err(e)
            }
            Ok(result) => Ok(result),
        }
    }

This way we wait for all ongoing part uploads to complete before submitting the last part.
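To make the intent concrete, here is a conceptual sketch of what wait_for_capacity(0) provides; this is not the actual object_store implementation (which tracks its upload tasks internally), just an illustration of "wait until at most N part uploads remain in flight":

    // Conceptual sketch only, assuming a JoinSet-based task set.
    use tokio::task::JoinSet;

    struct Uploader {
        tasks: JoinSet<Result<(), std::io::Error>>, // in-flight part uploads
    }

    impl Uploader {
        async fn wait_for_capacity(&mut self, max_concurrency: usize) -> Result<(), std::io::Error> {
            while self.tasks.len() > max_concurrency {
                let joined = self.tasks.join_next().await.expect("set is non-empty");
                // Surface panics/cancellations first, then part-upload errors.
                joined.map_err(std::io::Error::other)??;
            }
            Ok(())
        }
    }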

fsdvh commented 3 days ago

I think it's more of an enhancement.

tustvold commented 2 days ago

What is the use-case for multiple shutdowns? This feels like a bug in the calling code.

~~A similar argument could be made for calling shutdown before the parts have been uploaded; this, I think, implies the caller is not waiting on the returned futures?~~

I'm actually confused by the second one; could you provide more context on how you encountered this error, including which store implementation you are using? Parts are numbered, so it shouldn't matter if they complete out of order.

fsdvh commented 2 days ago

Yes, calling shutdown multiple times is a bug on the caller side for sure. The changes I proposed should help mitigate this issue a bit by making repeated shutdown calls more forgiving.

For the second one, we're using the S3 object store, and for some reason S3 doesn't take the part number into account during the multipart upload and returns the error mentioned above. We managed to solve this issue by calling self.wait_for_capacity(0).await? before calling finish().
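For reference, a sketch of that caller-side workaround, using WriteMultipart directly (identifiers such as store, path, and data are placeholders):

    // Drain all in-flight part uploads before finish(), so the possibly
    // undersized final part is submitted after every earlier part.
    use object_store::{path::Path, ObjectStore, WriteMultipart};

    async fn upload(store: &dyn ObjectStore, path: &Path, data: &[u8]) -> object_store::Result<()> {
        let multipart = store.put_multipart(path).await?;
        let mut writer = WriteMultipart::new(multipart);
        writer.write(data);
        // The workaround: wait for every previously submitted part to
        // complete before finalizing the multipart upload.
        writer.wait_for_capacity(0).await?;
        writer.finish().await?;
        Ok(())
    }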

tustvold commented 2 days ago

The changes I proposed should help mitigate this issue a bit by making repeated shutdown calls more forgiving.

Your proposed change is racy: the first close will wait on the upload to complete, while subsequent calls "complete" instantly.

For the second one, we're using the S3 object store, and for some reason S3 doesn't take the part number into account during the multipart upload and returns the error mentioned above. We managed to solve this issue by calling self.wait_for_capacity(0).await? before calling finish().

Perhaps you could write a simple reproducer for this; I'm not saying S3 doesn't do this, but I want to be sure we've correctly identified the issue. Your proposed fix will serialize a round-trip, which is unfortunate given that many stores are not exactly low-latency.

fsdvh commented 2 days ago

I will try to provide an example of the second issue, but in the meantime I thought maybe we could change the flush method of BufWriter:

    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
        loop {
            return match &mut self.state {
                 BufWriterState::Write(write) => {
                    if let Some(write) = write {
                        write.poll_for_capacity(cx, 0).map_err(|e| e.into())
                    } else {
                        panic!("Already shut down")
                    }
                },
                BufWriterState::Buffer(_, _) => Poll::Ready(Ok(())),
                BufWriterState::Flush(_) => panic!("Already shut down"),
                BufWriterState::Prepare(f) => {
                    self.state = BufWriterState::Write(ready!(f.poll_unpin(cx)?).into());
                    continue;
                }
            };
        }
    }

By actually waiting for all uploads to complete, we can give the user the ability to use flush() + shutdown(), wdyt?
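As a sketch of the calling pattern this would enable (the helper function and its arguments are hypothetical):

    // Hypothetical caller pattern with a flush() that drains in-flight parts.
    use tokio::io::{AsyncWrite, AsyncWriteExt};

    async fn write_chunk<W: AsyncWrite + Unpin>(writer: &mut W, chunk: &[u8]) -> std::io::Result<()> {
        writer.write_all(chunk).await?;
        // With the proposed poll_flush, this waits until every part upload
        // started so far has completed; data still buffered locally is only
        // uploaded when shutdown() is called.
        writer.flush().await
    }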

tustvold commented 2 days ago

What would be the benefit of this? It wouldn't be able to guarantee that there isn't any data still in flight, as it can't upload the final data until shutdown is called. It is a valid point that we're taking a somewhat dubious interpretation of the AsyncWrite trait, but it's a necessary evil. We could probably add further documentation to discourage the use of AsyncWrite.

fsdvh commented 2 days ago

Yes, but flush will ensure that ongoing writes are flushed, which is generally aligned with "flush". And after that, you're okay to finalize your upload on shutdown.

tustvold commented 2 days ago

will ensure that ongoing writes are flushed, which is generally aligned with "flush"

Only those it has actually started writing; I think the proposed behaviour is more confusing. We should just make shutdown do the right thing.