MaterializeInc / materialize

The Cloud Operational Data Store: use SQL to transform, deliver, and act on fast-changing data.
https://materialize.com

storage/sources: clarify how Avro schemas are translated to Materialize #28265

Open morsapaes opened 1 month ago

morsapaes commented 1 month ago

Summary

Clarify the way in which Avro schemas are translated to relations in Materialize when creating an Avro-formatted source, through some combination of documentation updates and changes to the translation semantics.

Problem

Adapted from @petrosagg's comment at https://github.com/MaterializeInc/materialize/issues/28265#issuecomment-2230770983:

The current situation is problematic because it is complicated to explain. What the user wants to understand is how the types of an external system map onto the SQL types that Materialize supports. When this is done right, you end up with a mapping that is simple, composable, and that doesn't lose information. A simple mapping would say something like "Avro arrays of X are mapped onto LIST of X". A complex mapping would say something like "Avro arrays of X are mapped onto LIST of X, except when X is an integer, in which case they are mapped to an ARRAY, except if you have Y feature enabled, in which case a poop emoji is produced instead".

Let's review what the current rules for Avro -> Materialize look like today. For primitive values, things are simple: strings map to TEXT columns, integers map to the corresponding SQL integer types, and so on. For complex values, the interesting ones are records and unions.

Unions

We categorize unions into two kinds: unions that merely encode that a field is nullable, and true union types. An example of the first kind is the Avro schema ["string", "null"]; an example of the second kind is ["string", "int"]. The first kind maps neatly onto nullable columns in MZ. The latter is converted into multiple columns suffixed with an index, so if your schema has a field named foo whose Avro type is ["string", "int"], you'll end up with foo1 text, foo2 int as the SQL schema. This is only supported when these complicated unions appear as a field of a record; if they appear as the element of an Avro array, we bail out.
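
For concreteness, a rough sketch of the resulting relation for a record containing such a union (the table and column names are illustrative only, and the exact generated names may differ):

```sql
-- Illustrative only: the shape of the relation an Avro-formatted source would
-- produce for a record with fields id (long) and foo (["string", "int"]).
-- The true union is split into one indexed column per non-null variant.
CREATE TABLE union_shape (
    id   bigint,
    foo1 text,    -- populated when the union holds a string, otherwise NULL
    foo2 integer  -- populated when the union holds an int, otherwise NULL
);
```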

Records

Records map to a SQL record when they appear somewhere other than the top level of an Avro schema. If they appear at the top level of a schema, they are flattened into their constituent columns. However, if the top-level schema is a nullable record (via an Avro union with "null", as described above), then the record isn't flattened. Furthermore, if the record is the top level of the key and the key is flattened, then all columns are marked as nullable, losing type information. The original inline errors design added yet another clause to this, which would say that if you had a top-level record and had inline errors enabled, then it is not flattened.
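
To make the context-dependence concrete, here is a hedged sketch of how the same value record (with fields id and name) would be queried depending on where it appears; the source and column names are hypothetical:

```sql
-- (a) Non-nullable top-level value record: flattened into ordinary columns.
SELECT id, name FROM src_flattened;

-- (b) Top-level value schema is a nullable record: not flattened; the value
--     surfaces as a single nullable record column (called "data" here purely
--     for illustration), so fields need record access syntax.
SELECT (data).id, (data).name FROM src_unflattened;

-- (c) Same record used as the Kafka key and flattened: the key's fields appear
--     as top-level columns but are all marked nullable, so NOT NULL information
--     from the Avro schema is lost.
SELECT id, name FROM src_key_flattened WHERE id IS NOT NULL;
```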

These rules are neither simple nor composable and for that reason I don't think they should be the default behavior. In fact they seem bespoke enough that they could be left as something the user writes with SQL.

If I were designing the ingestion pipeline from scratch today, these are the rules I'd want:

  1. When not using any format then the output schema of a Kafka source is key bytes, value bytes, both nullable.
  2. When using a format (Avro, JSON, etc.), the output of the source should be the same as if you did the transformation in SQL using CREATE VIEW formatted_src AS (SELECT decode_avro(key), decode_avro(value) FROM raw_src), as sketched below. This requires that the rules for converting Avro don't depend on context.
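
Spelled out, rule (2) amounts to something like the following, where decode_avro is a hypothetical function (it does not exist today) and raw_src is the format-less source from rule (1):

```sql
-- Rule (1): a format-less Kafka source exposes only the raw bytes,
-- i.e. roughly raw_src(key bytea, value bytea), both nullable.

-- Rule (2): applying a format would be equivalent to a plain SQL view over
-- those bytes. decode_avro is hypothetical; the point is that its output type
-- would depend only on the Avro schema, never on where the value appears.
CREATE VIEW formatted_src AS
    SELECT decode_avro(key) AS key, decode_avro(value) AS value
    FROM raw_src;
```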

The non-composability of Avro records is particularly problematic, as today's auto-flattening behavior leads to silent discarding of Kafka messages with null values (https://github.com/MaterializeInc/materialize/issues/10114), which is arguably silent data loss.

Solution

There are several things we can do to improve the situation:

  1. Document the current semantics.

    This is a no-downside option. The current subtleties are wholly undocumented. Documenting them would improve the UX of Materialize, and would also likely clarify whether any of the following ideas are worth pursuing—often things that are hard to document ought to be simplified rather than explained in excruciating detail.

    The CREATE SINK documentation for the Avro format is a good model to use. It is a complete and precise description of the translation from Materialize SQL types into an Avro schema. We should seek to write up an equivalently complete and precise description of the translation from an Avro schema into Materialize SQL types.

  2. Change the representation of unions to be composable (#8917):

     a. By adding support for sum types: https://github.com/MaterializeInc/materialize/issues/6842

     b. By doing something one-off, like representing unions as a nested record instead of something that can be flattened when contained in a record but not otherwise (details).

  3. Change the representation of records to be composable—i.e., by never flattening at the top level, or allowing flattening at the top level to be toggleable.

    This is somewhat contentious, as the current autoflattening behavior is more ergonomic for simple cases. See the discussion comments below for details.

Priority

As of 17 July 2024, addressing these concerns is not a priority. The current situation works well enough for our users, and none of the fixes proposed here are quick wins; all of them would require complicated rollouts to account for backwards compatibility concerns.

When we do have bandwidth to pick this up, starting with (1)—i.e., adding documentation for the current situation—is a good way to go.

Original issue description

Depending on the schema provided upstream, Avro-formatted sources may fail to flatten it. Although users are unlikely to hit the (niche) scenarios that result in an unflattened source schema, we don't explicitly put the source into an error state. This makes the semantics of Avro sources unpredictable.

To make the semantics clearer, we should introduce a new option (e.g., FLATTEN) that allows setting the desired flattening behavior. Flattening should be toggled on by default, since this is the expected user experience. There is no evidence of users ever running into failure scenarios, or explicitly asking for an unflattened source.

Related to #28152, and discussed on Slack.

benesch commented 1 month ago

Although users are unlikely to hit the (niche) scenarios that result in an unflattened source schema...

If I'm understanding correctly, I don't think they're all that niche! If your upstream source has a field named error, which seems pretty common, then you won't be able to use VALUE DECODING ERRORS = INLINE and will instead need to use VALUE DECODING ERRORS = INLINE AS my_error.

...we don't explicitly put the source into an error state.

What do you mean? When we fail to flatten, we should reject the CREATE SOURCE command with a relatively nice error!

There is no evidence of users ever running into failure scenarios, or explicitly asking for an unflattened source.

I know several of our customers have fought with the different behavior of INCLUDE KEY and INCLUDE KEY AS ..., because the former flattens and the latter does not, and there's no way to both flatten and rename the key columns. People definitely get tripped up when their key and value contain fields with the same name.
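
For reference, a rough sketch of the two variants (the connection, topic, and source names are hypothetical, and the statements are abridged; see the CREATE SOURCE reference for the full syntax):

```sql
-- INCLUDE KEY: the key's record fields are flattened into top-level columns,
-- which can collide with value column names.
CREATE SOURCE src_flat_key
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'events')
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  INCLUDE KEY
  ENVELOPE NONE;

-- INCLUDE KEY AS message_key: the key surfaces as a single column named
-- message_key and is not flattened, so there is no way to both flatten and
-- rename the key columns.
CREATE SOURCE src_named_key
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'events')
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  INCLUDE KEY AS message_key
  ENVELOPE NONE;
```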

Flattening should be toggled on by default, since this is the expected user experience. There is no evidence of users ever running into failure scenarios, or explicitly asking for an unflattened source.

The current approach to flattening by default leads to silently discarding NULL values with ENVELOPE NONE: https://github.com/MaterializeInc/materialize/issues/10114. This is admittedly an edge case but nonetheless it's a discomforting correctness violation.

IMO, flattening is the expected user experience largely because it is the historical user experience we've provided. If we could turn back the clock, I'd be advocating strongly for not having flattening at all, and just getting users used to typing (value).field—after all, they'll need to type (value).nested.field no matter what to access nested fields.
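
To illustrate the difference in what users would type (source and column names are hypothetical):

```sql
-- With today's flattening: top-level fields are ordinary columns, but nested
-- fields already require the record access syntax.
SELECT customer_id, (address).city FROM orders_src;

-- Without flattening: the whole value is a single record column, and every
-- field, nested or not, is reached the same way.
SELECT (value).customer_id, (value).address.city FROM orders_unflattened_src;
```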

morsapaes commented 1 month ago

If your upstream source has a field named error, which seems pretty common, then you won't be able to use VALUE DECODING ERRORS = INLINE

It's true that the generic issue of naming collisions isn't solved. I wasn't considering the new inline error decoding option, though, since that specific risk of naming collision has already been mitigated in #28258. Aren't we flattening the schema there even when renaming the error column?

What do you mean? When we fail to flatten, we should reject the CREATE SOURCE command with a relatively nice error!

IIUC from the discussion in last week's sync, we might not reject the CREATE SOURCE command for at least the failure scenario where the schema is valid, but we just can't flatten it (which, in my notes, is scribbled as "if there’s a top-level enum with at least two null values”). I might've misunderstood, so @petrosagg can help clarify things here.

IMO, flattening is the expected user experience largely because it is the historical user experience we've provided.

I... still disagree. Or, I at least disagree until we can provide some level of automatic schema parsing somewhere (even if the current flattening behavior doesn't help with deeply nested schemas). Besides personal experience, two quick examples: a user just recently mentioned that a differentiating feature for them in Rockset was smart schemas, and Snowflake supports INFER_SCHEMA in file ingestion + schema detection in Snowpipe Streaming.

benesch commented 1 month ago

Aren't we flattening the schema there even when renaming the error column?

Yes! I think I misunderstood the hazard you were worried about.

What do you mean? When we fail to flatten, we should reject the CREATE SOURCE command with a relatively nice error!

IIUC from the discussion in last week's sync, we might not reject the CREATE SOURCE command for at least the failure scenario where the schema is valid, but we just can't flatten it (which, in my notes, is scribbled as "if there’s a top-level enum with at least two null values”). I might've misunderstood, so @petrosagg can help clarify things here.

So this may be a glitch I don't have paged in. I can totally believe that we don't handle flattening properly if the Avro schema contains an enum with more than two non-null elements, though. (Although I thought we specifically could handle top-level enums, but not enums within a field...)

IMO, flattening is the expected user experience largely because it is the historical user experience we've provided.

I... still disagree. Or, I at least disagree until we can provide some level of automatic schema parsing somewhere (even if the current flattening behavior doesn't help with deeply nested schemas). Besides personal experience, two quick examples: a user just recently mentioned that a differentiating feature for them in Rockset was smart schemas, and Snowflake supports INFER_SCHEMA in file ingestion + schema detection in Snowpipe Streaming.

But we do provide automatic schema parsing for nested Avro records! The records are strongly typed. Typing (my_col).non_existent_field will throw an error if non_existent_field doesn't exist; typing (my_col).my_string_col + 1 will throw a type error if my_string_col is of type string. Is the problem here more that e.g. SHOW COLUMNS doesn't show you nested types, and just shows record?
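
For example (source and column names hypothetical):

```sql
-- Nested record columns are strongly typed, so both of these are rejected when
-- the query is planned rather than silently returning bad data:
SELECT (my_col).non_existent_field FROM src;  -- error: no such field
SELECT (my_col).my_string_col + 1 FROM src;   -- type error: my_string_col is text
```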

morsapaes commented 1 month ago

Is the problem here more that e.g. SHOW COLUMNS doesn't show you nested types, and just shows record?

There really is no user-facing problem with the way things work now. In other words: although I understand the technical blind spots of flattening, not providing the basic parsing we provide now and "just getting users used to typing (value).field" isn't as small a usability regression as it might sound.

I'd be totally cool with your preferred behavior if we handled this basic parsing somewhere else (e.g., the source creation UI), instead of dropping it altogether. Assuming we're no longer considering going back on that decision, the scope of this issue would be specifically the scenarios Petros mentioned might still be failing silently, rather than erroring.

benesch commented 1 month ago

There really is no user-facing problem with the way things work now. In other words: although I understand the technical blind spots of flattening, not providing the basic parsing we provide now and "just getting users used to typing (value).field" isn't as small a usability regression as it might sound.

I agree that typing (value).field instead of field is a UX regression in isolation. But what I'm trying to get at is that there's a UX tension here between what's good for a POC where you're just trying to get up and running as quickly as possible and what's good for a production deployment with hundreds or thousands of sources. It's not just about the technical blind spots—it's also about the fact that Materialize's default flattening behavior does not work for 100% of Avro schemas. That means that users are in for a surprise when they add their 10th or 100th source and happen to have key column names that conflict with value column names, or a field named error, or whatever. And now they have some models that have to access columns as key_col_a and other models that have to access columns as (key).col_a, and explaining the why of that difference to them is complicated.

In my view, the fact that one of our users has to specifically conditionalize their Terraform codebase so that some sources use include_key_as and some don't is a user-facing problem. Maybe not a very high-priority problem, but certainly a papercut. Ideally all of those sources could just set include_key = true and Materialize would seamlessly include the key regardless of the column names involved in the key and the value.

Also, our docs don't cover this flattening behavior at the moment. The description of INCLUDE KEY reads "Include a column containing the Kafka message key.", which doesn't cover the case of composite keys, or the way the behavior changes depending on whether you specify AS <col> or not. I have a hunch that trying to actually spell out the way these options interact in the docs would make folks a little less enthusiastic about the current behavior. (I've tried in the past and abandoned the attempt...)

the scope of this issue would be specifically the scenarios Petros mentioned might still be failing silently, rather than erroring.

Can we just make it hard fail for now, without a possibility of disabling flattening to make it work? Then we can punt on the broader question of how to conditionally disable flattening to make that niche schema work. When @petrosagg and I discussed this last (https://github.com/MaterializeInc/materialize/issues/10114#issuecomment-1945470135) even the syntax for disabling flattening was terribly unclear (is it a property of the envelope or the format?).

petrosagg commented 1 month ago

If I had to summarize why the current situation is problematic, it's that it is complicated to explain. What the user wants to understand is how the types of an external system map onto the SQL types that Materialize supports. When this is done right you end up with a mapping that is simple, composable, and that doesn't lose information. A simple mapping would say something like "Avro arrays of X are mapped onto LIST of X". A complex mapping would say something like "Avro arrays of X are mapped onto LIST of X except when X is an integer in which case they are mapped to an ARRAY, except if you have Y feature enabled in which case a poop emoji is produced instead".

Let's review what the current rules for Avro -> Materialize look like today. For primitive values, things are simple: strings map to TEXT columns, integers map to the corresponding SQL integer types, and so on. For complex values, the interesting ones are records and unions.

Unions

We categorize unions into two kinds: unions that merely encode that a field is nullable, and true union types. An example of the first kind is the Avro schema ["string", "null"]; an example of the second kind is ["string", "int"]. The first kind maps neatly onto nullable columns in MZ. The latter is converted into multiple columns suffixed with an index, so if your schema has a field named foo whose Avro type is ["string", "int"], you'll end up with foo1 text, foo2 int as the SQL schema. This is only supported when these complicated unions appear as a field of a record; if they appear as the element of an Avro array, we bail out.

These rules are not related to flattening itself but they are in the same "difficult to explain" category and I wish we just did not support complicated unions at all.

Records

Records map to a SQL record when they appear somewhere other than the top level of an Avro schema. If they appear at the top level of a schema, they are flattened into their constituent columns. However, if the top-level schema is a nullable record (via an Avro union with "null", as described above), then the record isn't flattened. Furthermore, if the record is the top level of the key and the key is flattened, then all columns are marked as nullable, losing type information. The original inline errors design added yet another clause to this, which would say that if you had a top-level record and had inline errors enabled, then it is not flattened.

These rules are neither simple nor composable and for that reason I don't think they should be the default behavior. In fact they seem bespoke enough that they could be left as something the user writes with SQL.

If I were designing the ingestion pipeline from scratch today, these are the rules I'd want:

  1. When not using any format then the output schema of a Kafka source is key bytes, value bytes, both nullable.
  2. When using a format (avro, json etc) then the output of the source should be the same as if you did the transformation in SQL using CREATE VIEW formatted_src AS (SELECT decode_avro(key), decode_avro(value) FROM raw_src). This requires that the rules of converting avro don't depend on context.

benesch commented 1 month ago

Discussed prioritization on Slack here: https://materializeinc.slack.com/archives/C075QPD99PV/p1721248021705779?thread_ts=1721246999.190999&cid=C075QPD99PV

@petrosagg agrees that the only situation which (arguably) leads to silent data loss is #10114. ~I'm going to recast the top level description to be about clarifying the way Avro schemas are translated to Materialize sources.~ Done.

benesch commented 1 month ago

I wish we just did not support complicated unions at all.

Mentioned in the issue description now as 2b, but to flesh it out in more detail: I think there is a straightforward way to support complicated unions in a composable way in Materialize even without support for proper sum types.

Instead of flattening the union fields into the containing record field, which limits the support to record types, we should instead translate unions into nested records. A union like ["foo", "bar"] would get translated into a non-nullable record with fields foo and bar. A union like ["foo", "bar", "null"] would get translated into a nullable record with fields foo and bar. As a special case, a union with a single non-null element like ["foo", "null"] would get translated into a nullable value of type foo.

This should allow for union types in arrays: they get mapped to lists of records in Materialize. It also sidesteps the annoying column naming behavior. If a union appears in a record, the field name for the union is always whatever the field name is in the record, and the record's type uses the union's types as field names, which are guaranteed not to conflict, because the same type cannot appear multiple times in an Avro union.
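
A hedged sketch of what this could look like (nothing here is implemented; the column and source names are illustrative):

```sql
-- Suppose a field "measure" has Avro type ["string", "int", "null"]. Under this
-- proposal it would surface as one nullable record column whose fields are
-- named after the union's variant types, roughly:
--
--   measure record("string": text, "int": integer), nullable
--
-- with exactly one variant field non-NULL per value. Access would use the
-- ordinary record syntax, and the same shape would compose inside arrays:
SELECT (measure)."string", (measure)."int"
FROM hypothetical_src
WHERE measure IS NOT NULL;
```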