apache / arrow-rs

Official Rust implementation of Apache Arrow
https://arrow.apache.org/
Apache License 2.0

Prevent FlightData overflowing max size limit whenever possible. #6690

Open itsjunetime opened 2 weeks ago

itsjunetime commented 2 weeks ago

Which issue does this PR close?

Closes #3478

What changes are included in this PR?

This reworks the encoding/writing step for flight data messages to ensure it never overflows the given size limit whenever that is possible (specifically, it is impossible only when a single row plus the header cannot fit within the limit, since there is still no mechanism for splitting a single row of data across multiple messages).

It does this by first constructing a fake IPC header, then getting that header's encoded length, and then subtracting that length from the provided max size. Because the header's size stays the same with the same schema (the only thing that changes is the value within the 'length of data' fields), we don't need to continually recalculate it.
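
To illustrate the idea (a rough sketch, not the PR's actual code; header_overhead and data_budget are hypothetical helpers written against the public IPC writer API): encode a zero-row slice once to measure the header, then budget only the remaining bytes for array data.

use arrow_array::RecordBatch;
use arrow_ipc::writer::{DictionaryTracker, IpcDataGenerator, IpcWriteOptions};
use arrow_schema::ArrowError;

// Encode an empty slice of the batch: for a given schema the flatbuffer
// header keeps the same shape (only the length fields change), so its
// encoded size is a good proxy for the real header's size.
fn header_overhead(batch: &RecordBatch, options: &IpcWriteOptions) -> Result<usize, ArrowError> {
  let empty = batch.slice(0, 0);
  let generator = IpcDataGenerator::default();
  let mut tracker = DictionaryTracker::new(false);
  let (_dictionaries, encoded) = generator.encoded_batch(&empty, &mut tracker, options)?;
  Ok(encoded.ipc_message.len())
}

// Whatever is left after the header is what the array buffers may use.
fn data_budget(max_flight_data_size: usize, header: usize) -> usize {
  max_flight_data_size.saturating_sub(header)
}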

There are more tests I'd like to add before merging this; I wanted to get it filed first so that I could get feedback in case any behavior seems seriously off.

Rationale for these changes

Since we now dynamically check array data sizes to see whether they fit within the allotted size, messages will never exceed the limit when it is possible to stay under it. As noted above, they will still go over when that is unavoidable, and I've rewritten the tests to check this behavior: if a test detects an overage but decoding shows that only one row was written, the overage is allowed, since there is no other way to get the data across (see the sketch below).
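
A rough sketch of that tolerance (the helper name is illustrative, not one of the PR's actual test utilities): an oversized message is only accepted when it carries a single row, i.e. when it could not have been split any further.

fn assert_within_limit_or_single_row(encoded_len: usize, decoded_rows: usize, max_size: usize) {
  if encoded_len > max_size {
    // going over the limit is only acceptable when the message holds a
    // single row, because a single row cannot be split across messages
    assert_eq!(decoded_rows, 1, "oversized message contained more than one row");
  }
}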

Are there any user-facing changes?

Yes, there are API additions. They are documented. As far as I can tell, this shouldn't require a breaking release, but I haven't run anything like cargo-semver-checks on it to actually verify.

itsjunetime commented 2 weeks ago

It looks like CI failed due to some network flakiness - I'm going to close and reopen to try it again

alamb commented 1 week ago

I had a brief skim; a couple of comments:

Adding an IPC-specific API to ArrayData seems a touch unfortunate. It's probably ok, but a little off.

I'm not really sure of the context for this PR, but assuming it is to better avoid the gRPC limits, I worry this may be a fool's errand. There is a lot beyond the data buffers going into those payloads (e.g. metadata flatbuffers, framing protobuf, HTTP framing, etc...) and trying to account for all of this is going to be a never-ending game of whack-a-mole. Ultimately the only solution I can see is to set the soft limit in the encoder well below the hard limit enforced by the transport.
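
On the encoder side, that mitigation looks roughly like the following (a sketch assuming the arrow-flight FlightDataEncoderBuilder; the 2 MB figure mirrors the soft limit mentioned below, against a typical 4 MiB gRPC hard limit).

use arrow_flight::encode::FlightDataEncoderBuilder;

fn encoder_builder() -> FlightDataEncoderBuilder {
  // keep the encoder's soft limit well under the transport's hard limit so
  // that IPC headers, protobuf framing, and HTTP overhead have headroom
  FlightDataEncoderBuilder::new().with_max_flight_data_size(2 * 1024 * 1024)
}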

The context of this PR is that we have set the soft limit well below the hard limit (2MB vs 4MB) and somehow a customer still managed to hit the hard limit. So @itsjunetime is trying to make the size accounting more precise.

We (at least our team at Influx) don't directly control all of the layers involved in the gRPC communication -- this limit is being enforced by one of the various programs / mixins installed in Kubernetes to manage traffic, report on things, etc. While in theory we should be able to figure out which one and increase its limits, that would also likely be a never-ending game of whack-a-mole.

Let me see if I can help to find a way to break this PR up into smaller, more manageable pieces, so that we can get this in / tested in a way that is reasonable to maintain

itsjunetime commented 1 week ago

> Let me see if I can help to find a way to break this PR up into smaller, more manageable pieces, so that we can get this in / tested in a way that is reasonable to maintain

One change I could make that may help with breaking this up (and also with @tustvold's concern about the IPC-specific interface) would be to genericize get_slice_memory_size_with_alignment over a T: MemoryAccountant (or something like that), e.g.:

enum SizeSource {
  Buffer(BufferType),
  NullBuffer,
  ChildData,
  // does it need to be more granular? idk
}

trait MemoryAccountant {
  // called once per counted allocation, with where that allocation came from
  fn count_size(&mut self, size: usize, source: SizeSource);
}

impl ArrayData {
  fn get_slice_memory_size_with_accountant<A: MemoryAccountant>(
    &self,
    acc: &mut A
  ) -> Result<(), ArrowError> {
    // walks buffers, the null buffer, and child data, reporting each size to `acc`
    // ...
  }

  fn get_slice_memory_size(&self) -> Result<usize, ArrowError> {
    // the existing behavior becomes a trivial accountant that just sums
    struct DefaultAccountant(usize);

    impl MemoryAccountant for DefaultAccountant {
      fn count_size(&mut self, size: usize, _: SizeSource) {
        self.0 += size;
      }
    }

    let mut acc = DefaultAccountant(0);
    self.get_slice_memory_size_with_accountant(&mut acc)?;
    Ok(acc.0)
  }
}

This would allow us to use it nicely with the 'alignment' accounting that we need without being too IPC-specific. It would also allow us to remove the ugly re-accounting for RunEndEncoded buffers that this PR adds in get_encoded_arr_batch_size, which would be nice.
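
As a hypothetical follow-on (not part of this PR), the IPC path could then supply an accountant that folds the 8-byte alignment padding into the counting, building on the trait sketched above:

struct AlignedAccountant {
  // pad every counted buffer up to this boundary (8 bytes for IPC)
  alignment: usize,
  total: usize,
}

impl MemoryAccountant for AlignedAccountant {
  fn count_size(&mut self, size: usize, _source: SizeSource) {
    // round each buffer up to the next alignment boundary, mirroring how
    // the IPC format pads buffers on the wire
    self.total += size.div_ceil(self.alignment) * self.alignment;
  }
}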

I'd be happy to make this change and pull it out into a separate PR (to make this PR easier to review once that separate PR is merged) if we feel this would be a better move.