apache / arrow-rs

Official Rust implementation of Apache Arrow
https://arrow.apache.org/
Apache License 2.0

Parsing a string column containing JSON values into a typed array #6522

Open scovich opened 1 month ago

scovich commented 1 month ago

**Is your feature request related to a problem or challenge? Please describe what you are trying to do.**

I have a nullable StringArray column that contains JSON object literals.

I need to JSON parse the column into a StructArray of values following some schema, and NULL input values should become NULL output values.

This can almost be implemented using arrow_json::reader::ReaderBuilder::build_decoder and then feeding in the bytes of each string. But the decoder has no concept of record separators in the input stream. Thus, invalid inputs such as blank strings (""), truncated records ("{\"a\":1"), or multiple objects ("{\"a\": 1} {\"a\": 2}") will confuse the decoding process. If we're lucky, it will produce the wrong number of records, but an adversarial input could easily seem to produce the correct number of records even though no single input string represented a valid JSON object. Thus, if I want such safety, I'm forced to parse each string as its own RecordBatch (which can then be validated independently) and then concatenate them all. Ugly, error-prone, and inefficient:

```rust
use std::io::{BufRead, BufReader};

use arrow_array::{RecordBatch, StringArray};
use arrow_json::reader::ReaderBuilder;
use arrow_schema::{ArrowError, SchemaRef};
use arrow_select::concat::concat_batches;
use itertools::Itertools; // for try_collect()

pub fn parse_json(
    json_strings: StringArray,
    schema: SchemaRef,
) -> Result<RecordBatch, ArrowError> {
    // Use batch size of 1 to force one record per string input
    let mut decoder = ReaderBuilder::new(schema.clone())
        .with_batch_size(1)
        .build_decoder()?;
    // Feed a single string into the decoder and flush it to a record batch
    let parse_one = |json_string: Option<&str>| -> Result<RecordBatch, ArrowError> {
        // NOTE: null input becomes empty object (all fields null)
        let s = json_string.unwrap_or("{}");
        let mut reader = BufReader::new(s.as_bytes());
        let buf = reader.fill_buf()?;
        let read = buf.len();
        let decoded = decoder.decode(buf)?;
        assert_eq!(decoded, read);
        Ok(decoder.flush()?.unwrap())
    };
    let output: Vec<_> = json_strings
        .iter()
        .map(parse_one)
        .try_collect()?;
    concat_batches(&schema, output.iter())
}
```

(example code, has panics instead of full error handling)

**Describe the solution you'd like**

Ideally, the JSON Decoder could define public methods that report how many rows the decoder has buffered, and whether the decoder is currently at a record boundary or not. This is essentially a side-effect-free version of the same check that Tape::finish already performs when Decoder::flush is called:

```rust
impl TapeDecoder {
    // ...

    /// The number of buffered rows this decoder has, including any row still in
    /// progress if [`has_partial_row`] returns true.
    pub fn num_buffered_rows(&self) -> usize {
        self.cur_row
    }

    /// True if the decoder is part way through decoding a record. If so,
    /// calling [`finish`] would return an error.
    pub fn has_partial_row(&self) -> bool {
        !self.stack.is_empty()
    }
}
```

and

```rust
impl Decoder {
    // ...

    /// The number of records currently buffered in this decoder.
    /// A successful call to [`flush`] would produce this many rows.
    pub fn num_buffered_records(&self) -> usize {
        self.tape_decoder.num_buffered_rows()
    }

    /// True if the decoder is part way through decoding a record.
    /// Calling [`flush`] on a partial record would return an error.
    pub fn has_partial_record(&self) -> bool {
        self.tape_decoder.has_partial_row()
    }
}
```
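To make the intended semantics concrete, here is a hypothetical usage sketch of the methods proposed above (assuming `schema` is a `SchemaRef` already in scope; only `decode` and `build_decoder` are existing arrow-json API):

```rust
// Hypothetical usage of the Decoder methods proposed above.
let mut decoder = ReaderBuilder::new(schema).build_decoder()?;

decoder.decode(b"{\"a\": 1")?;
assert!(decoder.has_partial_record()); // mid-record: flush() here would return an error

decoder.decode(b"}")?;
assert!(!decoder.has_partial_record()); // back on a record boundary
assert_eq!(decoder.num_buffered_records(), 1); // exactly one complete record buffered
```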

That way, the above implementation becomes a bit simpler and a lot more efficient:

```rust
use std::io::{BufRead, BufReader};

use arrow_array::{RecordBatch, StringArray};
use arrow_json::reader::{Decoder, ReaderBuilder};
use arrow_schema::{ArrowError, SchemaRef};

// (Relies on the proposed num_buffered_records/has_partial_record methods above.)
pub fn parse_json(
    json_strings: StringArray,
    schema: SchemaRef,
) -> Result<RecordBatch, ArrowError> {
    let mut decoder = ReaderBuilder::new(schema)
        .with_batch_size(json_strings.len())
        .build_decoder()?;
    // Feed a single string into the decoder and verify it parsed as exactly one record
    let parse_one = |decoder: &mut Decoder, json_string: &str| -> Result<(), ArrowError> {
        let mut reader = BufReader::new(json_string.as_bytes());
        let buf = reader.fill_buf()?;
        let read = buf.len();
        let decoded = decoder.decode(buf)?;
        assert_eq!(decoded, read);
        assert!(!decoder.has_partial_record());
        Ok(())
    };
    // Make sure each string produces exactly one record.
    for (i, json_string) in json_strings.iter().enumerate() {
        // NOTE: None becomes a NULL object
        parse_one(&mut decoder, json_string.unwrap_or("null"))?;
        assert_eq!(decoder.num_buffered_records(), i + 1);
    }
    decoder.flush().transpose().unwrap()
}
```
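For illustration, a hypothetical caller of the `parse_json` sketch above might look like this (the column values and schema are made up for the example):

```rust
use std::sync::Arc;

use arrow_array::cast::AsArray;
use arrow_array::types::Int64Type;
use arrow_array::StringArray;
use arrow_schema::{ArrowError, DataType, Field, Schema};

fn example() -> Result<(), ArrowError> {
    let json_strings = StringArray::from(vec![Some(r#"{"a": 1}"#), Some(r#"{"a": 2}"#)]);
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, true)]));

    // One output row per input string, in order.
    let batch = parse_json(json_strings, schema)?;
    assert_eq!(batch.num_rows(), 2);

    let a = batch.column(0).as_primitive::<Int64Type>();
    assert_eq!(a.value(0), 1);
    assert_eq!(a.value(1), 2);
    Ok(())
}
```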

It would be even nicer if the `parse_json` method could just become part of either arrow-json or arrow-compute, if parsing strings of JSON is deemed a general enough operation to deserve its own API call.

**Describe alternatives you've considered**

Tried shoving each string manually into a single Decoder to produce a single RecordBatch, but the above-mentioned safety issues made it very brittle (wrong row counts, incorrect values, etc.). Currently using the ugly/slow solution mentioned earlier, which creates and validates one RecordBatch per row before concatenating them all into a single RecordBatch.
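For illustration, a minimal sketch (made-up inputs, default `ReaderBuilder` settings) of the row-count hazard that makes the naive approach brittle: feeding per-row fragments into one long-lived Decoder silently merges them across what should be record boundaries.

```rust
use std::sync::Arc;

use arrow_json::reader::ReaderBuilder;
use arrow_schema::{ArrowError, DataType, Field, Schema};

fn demo() -> Result<(), ArrowError> {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, true)]));
    let mut decoder = ReaderBuilder::new(schema).build_decoder()?;

    // Three input "rows": an empty string, a truncated object, and the bytes
    // that happen to complete it. None of them is a valid JSON object on its own.
    for s in ["", "{\"a\": 1", "}"] {
        decoder.decode(s.as_bytes())?;
    }

    // The decoder stitched the fragments together: 1 record out for 3 rows in.
    let batch = decoder.flush()?.unwrap();
    assert_eq!(batch.num_rows(), 1);
    Ok(())
}
```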

tustvold commented 3 weeks ago

Sorry this one managed to slip through; adding `num_buffered_rows` and `has_partial_record` seems perfectly reasonable to me.

jatin510 commented 1 week ago

take

jatin510 commented 5 days ago

Hi @scovich @tustvold, I am currently looking into this.

Do we need to duplicate the changes implemented in https://github.com/delta-io/delta-kernel-rs/pull/373/files in arrow-rs, along with the `num_buffered_rows` / `has_partial_record` functions?

scovich commented 3 days ago

> Do we need to duplicate the changes implemented in https://github.com/delta-io/delta-kernel-rs/pull/373/files in arrow-rs, along with the `num_buffered_rows` / `has_partial_record` functions?

Good question. We're happy to tweak the delta-kernel-rs code to match a new arrow-rs API, as long as the new API covers the use case. I tried to factor that out in the "details" sections of this issue description.

If you refer to the parse_json_impl method in that PR, it corresponds to my comment in this issue's description:

> It would be even nicer if the `parse_json` method could just become part of either arrow-json or arrow-compute, if parsing strings of JSON is deemed a general enough operation to deserve its own API call.

Seems like the low-level support can go in independently of a decision to expose a new public `parse_json` method in arrow-compute or arrow-json?