delta-io / kafka-delta-ingest

A highly efficient daemon for streaming data from Kafka into Delta Lake
Apache License 2.0

Demonstrate a bug for schema "validation" when writing JSON to a table #131

Open rtyler opened 1 year ago

rtyler commented 1 year ago

I stumbled into this while pilfering code from kafka-delta-ingest for another project, and discovered that the check in `write_values` which does `record_batch.schema() != arrow_schema` doesn't do what we think it does.

Basically, if the `Decoder` "works", the schema it returns is just the schema that was passed into it. It has no bearing on whether the JSON actually has the same schema. Don't ask me why.
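For illustration, here is roughly what the existing check boils down to (paraphrased, not the literal code in `write_values`; `arrow_schema_ref`, `options`, and `value_iter` are assumed to be in scope):

    // Paraphrased sketch of the ineffective check: the Decoder is built from
    // the table's Arrow schema, so the batch it produces hands that exact
    // schema straight back, no matter what shape the JSON actually had.
    let decoder = Decoder::new(arrow_schema_ref.clone(), options);
    if let Some(record_batch) = decoder.next_batch(&mut value_iter).expect("Failed to decode") {
        // Always equal: record_batch.schema() is just arrow_schema_ref again.
        assert_eq!(record_batch.schema(), arrow_schema_ref);
    }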

Using the reader's `infer_json_schema_*` functions can provide a `Schema` that is actually useful for comparison:

    // Paths assume arrow's JSON reader module; adjust to the arrow version in use.
    use arrow::json::reader::{infer_json_schema_from_iterator, Decoder};

    let mut value_iter = json_buffer.iter().map(|j| Ok(j.to_owned()));
    // Infer a schema from the JSON values themselves rather than trusting the table schema.
    let json_schema = infer_json_schema_from_iterator(value_iter.clone()).expect("Failed to infer!");
    let decoder = Decoder::new(Arc::new(json_schema), options);
    if let Some(batch) = decoder.next_batch(&mut value_iter).expect("Failed to create RecordBatch") {
        // Now the comparison is meaningful: inferred JSON schema vs. the table's Arrow schema.
        assert_eq!(batch.schema(), arrow_schema_ref, "Schemas don't match!");
    }

What's even more interesting is that after a certain number of fields are removed, the `Decoder` no longer pretends it can decode the JSON. I am baffled as to why.

The current failure from this test is:

    ---- writer::tests::test_schema_matching stdout ----
    thread 'writer::tests::test_schema_matching' panicked at 'Expected the write of our invalid schema rows to fail!
    Ok(())', src/writer.rs:1089:9
    note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace

rtyler commented 1 year ago

I discussed this at length today with @xianwill privately. I will attempt to summarize our verbose conversation :smile:

Basically this does look like a bug and a half, and maybe some unclear behavior. The schemas being used here have nullable fields for everything. What is interesting is that a nullable "primitive type" can be omitted and the `Decoder` will still try to fill out the batch, whereas a missing struct/map/list causes the `Decoder` to ignore the record entirely, which can lead to an `EmptyBatchError`. :bug:
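For example, with a hypothetical all-nullable table schema of `{ id: long, meta: struct<source: string> }` (field names made up for illustration), the two kinds of omission behave very differently:

    use serde_json::json;

    // Missing nullable primitive: the Decoder still emits a row, with `id` filled in as null.
    let missing_primitive = json!({ "meta": { "source": "kafka" } });

    // Missing nullable struct: the Decoder drops the record entirely; if every record
    // in the buffer looks like this, the write surfaces an EmptyBatchError.
    let missing_struct = json!({ "id": 42 });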

To that end, there is in fact no real check occurring at the moment that the schema of the JSON lines up with the schema of the Delta table :bug: And because of nullable fields, the primitive `batch.schema() != arrow_schema_ref` comparison which is present in the code might not be helpful even if it did work correctly (meaning, even if it compared the inferred schema of the JSON to the actual schema of the Delta table).

From @xianwill:

> we should detect a SchemaMismatch in case of missing required fields or incompatible data types. IMO == is a bit too strong. For example, it should still be okay for nullable fields to not be present in the source schema. Not sure if inferring schema from the first json in the batch solves it quite right either. What if the second record has a different structure? Also, we need to be able to dead letter individual messages in case of SchemaMismatch and continue processing the stream.
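One possible shape for that looser check (just a sketch against arrow's `Schema` API; the function name is made up, and real "compatible data types" logic such as integer widening is left out):

    use arrow::datatypes::Schema;

    /// Sketch: instead of requiring `inferred == table`, require that every field the
    /// JSON actually contains exists in the table schema with the same data type, and
    /// that every non-nullable table field is present in the JSON. Nullable table
    /// fields are allowed to be absent from the source.
    fn schemas_compatible(inferred: &Schema, table: &Schema) -> bool {
        let inferred_ok = inferred.fields().iter().all(|f| {
            table
                .field_with_name(f.name())
                .map(|t| t.data_type() == f.data_type())
                .unwrap_or(false)
        });
        let required_ok = table
            .fields()
            .iter()
            .filter(|f| !f.is_nullable())
            .all(|f| inferred.field_with_name(f.name()).is_ok());
        inferred_ok && required_ok
    }

Even with a check like this, the dead-lettering question remains: a SchemaMismatch on one message should route that message to the dead letter queue rather than fail the whole batch.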

The disconnect between the schema in the parquet files and the schema of the Delta table doesn't make it any easier to decide what the appropriate behavior for kafka-delta-ingest should be here. :scream_cat: