risingwavelabs / risingwave


store "pure" SQL in the Source/Table's definition #17472

Open st1page opened 1 week ago

st1page commented 1 week ago

The original idea was proposed by @BugenZhao in https://github.com/risingwavelabs/risingwave/issues/9828. Initially, the intention was to reduce code complexity, but new requirements such as automatic schema evolution for CDC tables have made this matter more urgent.

Background

Currently, we have two completely incompatible ways of determining the schema for tables/sources:

  1. Schema Defined by the CREATE Statement: In this approach, the schema is defined by the user at the time of creating the table or source. Users can specify the columns and their types within the column definitions. In this case, the definition field in the catalog fully contains the SQL information for creating the table or source.

  2. Schema Derived from External Sources: This method was initially designed for external connectors that use Protobuf or Avro for encoding. These encodings typically manage schema information externally. When creating a table or source, a URL or other retrieval method is provided, and RisingWave fetches and parses the schema from the external source.

These two usage patterns are currently in conflict within the implementation. If you expect to use an external schema, specifying columns in the CREATE statement is not allowed, which means it's a pattern of "one or the other," not both.

In the catalog for tables and sources, we have a field called definition that stores the SQL statement used to create the table or source. This field should always remain consistent with the source or table, so it needs to be updated accordingly during ALTER TABLE or ALTER SOURCE operations. Currently, it also serves as the source of truth for some information. For example, when executing CREATE SINK INTO TABLE, the SQL statement must be fully processed again to generate a plan. In ALTER SOURCE REFRESH SCHEMA, the table's format and encoding are obtained from it. And if the table/source's schema is derived externally, the catalog's definition only stores the original SQL, without any column definitions.

Design

Under this design, a table/source should be able to obtain a schema externally and, at the same time, specify some columns in the CREATE statement, while supporting the full set of features such as refresh schema and add/drop column. Such a table/source needs to handle the following cases:

  1. Both external schema and column definitions exist: For certain connectors, it is necessary to check whether they are legal, that is, whether all defined columns can be obtained from the connector.

    • For some external types, different column definition types can be used. (Whether it is necessary to ensure consistency with RisingWave system internal cast behavior in the connector parser's datatype mapping is another classic issue, and there is no answer here.)
  2. Only external schema, no column definition: It should be possible to map from the external schema to the datatype of PG or RW as the SQL definition of the table/source. At this time, using the mapped column definition, plus the external schema, should be reducible to the first case where the external schema and column definitions coexist.

    • It seems we may need to add some compatibility code to fill in the column definitions for those tables/sources that have already been created without column definitions.
  3. When performing add/drop column operations: It is only necessary to add columns to, or remove columns from, the previous column definitions, and then reduce it to the first case where the external schema and column definitions coexist.

  4. When executing refresh schema registry or other methods: This cannot simply be reduced to the second case of "only external schema, no column definition." The current semantics of refresh schema registry is to change the schema of the table/source according to the new schema registry. During the refresh process, I believe that the datatypes of existing columns should not be changed.

  5. Another kind of refresh schema registry: Note that after the completion of https://github.com/risingwavelabs/risingwave/issues/12982, we will very likely have a new refresh schema registry or refresh external definition operation. This refresh should not change the schema of the table/source; it will only change the currently stored external schema. This is still meaningful because it may affect whether the check passes for add/drop column statements.
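
The reduction described in cases 1 to 4 can be sketched roughly as follows. This is a minimal Python model, not RisingWave code: the function names (`resolve_columns`, `add_column`, `drop_column`, `refresh_schema`), the dict-based schema representation, and the two type tables are all hypothetical and exist only for illustration. In particular, the sketch assumes that a refresh brings in every column of the new external schema and drops columns that no longer exist there; the issue itself only states that the datatypes of existing columns should stay unchanged. Case 5 (refreshing only the stored external schema) is not modeled.

```python
from typing import Dict, Optional

# Hypothetical model: a schema maps column name -> type name.
Schema = Dict[str, str]

# Assumed, illustrative table: which SQL types may be declared for an external type.
ALLOWED_TYPES = {
    "int32": {"integer", "bigint"},
    "string": {"varchar"},
    "record": {"struct", "jsonb"},
}
# Assumed default mapping used when the user declares no columns (case 2).
DEFAULT_TYPE = {"int32": "integer", "string": "varchar", "record": "struct"}


def resolve_columns(external: Schema, defined: Optional[Schema]) -> Schema:
    """Return the effective column definitions for a table/source."""
    if defined is None:
        # Case 2: derive the column definitions from the external schema,
        # which reduces the problem to case 1.
        defined = {name: DEFAULT_TYPE[ext] for name, ext in external.items()}
    # Case 1: every defined column must be obtainable from the external schema.
    for name, sql_type in defined.items():
        if name not in external:
            raise ValueError(f"column {name!r} not found in external schema")
        if sql_type not in ALLOWED_TYPES[external[name]]:
            raise ValueError(f"column {name!r} cannot be declared as {sql_type}")
    return dict(defined)


def add_column(external: Schema, defined: Schema, name: str, sql_type: str) -> Schema:
    # Case 3: edit the previous definitions, then re-check them as case 1.
    return resolve_columns(external, {**defined, name: sql_type})


def drop_column(external: Schema, defined: Schema, name: str) -> Schema:
    # Case 3: same reduction, with one column removed.
    remaining = {k: v for k, v in defined.items() if k != name}
    return resolve_columns(external, remaining)


def refresh_schema(new_external: Schema, defined: Schema) -> Schema:
    # Case 4: follow the new external schema, but keep the declared type of
    # every column that already exists (assumption: vanished columns are dropped).
    refreshed = {
        name: defined.get(name, DEFAULT_TYPE[ext])
        for name, ext in new_external.items()
    }
    return resolve_columns(new_external, refreshed)
```

For example, refreshing `{"id": "bigint"}` against an external schema that gained a `name` string field keeps `id` as `bigint` and adds `name` with the default mapping, rather than re-deriving `id` as `integer`.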

The design appears complex only because we've taken the schema registry scenario into consideration. In reality, when we examine specific cases, things are actually quite straightforward.

create table as

The CREATE TABLE AS statement uses the schema of the result from a batch query as its own schema. You can consider it a one-time "external schema." It is mentioned here only because it also requires us to have a way to manually fill in the column definition SQL.

CDC table

In fact, for CDC tables we have already implemented the case where external schema and column definitions coexist. Therefore, the only thing we need to implement is the second point, which is to be able to automatically generate the corresponding column definitions in the case of CREATE TABLE (*).
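
As a rough illustration of what "automatically generate the corresponding column definitions" could look like, the sketch below rewrites a stored definition containing `(*)` into one with explicit columns. It is a hypothetical Python model, not the actual implementation: `expand_star` and `quote_ident` are made-up helper names, the statement string is only an example, and the column list is assumed to have already been mapped from the upstream schema to SQL type names.

```python
from typing import List, Tuple


def quote_ident(name: str) -> str:
    # PostgreSQL-style identifier quoting: wrap in double quotes and
    # double any embedded double quote.
    return '"' + name.replace('"', '""') + '"'


def expand_star(
    definition: str,
    columns: List[Tuple[str, str]],
    primary_key: List[str],
) -> str:
    """Replace the first "(*)" in a definition with explicit column definitions."""
    if "(*)" not in definition:
        raise ValueError("definition does not contain (*)")
    items = [f"{quote_ident(name)} {sql_type}" for name, sql_type in columns]
    if primary_key:
        keys = ", ".join(quote_ident(k) for k in primary_key)
        items.append(f"PRIMARY KEY ({keys})")
    # Only the first occurrence is replaced; the rest of the statement is kept as-is.
    return definition.replace("(*)", "(" + ", ".join(items) + ")", 1)


# Illustrative input only; the exact statement shape is an assumption.
stored = "CREATE TABLE t (*) FROM src TABLE 'public.t'"
pure_sql = expand_star(stored, [("id", "integer"), ("name", "varchar")], ["id"])
```

Storing the expanded text as the definition would let later add/drop column operations edit plain column definitions, which is the reduction to the first case described in the design.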

table/source with schema registry

First and foremost, it needs to be clarified that this design does not depend on https://github.com/risingwavelabs/risingwave/issues/12982. Under this design, there is still a dependency on some information from the externally provided schema registry. Whether this information is persisted in the catalog or fetched from the external source each time does not affect the implementation of this issue. One question is whether the external schema registry information can be fully expressed using SQL column definitions. Considering that there are various external types (oneof or union), and that external fields of the same type can be mapped to different types in RisingWave (for example, being transformed into RisingWave's jsonb or struct), the answer should be "no."
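
A small sketch of why the answer is "no": if one external type can map to several RisingWave types, and several external types can map to the same RisingWave type, then a column definition alone does not identify the external type it came from. The mapping table below is an assumption made up for illustration and does not reflect RisingWave's actual datatype mapping; `externals_for` is a hypothetical helper.

```python
from typing import Dict, List

# Assumed, illustrative mapping: external type -> RisingWave types it may become.
CANDIDATE_MAPPINGS: Dict[str, List[str]] = {
    "record": ["struct", "jsonb"],
    "union": ["jsonb"],
    "string": ["varchar"],
}


def externals_for(sql_type: str) -> List[str]:
    """List the external types that could have produced the given SQL type."""
    return sorted(
        ext for ext, candidates in CANDIDATE_MAPPINGS.items()
        if sql_type in candidates
    )


def is_recoverable(sql_type: str) -> bool:
    # The external type can be recovered from the column definition only
    # when exactly one external type maps to it.
    return len(externals_for(sql_type)) == 1
```

Under this assumed table, `externals_for("jsonb")` returns both `record` and `union`, so a `jsonb` column definition cannot replace the external schema information, while `varchar` maps back to a single external type.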

The behavior when external schema and column definitions coexist should be like https://github.com/risingwavelabs/risingwave/issues/12199. For the second requirement, we have already implemented the mapping from external datatypes to RisingWave datatypes. Mapping these further into the datatypes of SQL statements might not be that difficult.

st1page commented 1 week ago

At present, the only thing we definitely need to do is the CDC table. The reason this issue is so complex is that I want to check whether the model can also accommodate the schema registry case.