MaterializeInc / materialize

The Cloud Operational Data Store: use SQL to transform, deliver, and act on fast-changing data.
https://materialize.com
Other
5.72k stars 466 forks source link

storage/sources/kafka: messages with no record payload silently discarded #10114

Open philip-stoev opened 2 years ago

philip-stoev commented 2 years ago

What version of Materialize are you using?

v0.16.1-dev (e0aa70a7c)

How did you install Materialize?

Built from source

What was the issue?

Kafka messages that have no payload are silently discarded -- they show up in the mz_source_info table, but produce no warnings in the log and no error in the source.

To reproduce:

$ set schema={
        "type" : "record",
        "name" : "test",
        "fields" : [
            {"name":"f1", "type":"long"}
        ]
    }

$ kafka-create-topic topic=envelope-none

$ kafka-ingest format=avro topic=envelope-none key-format=avro key-schema=${schema} schema=${schema} publish=true repeat=50
{"f1": 1}

> CREATE MATERIALIZED SOURCE envelope_none
  FROM KAFKA BROKER '${testdrive.kafka-addr}' TOPIC 'testdrive-envelope-none-${testdrive.seed}'
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY '${testdrive.schema-registry-url}'
  ENVELOPE NONE

> SELECT COUNT(*) FROM envelope_none
50

Relevant log output

No response

benesch commented 2 years ago

Huh, I fixed this exact bug with Protobuf sources. I wonder what the Avro decoder is doing differently.

petrosagg commented 7 months ago

ENVELOPE NONE explicitly discards null values (tombstones). Is this undesirable?

benesch commented 7 months ago

@petrosagg you DM'd me a few minutes after you posted the above comment with a link to my comment here:

Porting over some thoughts from Slack about how to make FORMAT AVRO produce NULL in the face of NULL Kafka message payloads.

The design I had in mind was ENVELOPE NOTFLAT (better name pending) where the source had exactly one nullable column named data (maybe a better name here too?). And the type of that column would be exactly the type of the top-level Avro type. So to express ENVELOPE UPSERT in SQL, you’d use ENVELOPE NOTFLAT to get the right semantics for top-level NULLs, do your upsert logic using TRANSFORM USING, and then flatten out your record if desired by writing SELECT data.f1, data.f2, data.f3 etc.

In other words: I think the implicit default of ENVELOPE FLATTEN was a mistake, and that probably should have been behavior that you have to explicitly opt in to.

So does that answer your question above? I'm not entirely sure how we get from here to there, but I'm interested in trying to find a path!

petrosagg commented 7 months ago

Hm, depends on what this issue is about.

This issue (as I understand it) is that ENVELOPE NONE drops null values on the floor. This is part of its specification and I don't think we can change that in backward compatible way. So for this interpretation I would close the issue as won't fix.

An alternative interpretation is that this issue is requesting a new feature which would be either a new envelope name or an option on the existing NONE envelope (e.g ENVELOPE NONE INCLUDE NULLS AND DONT FLATTEN PLEASE (syntax tbd)) which does not flatten the values and presents upstream tombstones as nulls. In this case this will be very easy to do in a week or so when my series of PRs get merged.

What is your interpretation?

benesch commented 7 months ago

What is your interpretation?

I like interpretation two. Here's a proposal:

CREATE SOURCE ... ENVELOPE NONE (FLATTEN = {TRUE | FALSE})

If we make FLATTEN = TRUE the default, then this will be perfectly backwards compatible.

The downside is this mixes formats and envelopes a bit—like, flattening doesn't mean anything for FORMAT JSON or FORMAT TEXT or FORMAT BYTES. But, also, that mixing of formats and envelopes is already happening, so adding a FLATTEN option on the envelope is largely just making the conflation that we already do explicit.

petrosagg commented 7 months ago

Flatness is only one dimension, the other one is null behavior. Here is the full matrix on an imaginary input of a nullable column of type record(f1: text, f2: int)

Flatten Discard NULLs Input Output
Y Y NULL nothing
Y Y ROW('foo', 2) 'foo', 2
Y N NULL NULL, NULL
Y N ROW('bar', 1) 'bar', 1
N Y NULL nothing
N Y ROW('foo', 2) ROW('foo', 2)
N N NULL NULL
N N ROW('bar', 1) ROW('bar', 1)

Ii today's MZ when a user specifies ENVELOPE NONE we automatically pick FLATTEN=Y if the FORMAT decodes into a record (e.g Avro) and we automatically pick FLATTEN=N if the format decodes into any other type (e.g format BYTES).

The downside is this mixes formats and envelopes a bit—like, flattening doesn't mean anything for FORMAT JSON or FORMAT TEXT or FORMAT BYTES. But, also, that mixing of formats and envelopes is already happening, so adding a FLATTEN option on the envelope is largely just making the conflation that we already do explicit.

There is salvation! ENVELOPE NONE should only be about whether nulls are suppressed, which by default it does. Then, after the envelope there is an imaginary additional post-processing stage (ahemᴛʀᴀɴsғᴏʀᴍ ᴜsɪɴɢahem) that can take the output of envelope processing and for example flatten it.

So I think today's pipeline in explicit syntax is: ENVELOPE NONE (exclude_nulls) FLATTEN RECORD

benesch commented 7 months ago

The place I get tripped up is that I don't think Flatten: Y, Discard NULLs: N should be a valid option. At least, if the type is record(f1: text NOT NULL, f2: int NOT NULL), in order for NULL, NULL to be a valid output you need to weaken the type to text, int instead of text NOT NULL, int NOT NULL. Fuzzing up the difference between "value was NULL" and "value was present but all fields were NULL" seems pretty sketchy to me in general, and I'm not sure we want to give users that option at all.

If we didn't have this weird flatten behavior, we'd never offer ENVELOPE NONE (EXCLUDE NULLS) as an option, right? It's a lot of syntax and conceptual weight to carry around for something that's trivial to write in a downstream materialized view, and something that will eventually be trivial to express with TRANSFORM USING. So I'm a little hesitant to introduce it.

The way I think about it, "flatten" is just an operation that necessarily discards NULLs, in order to preserve the types of the fields of the underlying record.

petrosagg commented 7 months ago

The place I get tripped up is that I don't think Flatten: Y, Discard NULLs: N should be a valid option.

Excellent, I agree! There are two wrinkles however.

First, we already offer it for keys in the following scenario:

  1. You used INCLUDE KEY (but not INCLUDE KEY AS 'foo which does not flatten)
  2. The schema of the key is a record
  3. The key payload is null
  4. The value payload is some normal value

Then the record will be produced and the key will have NULL in its columns and yes the schema declared by the planner is weakened in this case to make everything nullable. This behavior is controlled by something called KeyEnvelope in the code which can be either Flattened or Named. That is not a big deal, only a potential confusion among users that might wonder why this type of processing is offered for keys but not values.

Second, when users create a source that specifies a FORMAT but omits specifying an envelope, then we plan the query as if the user typed ENVELOPE NONE anyway.

This means that we can't use the absence of an envelope declaration as the encoding of the Flatten: N, Discard NULLs: N behavior (i.e the behavior that gives you raw access to the decoded data). So some explicit syntax would have to be defined somewhere to say NO ENEVELOPE.

Going back to your proposal of ENVELOPE NONE (FLATTEN = {TRUE | FALSE}) that sounds fine to me since it makes explicit the flatten behavior. In my decoding refactor the option to flatten is indeed appearing as a configuration of the envelope https://github.com/MaterializeInc/materialize/blob/1514def7ba370482f07a03db2e4149b2029c9aee/src/storage-types/src/sources/envelope.rs#L77-L78 since we don't yet have any notion of "post processing stage".

For controlling the Discard NULLs behavior I see two paths:

  1. We check all queries in production and we find that no-one relies on that, so we make a backwards incompatible change such that not adding an envelope exposes you the raw decoded data. This is ideal
  2. If users do depend on that in prod then we either need to work with them and still do the backwards incompatible transition or if it's widespread we will need to come up with additional syntax that disables the envelope stage.
benesch commented 7 months ago

First, we already offer it for keys in the following scenario:

Argh! 😡 You're so right.

Second, when users create a source that specifies a FORMAT but omits specifying an envelope, then we plan the query as if the user typed ENVELOPE NONE anyway.

Yeah, the reason it works like that is because ENVELOPE NONE was meant to be equivalent to having no envelope at all. Oops.

benesch commented 7 months ago

Thinking about this more, the fact that the INCLUDE KEY object is subject to the same flattening behavior kinda implies that FLATTEN = {TRUE | FALSE} is actually a property of the format, not the envelope!

Imagine:

KEY FORMAT {AVRO | PROTOBUF} (FLATTEN = {TRUE | FALSE}, TOP LEVEL NULL = {FLATTEN | DISCARD})
VALUE FORMAT {AVRO | PROTOBUF} (FLATTEN = {TRUE | FALSE}, TOP LEVEL NULL = {FLATTEN | DISCARD})

Haven't thought through how this interacts with ENVELOPE UPSERT. But I think there's something interesting to explore here.

petrosagg commented 7 months ago

For me it's easier to think about in Rust types which I then translate into SQL types. Here is my mental model of the ingestion pipeline as it currently stands (error handling and some other details omitted for clarity):

Sources

Every source is produces a struct in the following form:

struct Record<K, V> {
    key: K,
    value: V
}

For Kafka both K and V are Option<Bytes> to describe the payloads of each message. This translates into a RelationDesc with two nullable byte columns.

For PG K is () and V is the actual record of the table. This translates into a RelationDesc with a single non nullable column whose type is ScalarType::Record and the schema of the record is the schema of the upstream table.

Decoding

For decoding we first have the various format decoders that are functions that go from Bytes to some type T where T depends on the specific decoder. Here are some examples:

fn bytes_decoder(input: &Bytes) -> Bytes { input.clone() }
fn json_decoder(input: &Bytes) -> serde_json::Value { serde_json::from_slice(input) }

Then the decoding stage as a whole is defined like so:

fn decode<I, K, V, KD, VD>(input: I, key_decoder: KD, value_decoder: VD) -> impl Iterator<Item=Record<Option<K>, Option<V>>>
where I: Iterator<Item=Record<Option<Bytes>, Option<Bytes>>>,
    KD: Fn(&Bytes) -> K,
    VD: Fn(&Bytes) -> V,
{
    input.map(|record| {
        Record {
            key: record.key.map(key_decoder),
            value: record.value.map(value_decoder),
        }
    })

Note: the decoding stage also works with Record<Bytes, Bytes> and a few more combinations in the obvious way.

Envelope processing

All currently defined envelopes take as input Record<Option<K>, Option<V>>. Then we have the following variations

fn envelope_none(input: impl Iterator<Item=Record<Option<K>, Option<V>>>) -> impl Iterator<Item=Record<Option<K>, V>> {
    input.flat_map(|record| {
        let value = record.value?;
        Some(Record {
            key: record.key,
            value,
        })
    })
}

I won't write the upsert envelope in pseudocode since it needs to deal with diffs and is beside the point of this comment but it too takes as input Record<Option<K>, Option<V>> records and produces Record<Option<K>, V>> records.

The main takeaway from this section is that the input to the upsert stage must have a way to understand tombstones which is currently served by having a single nullable column as its input where NULL is a tombstone.

Post processing

The final stage of the pipeline defines how to collapse a Record<K, V> into a single row schema. This includes things like flattening the key and/or value and some other things that were omitted for clarity (e.g metadata columns). This post processing stage doesn't actually exist as a standalone operator in the code but it certainly exists in my mind. I would like to also make the code have this explicit post-process stage as the flattening behavior is duplicated in a few places at the moment (without changing anything in the SQL syntax).

Ok, so under this framework let's analyse your proposal:

Thinking about this more, the fact that the INCLUDE KEY object is subject to the same flattening behavior kinda implies that FLATTEN = {TRUE | FALSE} is actually a property of the format, not the envelope!

Haven't thought through how this interacts with ENVELOPE UPSERT. But I think there's something interesting to explore here.

The upsert stage needs some way of understanding whether the incoming row is a tombstone or not. So the central question to whether flattening can happen before envelope processing is whether you can flatten the output of the decoding stage without losing information.

There is at least one case where this isn't possible which is when you have an Avro schema whose top level schema node is a union of [null, some_type] which means the avro_decoder will have a signature of Bytes -> Option<T>. Currently this is fine because the input to upsert is Option<Option<T>> and the output is Option<T> which then gets flattened. But were you to do it the other way around you would lose information because if I show you just an Option<T> there is no way for you to tell apart the tombstone case from the actual null value case. For this reason I don't think flattening conceptually belongs to the decoding stage.

All these constraints are mostly a side effect of the SQL type system not supporting arbitrary sum types, because if we did support it then we could indeed flatten at the decoding stage and declare the output type to be UNION { NULL, <the flattened schema> }.

benesch commented 7 months ago

For posterity: Petros and I chatted about this live in our 1:1 this week. We both agree this needs more thought but no immediate plans to pick this up.