apache / druid

Apache Druid: a high performance real-time analytics database.
https://druid.apache.org/
Apache License 2.0

Improve MSQ Rollup Experience with Catalog Integration #13816

Open paul-rogers opened 1 year ago

paul-rogers commented 1 year ago

Proposed is a system to allow MSQ INSERT and REPLACE statements against rollup tables to be validated against a catalog schema. This is done by making the type of aggregate columns explicit. In addition, for tables not specified in the catalog, a new SQL clause WITH ROLLUP is added to the MSQ statements to make the goal of rollup explicit.

Note: Much discussion has occurred and we're leaning toward a radically different design. See the notes below.

User Experience

This feature affects SQL, the catalog, and the SQL planner. Each has user-visible revisions.

SQL Revision

The INSERT and REPLACE statements take a new ROLLUP clause:

(INSERT | REPLACE) INTO ...
[ PARTITIONED BY <value> ]
[ CLUSTERED BY … ]
[ (WITH | WITHOUT) ROLLUP ]

The semantics are:

Catalog Datasource Metadata Revision

For a detail table, all columns are defined with the existing scalar types. For rollup tables, dimension columns take scalar types. A column is a measure by virtue of a new aggregate type of the form: <AGG>(<ARG_TYPE>). For example:

A table with no measure columns is, by definition, a detail table. A table with at least one measure column is, again by definition, a rollup table. All non-measure columns become dimensions within the rollup table. The result of this pattern is that the user need not keep a "detail/rollup" table type in sync with the columns: the columns determine the table type.
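As a rough illustration of the rule above, here is a minimal Python sketch of deriving the table type from the column types. The regular expression, the function names, and the dictionary encoding of the catalog are assumptions made for this example; they are not part of the proposal or of Druid.

```python
import re
from typing import Optional

# Hypothetical encoding: a measure column's catalog type is "<AGG>(<ARG_TYPE>)".
AGG_TYPE = re.compile(r"^\s*([A-Z_]+)\s*\(\s*([A-Z]+)\s*\)\s*$")

def parse_aggregate_type(sql_type: str) -> Optional[tuple[str, str]]:
    """Return (aggregate, argument type) for a measure type, or None for a scalar type."""
    match = AGG_TYPE.match(sql_type.upper())
    return (match.group(1), match.group(2)) if match else None

def datasource_type(columns: dict[str, str]) -> str:
    """A table with at least one measure column is rollup; otherwise it is detail."""
    has_measure = any(parse_aggregate_type(t) is not None for t in columns.values())
    return "rollup" if has_measure else "detail"
```

With this encoding, adding a single column of type SUM(BIGINT) to an otherwise scalar schema changes the result from "detail" to "rollup", which is exactly the behavior debated later in this thread.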

Query Validation

With the above in place, we extend query validation.

First, the catalog mechanism will compute a datasource_type from the column schemas as described above. The type will be either rollup or detail. (Conceptually: the implementation is probably just a Boolean.) A datasource_type value of rollup is equivalent to specifying WITH ROLLUP in the query. The user normally provides either the explicit query option or the table metadata. If both appear, they must agree.

If no table metadata is available, then for backward compatibility, the action depends on the finalizeAggregates query context option. If that context option is set along with the SQL ROLLUP option, the metadata option, or both, then all the set options must agree.
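The agreement rule can be sketched as follows. This is only a sketch under two assumptions that the text does not state outright: that finalizeAggregates=false is the setting that corresponds to rollup, and that an ingest with no option set at all is treated as detail. The function name and the labels in the error message are made up for the example.

```python
from typing import Optional

def resolve_rollup(sql_rollup: Optional[bool],
                   catalog_rollup: Optional[bool],
                   finalize_aggregates: Optional[bool]) -> bool:
    """Combine the three sources of rollup intent; None means "not specified".

    Assumption: finalizeAggregates=false corresponds to rollup, and an ingest
    with no option set at all is treated as detail.
    """
    votes = {}
    if sql_rollup is not None:
        votes["WITH/WITHOUT ROLLUP"] = sql_rollup
    if catalog_rollup is not None:
        votes["catalog datasource_type"] = catalog_rollup
    if finalize_aggregates is not None:
        votes["finalizeAggregates"] = not finalize_aggregates
    # Every option that was actually set must say the same thing.
    if len(set(votes.values())) > 1:
        raise ValueError(f"Rollup options disagree: {votes}")
    return next(iter(votes.values()), False)
```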

During validation, when a table has column metadata defined, and the column includes a declared type, then the type inferred by the SQL planner must agree with the type specified in column metadata. For measures, this means that the query must include the correct aggregate that matches the declared aggregate type.

Non-measure columns are dimensions by virtue of the fact that they have scalar types. That is, a simple BIGINT can never be a measure: there is no aggregation. Only scalar columns can appear in GROUP BY. Thus, we automatically get validation that the user only uses dimensions in GROUP BY: if the column was actually a measure, the types would not be compatible. SQL ensures that all columns are either measures (because they are defined by aggregates) or dimensions (because they appear in the GROUP BY clause). The combined result is that a rollup ingest query is fully validated.

Implementation Discussion

The above behavior is selected to satisfy a number of requirements:

Plan-time Aggregate Types

Since neither the finalized nor internal intermediate types satisfy the requirements, we invent a new aggregate type for use in the Calcite planner and the catalog. The aggregate type is that described above. The type includes both the name of the aggregate and the argument type(s). The aggregate name is typically the same as the SQL aggregate function name (but not the same as the internal aggregate class name). The argument types use SQL types (VARCHAR, BIGINT, DOUBLE, etc.).

As noted above, we cannot use the finalized types as they are insufficient to guarantee that the user provides the expected aggregate expression for a measure column. Suppose column m is supposed to be the sum of a BIGINT value. And suppose we used the finalized types. Then, the following would be valid in SQL, but not valid for ingestion:

INSERT INTO dst
SELECT …, x AS m
FROM TABLE(…) (..., x BIGINT)

In the above, the scalar column x is of type BIGINT and is thus indistinguishable from a SUM(.) expression.

INSERT INTO dst
SELECT …, SUM(x) + 10 AS m
FROM TABLE(…) (..., x BIGINT)

The above query is fine as a normal (non-ingest) query. However, as an ingest query, we must store the intermediate result for SUM(x) and thus cannot add 10 to the intermediate result. Again, the finalized type is not sufficient to identify that there is an issue.

INSERT into dst
SELECT …, MAX(x) AS m
FROM TABLE(…) (..., x BIGINT)

In this third query, MAX(x) has a return type of BIGINT which is the same as the finalized return type of SUM(BIGINT), so the query would be incorrectly valid.

However, suppose we declare that the SQL type of SUM(BIGINT) is a type called SUM(BIGINT). Now it is clear that the type of x in the first query above is BIGINT, which is incompatible with SUM(BIGINT), so the query is not valid. In the second query, the addition is with types SUM(BIGINT) + BIGINT, which has no implementation, thus the second query is also not valid. For the third query, the types are MAX(BIGINT) and SUM(BIGINT), which are not compatible, so validation fails. The result is that we can successfully catch and reject queries that do not match the type declared in the table metadata.

Now, let us consider a valid query:

INSERT into dst
SELECT …, SUM(x) AS m
FROM TABLE(…) (..., x BIGINT)

The type of the SUM(x) expression is SUM(BIGINT) which is identical to the declared type. Thus, the query passes validation.
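To make the four cases concrete, here is a small Python model of the type check. It is only a sketch: the expression classes, the string encoding of types, and the rule that addition is undefined on aggregate types are assumptions chosen to mirror the discussion above, not the Calcite implementation.

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class Col:        # reference to an input column, e.g. x
    name: str

@dataclass(frozen=True)
class Lit:        # literal with a known SQL type, e.g. 10 as BIGINT
    sql_type: str

@dataclass(frozen=True)
class Agg:        # aggregate call, e.g. SUM(x)
    func: str
    arg: object

@dataclass(frozen=True)
class Add:        # binary addition
    left: object
    right: object

def infer_type(expr, input_types):
    """Infer a plan-time type; aggregates get the proposed AGG(ARG_TYPE) type."""
    if isinstance(expr, Col):
        return input_types[expr.name]
    if isinstance(expr, Lit):
        return expr.sql_type
    if isinstance(expr, Agg):
        return f"{expr.func}({infer_type(expr.arg, input_types)})"
    if isinstance(expr, Add):
        left = infer_type(expr.left, input_types)
        right = infer_type(expr.right, input_types)
        if "(" in left or "(" in right:
            # Arithmetic on an aggregate type has no implementation.
            raise TypeError(f"no implementation of {left} + {right}")
        return left
    raise TypeError(f"unsupported expression: {expr!r}")

def validate(expr, declared_type, input_types):
    """True only if the inferred type is identical to the declared column type."""
    try:
        return infer_type(expr, input_types) == declared_type
    except TypeError:
        return False
```

Under this model, with x declared as BIGINT and m declared as SUM(BIGINT), only Agg("SUM", Col("x")) validates; the bare column, SUM(x) + 10, and MAX(x) are all rejected.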

Limitations

The proposed solution does have one limitation: because of the way SQL type validation works, Calcite is unable to do the casting necessary to perform automatic type conversions. Suppose we now declare column m to be of type SUM(DOUBLE). The query above would no longer pass validation. Although there is an implicit conversion from BIGINT to DOUBLE, Calcite has insufficient information to do that conversion. In SQL, type propagation flows upward, from SELECT to INSERT. In most engines, the INSERT statement inserts CAST statements to convert from the type provided by SELECT to the type declared in the target table.

Given the current Druid ingest DDL implementation, we’d want the type information to flow down from the target table, through INSERT into the SELECT and finally to the SUM(x) expression. Since SQL doesn’t work that way, we can’t actually make the above happen.

Nor does Druid have a way to cast a SUM(BIGINT) intermediate type to a SUM(DOUBLE) intermediate type at write time: the type of the aggregator just is the type of the output column. That is, Druid’s internal implementation has no concept of a target schema. Instead, the output of a native query is the target schema. Thus, type conversions have to happen at the SQL level. (Actually, since the intermediate type of SUM(BIGINT) is just BIGINT, and the intermediate type of SUM(DOUBLE) is just DOUBLE, Druid might actually do the right thing in this case. A LATEST(BIGINT) expression feeding a LATEST(DOUBLE) column is an example where there is no conversion between the intermediate COMPLEX<Pair<BIGINT, LONG>> and COMPLEX<Pair<DOUBLE, LONG>> types.)

As a result, the best we can do is to require the user to insert a CAST as in the following query:

INSERT into dst
SELECT …, SUM(CAST(x AS DOUBLE)) AS m
FROM TABLE(…) (..., x BIGINT)

At some future time, we could perhaps add Calcite rewrite rules that can automatically insert the CAST. Or, less likely, we could change Druid’s native query implementation to perform the query-type-to-target-type conversion for intermediate aggregate types.

Impact on MSQ

The proposed types appear only in the planner.

MSQ determines whether to finalize or not by considering the finalizeAggregates context parameter. This parameter is not set automatically from either the ROLLUP SQL option or from catalog metadata.

The reason the above is true, and the reason that MSQ handles rollup without this change, is that MSQ ignores the declared column type for measures. Instead, MSQ computes the native aggregate intermediate types from the aggregator specs in the native query. Prior to this change, the planner used finalized types during planning. Since MSQ ignored these types, MSQ used the proper intermediate types when the query is not finalized.

After this change, MSQ continues to ignore the declared output row type, and continues to re-infer the type from the native query, aggregators and finalize options. Since, for measures, MSQ does not consider the planner’s declared column type, MSQ is unaffected by this proposal which changes that type on the Calcite side.

SQL Aggregators Provide the SQL Aggregate Types

Let us now ask, on the query side, where does the proposed aggregate type enter the SQL planning process? At present, every Druid aggregator has a SQL proxy, SqlAggregator, which provides a SqlAggFunction. That function provides a return type inference class, which in turn provides the return type for the aggregate function. We provide the intermediate type simply by providing the proper return type inference class.

Dual Operator Tables

Druid queries are normally those that return values to the user, and thus use finalized aggregators. All existing SqlAggFunction declarations infer the finalized return type. We need a way, for MSQ ingest queries, to infer the intermediate type instead. One way to do this is to provide two operator tables: one with the finalized aggregate functions, another with intermediate aggregate functions. A query chooses one or the other depending on whether the target table has rollup or not. In practice, because of the timing of the creation of the table, we provide a per-query table with a flag that can be set to "finalize" or "intermediate" depending on the rollup state discovered while parsing the query.
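The per-query table with a flag might look roughly like the sketch below. The class name, the mode strings as a field, and the small lookup of finalized types are illustrative assumptions; in Druid the real inference lives in each SqlAggFunction.

```python
from dataclasses import dataclass

# Illustrative finalized return types for two aggregates over BIGINT.
FINALIZED = {("SUM", "BIGINT"): "BIGINT", ("MAX", "BIGINT"): "BIGINT"}

@dataclass
class QueryOperatorTable:
    """Per-query table; the mode is set once the target's rollup state is known."""
    mode: str = "finalize"   # or "intermediate"

    def return_type(self, func: str, arg_type: str) -> str:
        if self.mode == "intermediate":
            # Rollup ingest: report the proposed aggregate type.
            return f"{func}({arg_type})"
        # Ordinary query: report the finalized scalar type.
        return FINALIZED[(func, arg_type)]
```

In "finalize" mode SUM and MAX over BIGINT are indistinguishable by type; flipping the flag to "intermediate" separates them into SUM(BIGINT) and MAX(BIGINT).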

Fix Aggregator Argument Type Inference

The native conversion code contains a bug for some aggregators: the code infers the input type of an aggregator by looking at the output type. That is, if the type of SUM(x) is LONG, we can infer that x is also LONG. This bug is mostly benign for finalized aggregators. It will fail, however, for AVG(x) if x is LONG. The finalized type has to be DOUBLE but that is not the input type. It is not clear why this bug has not affected users.

While the bug may be benign for (most) finalized aggregators, it is unworkable for intermediate aggregators. The output type of SUM(x), where x is BIGINT, is SUM(BIGINT). But that aggregate type is clearly not acceptable as the input type to the aggregator.

Thus, we must fix the bug to obtain the actual argument type, not just assume the type.
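A toy Python model of the difference, assuming (as the text does) that finalized AVG over LONG yields DOUBLE. The function names and the lookup table are invented for the illustration and do not correspond to the native conversion code.

```python
# Illustrative return types, following the text: finalized AVG over LONG is DOUBLE.
FINALIZED = {("SUM", "LONG"): "LONG", ("AVG", "LONG"): "DOUBLE"}

def output_type(func: str, arg_type: str, intermediate: bool = False) -> str:
    """Output type of an aggregate: finalized scalar type or proposed aggregate type."""
    return f"{func}({arg_type})" if intermediate else FINALIZED[(func, arg_type)]

def arg_type_assumed(func: str, arg_type: str, intermediate: bool = False) -> str:
    """The shortcut described above: treat the output type as the input type."""
    return output_type(func, arg_type, intermediate)

def arg_type_actual(func: str, arg_type: str, intermediate: bool = False) -> str:
    """The fix: take the type from the argument expression, whatever the output is."""
    return arg_type
```

The shortcut happens to be right for finalized SUM(LONG), wrong for AVG(LONG), and returns the non-scalar SUM(LONG) once intermediate types are used.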

Implementation Plan

The above is to be implemented as part of issue #12546 work. Implementation will occur after PR #13686 completes. That PR will enable scalar type validation; this proposal adds measure type validation. Testing will occur as part of the overall set of catalog-based unit and integration tests.

Future Directions

The above provides a pretty good starter set of rollup validations. Several additional refinements are possible later.

First, nothing in the above proposal says that any given ingest query must provide certain dimensions or measures. Perhaps we are defining a DevOps metrics system, and we must at least have a metric_type, host_name in order to categorize data. Or, perhaps we require that, for an add-tech system, we must have the sum_clicks measure. To solve this, we can add a required or NOT NULL option to column metadata. If a required column does not appear in an ingest query, the query would not pass validation.
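A required-column check could be as small as the following sketch. The "required" key and the dictionary layout of the column metadata are assumptions for the example; the catalog does not define such an option today.

```python
def missing_required(catalog_columns: dict, select_aliases: list) -> list:
    """Return required catalog columns that the ingest query does not produce."""
    provided = set(select_aliases)
    return [name for name, spec in catalog_columns.items()
            if spec.get("required") and name not in provided]

# Hypothetical column metadata for the DevOps metrics example.
catalog = {
    "metric_type": {"type": "VARCHAR", "required": True},
    "host_name": {"type": "VARCHAR", "required": True},
    "region": {"type": "VARCHAR"},
}
```

An ingest query that outputs only metric_type and region would then fail validation because host_name is reported as missing.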

The second improvement is to note that the effect of this proposal is that we are ensuring that the user provides the one-and-only aggregation function allowed for a given column. If there is only one right answer, then Druid, not the user, should provide the function. Thus, in our simple example queries above:

INSERT into dst
SELECT …, SUM(x) AS m
FROM TABLE(…) (..., a VARCHAR, x BIGINT)
GROUP BY a

we might want to allow the user to write:

INSERT into dst
SELECT …, x AS m
FROM TABLE(…) (..., a VARCHAR, x BIGINT)
GROUP BY a

And insert the SUM(x) automatically since x is of the correct input type. Implementing this idea is a bit of a project (it would likely need to be done at the time of generating the native query), but is possible.

Finally, if we do the above, then we can also infer dimensions: anything that is not a measure must be a dimension. Given this, we need not have the user tell us what we already know in the GROUP BY clause: we can infer grouping. Thus the query reduces to:

INSERT into dst
SELECT …, x AS m
FROM TABLE(…) (..., a VARCHAR, x BIGINT)

At present, it is a bit of a head-scratcher how an ingest query magically reaches out to compaction to tell it how to do aggregation and grouping. The above makes clear that grouping and aggregation is a property of the datasource, not the query. Doing so makes it clear that compaction, which is a property of the datasource, will use the same metadata to determine how to combine two or more segments.
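A sketch of how the aggregate and the grouping could both be inferred from declared column types. The function name, the (expression, alias) input form, and the catalog dictionary are assumptions for illustration; as noted, a real implementation would more likely work while generating the native query than by rewriting SQL text.

```python
import re

# A declared measure type looks like "SUM(BIGINT)"; anything else is a dimension.
AGG_TYPE = re.compile(r"^([A-Z_]+)\((\w+)\)$")

def rewrite_select(select_items, catalog):
    """select_items: list of (expression, alias); catalog: alias -> declared type."""
    projections, group_by = [], []
    for expr, alias in select_items:
        match = AGG_TYPE.match(catalog[alias])
        if match:
            # Measure: wrap the expression in the declared aggregate.
            projections.append(f"{match.group(1)}({expr}) AS {alias}")
        else:
            # Dimension: pass through and group on it.
            projections.append(f"{expr} AS {alias}")
            group_by.append(expr)
    select_clause = "SELECT " + ", ".join(projections)
    group_clause = "GROUP BY " + ", ".join(group_by) if group_by else ""
    return select_clause, group_clause
```

For a select list of a and x AS m, with m declared as SUM(BIGINT), this yields the SELECT and GROUP BY clauses of the fully spelled-out query above.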

vogievetsky commented 1 year ago

This looks good to me from a user's perspective: I like the user-facing parts of this proposal.

I just have one note: under Catalog Datasource Metadata Revision I do not like the idea that the type of the table (rollup or detail) is determined by the presence of a measure column (also, in Druid they are called "metric"). I would much prefer to have an explicit flag to set; it could default to detail. Firstly, it is technically valid to have a rollup table with no measures (if you simply want to dedupe data), but more importantly I don't want to have to scan all the columns to derive the type of the table (like in the console). I also think it would be better if adding a column did not magically change the type of the table ever (reducing the COUNT(*)). Having a rollup marked table with a measure column should yield an error.

Also a typo: add-tech => ad-tech

paul-rogers commented 1 year ago

Thanks, @vogievetsky for the comment. The auto-detection of rollup was in response to someone who didn't like the idea of a flag.

As it turns out, the approach outlined here is not actually achievable. It will work better for the catalog to not be in the "rollup-or-not" "measure-or-dimension" business, but rather just to state storage types. Rollup then becomes a property of the ingestion query, not the datasource. This allows a use case in which early data is detail and later data is rolled up.

Also, it turns out that our aggregations are not quite ready for the level of metadata envisioned here. All we really can know is the storage type. Thus, a simple long or a SUM(long), MIN(long) and MAX(long) are all the same at the physical level, so the catalog actually cannot tell them apart. Again, it is up to each query to choose an aggregate that works for that ingestion.

So, the revised proposal will be that the user specifies the storage type, as a native Druid type. Even there, it turns out that the Calcite planner only knows about finalized types, not intermediate types. There is thought that, eventually, Druid will offer distinct functions for intermediate and final aggregators. That is some time off.

Or, the catalog could list the finalized type and validate the finalized aggregators against that type, even though MSQ will actually use some other type for intermediate aggregates.

So, in the short term, perhaps the catalog will apply only to detail tables, but not rollup because type information in that case is not sufficient to allow any meaningful validation. Once the project leads sort out how MSQ aggregation will work, the catalog can implement whatever choices we make.

gianm commented 1 year ago

So, in the short term, perhaps the catalog will apply only to detail tables, but not rollup because type information in that case is not sufficient to allow any meaningful validation. Once the project leads sort out how MSQ aggregation will work, the catalog can implement whatever choices we make.

I'd like to sort that out here on this issue (or wherever else makes sense), since I had been thinking that specifying rollup info in a catalog was a better solution than putting it in the DML statements, and I had been looking forward to that future. What's the problem with putting rollup info in a catalog?

gianm commented 1 year ago

Some discussion in another PR that illuminates why I am looking forward to handling rollup info a different way: https://github.com/apache/druid/pull/13179#pullrequestreview-1337886521

paul-rogers commented 1 year ago

Discussion has suggested that our goal is to add new agg functions that provide intermediate values, followed by a set of finalize functions that convert intermediate to finalized values. This is an interesting approach. But, it is not currently in the code and so not something that catalog support can build upon today.

Given the future direction, we should wait for that work to be completed. Previously, the notion was to redefine the current SQL agg function return types differently in the rollup-vs-not use cases. For rollup, agg functions would be declared to return intermediate types. For non-rollup, the agg functions would be declared to return the finalized types. Since the Calcite types are just a fantasy, we just adjusted the fantasy to make it easier to validate metric types. That short-term fix, however, is not consistent with the longer-term direction.

The result is that we have to work, short term, with finalized types. We can use finalized types in the catalog as proxies for the rollup types. For example, for LATEST_BY(x, t), use the type of x, not the actual type of COMPLEX<PAIR<s, LONG>> where s is the type of x. This means that we can't actually tell MSQ the type to use, since MSQ must be free to use the intermediate type even if the Calcite layer thinks the query uses the finalized type. This may mean we can't enforce other types since the code may not know which is a metric and which is a dimension.

For this, it means that types for rollup tables in the catalog are mostly for documentation: MSQ won't be able to enforce them because of the ambiguity around aggregate rollup types.

This is probably OK: Druid works fine with whatever column types that MSQ produces. Moving forward, there are some exciting new schemaless features coming online that, by definition, can't be constrained by catalog definitions. After we implement the new set of rollup functions, we can sort out what exactly we want to enforce in MSQ.

For now, let's get the basic metadata functionality in place so that we can have the broader discussion.

paul-rogers commented 1 year ago

Just to put a stake in the ground, here's what I would like as a user. Declare my rollup table (i.e. "cube") in the catalog, including dimensions and measures (what Druid calls metrics). For each measure, specify the rollup function: SUM(LONG), etc.

Then, when I do an ingest, I simply provide the values for each detail row, and let Druid sort out which columns are dimensions and which are metrics. For metrics, Druid works out the agg function to use. That is:

INSERT INTO myCube
SELECT
  TIME_PARSE(t) AS __time,
  server_name AS node, 
  secs * 1000 AS sum_duration_ms, ...
FROM ...

Here, (__time, node) are dimensions, and sum_duration_ms is a metric defined as, say, SUM(LONG). Since I already said what I wanted in the catalog definition, I don't have to repeat myself in the ingest. This kind of thing was done in other engines, back when cubes were hot.

This outcome is, of course, a long way off. Still, it can be helpful to imagine where we might eventually get to.

paul-rogers commented 1 year ago

Discussions continue about the user experience that Druid wants to provide. In sharp contrast to the above, another view is that rollup is not a datasource property, but rather an ingestion decision. That is, there is no such thing as a "rollup datasource": there are only datasources, some of which happen to contain aggregated data. (The analogy with Postgres tables is often cited.) At ingest time, a user may choose to write aggregated data into a datasource, or to write unaggregated "detail" data.

At compaction time, users may choose to further aggregate data, or to aggregate data that was originally detail. (The common use case is that the first month of data is detail, the next month is rolled up to 5 minute grain, and the final month is at hour grain.)
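The age-based use case in the parenthesis can be written down as a simple policy function. This is a sketch only: treating a month as 30 days, expressing grains as ISO 8601 periods, and the function name are all assumptions, and expiry of data older than three months is not modeled.

```python
from typing import Optional

def target_granularity(age_days: int) -> Optional[str]:
    """Return the rollup grain for data of a given age, or None to keep detail rows.

    Assumptions: a "month" is 30 days and grains are ISO 8601 periods.
    """
    if age_days < 0:
        raise ValueError("age_days must be non-negative")
    if age_days < 30:
        return None       # first month: detail
    if age_days < 60:
        return "PT5M"     # second month: 5 minute grain
    return "PT1H"         # older data: hour grain
```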

Given this, perhaps the best solution is for the catalog to provide no support at all. Rather, ingestion queries and compaction specs provide the support. We can add a WITH ROLLUP keyword to MSQ ingestion to tell the system to write intermediate aggregates to segments. But, the decision about what is dimension and what is a metric is expressed in the SQL for ingestion, which allows decisions to evolve over time.

Since rewriting the same SQL statement for every batch ingestion is tedious, we can learn from streaming, which stores the ingestion spec within a supervisor. For batch, maybe allow the user to store the SQL statement within Druid as a "procedure" (say) which is then invoked as, (say) CALL MY_INGESTION(ARRAY["file1.csv", "file2.csv"]). The rollup "schema" is stored in the SQL statement, not in the table metadata.

In this case, the table schema might not even be needed: the schema is whatever the SQL statement says it is, just as the dimensions and metrics are whatever the SQL statement says they are. We end up with a novel hybrid of DDL, stored procedure and DML all in a single statement: no redundancy, just one compact way to specify table schema, rollup and ingestion in a single unit.

Another way of expressing this idea is that SQL (and the original catalog concept) are based on the idea of declaring what you want, then letting the engine work out how to achieve it. The alternative approach is more procedural: what you want is encoded in the statements needed to produce that result. SQL is declarative, while Java (etc.) are procedural. The approach sketched here would double-down on Druid's historical procedural approach to defining datasources. A procedural approach may be more familiar to the Druid target audience, while the SQL declarative approach may be a bit foreign.

Such an approach is novel: older cube-based engines ask the user to declare the cube schema (dimensions and measures, with aggregate functions for each measure). But, the solution sketched in this comment might be the right solution for Druid given its own history and target users.

paul-rogers commented 1 year ago

Generalizing, the "catalog" becomes Druid's existing metadata (with a new item to store an MSQ ingest query as a "procedure"):

The "catalog" is just a conceptual term, in this case, for the datasource-related APIs we already have. The user is responsible for ensuring consistency across columns as data moves from ingestion through compaction (and in queries). This gives the user the ultimate freedom to evolve columns in any way that the app needs.

This is a task-focused view of metadata: first specify the task, then specify how to work with columns for that task. It contrasts with the original column-focused view: provide information for each column, and let the task use that common information to work out what that task should do.

Both work. Our job is to debate the pros and cons and choose one for implementation.

paul-rogers commented 1 year ago

To complete the catalog, we have to decide on the user experience we want with the catalog, specifically for rollup datasources. It is helpful to start with first principles: identify the overall user experience that Druid seeks to offer. To help us identify that experience, this note offers three broad approaches, each of which seems valid in its own way.

One way to summarize the scenarios is:

This means that, in SQL, one defines a schema and the SQL engine works out the code (execution plan) based on that schema. Whereas, in Druid, the code (ingest spec, compaction spec, MSQ query) spells out the types of each column, so that the schema is a function of that code.

Data Modeling

At one end of the spectrum, we could focus on the business problem that the user wants to solve. For rollup, this is to aggregate data in (a variation of) the classic data cube model. In this view, rows represent business entities and columns represent attributes of those entities (what the original SQL model called domains, and which, in implementations, are given by data types).

The job of the designer is to determine the entities needed for an application, and to define the attributes. An attribute may be “sourceIP” defined as a string that represents an IPv4 or IPv6 address. Another might be “sentBytes” defined as an integer number of bytes. And so on. The idea is that if an application has a design for a table, then the job of ingestion is to map incoming data into this form, and the job of querying is to produce business insights on top of a known data model. (For example, from the above, given a timestamp, we could produce a chart of bytes in per IP address per minute.)

This model is well-known in the RDBMS community and arises, in part, from the way data has been recorded in business systems from the earliest days of punched cards through early database systems and on to RDBMS systems.

Traditionally, a query engine would apply the relational algebra (typically starting from a relational calculus in SQL) to perform queries over the defined entities (schema). This works because the schema tells the relational engine everything it needs to know to figure out how to work with the data.

Since the system understands the meaning of the data (at least at the mechanical level), the system can work out how to perform most operations, which is typically the job of the database planner and optimizer. In systems that allow defining data cubes, the system also works out how to implement rollup. See also IcCube and this UI example.

While this approach is classic, it may not be a good fit for the niche which Druid occupies. The premise is that the user provides the (extended) schema, and the system works out how to perform operations. Druid, however, solves a different problem for which this classic approach may not be the right approach.

Physical Description

Another view of the catalog is that it is simply a physical description of a datasource with any business meaning better left to the user. In this view, the catalog simply records the storage type of each column in those cases where the user wishes to fix the storage type.

In normal practice, Druid chooses a storage type suitable for the actual data. For example, the new schemaless feature will faithfully write into segments the best Druid representation of the data that appears in the input files. MSQ chooses a data type that is the result of the input data type and any operations applied to that type. And so on. The catalog would simply place constraints on this process to, say, always produce a double column rather than sometimes inferring double and sometimes long.

Since Druid is designed to be “Python-like” (any data type is automatically converted to any other data type), one could argue that locking down column storage types is of limited value.

Further, it is not clear if users would be able to make useful decisions about column types without knowing something about the data. That is, it is hard to pick a type for “x”, but a bit easier to pick a type if we know it represents “bytesSent”. As Druid moves to schemaless ingestion, the goal is that the user need not know about individual columns.

The physical representation would be tedious for aggregates: users would have to know the intermediate types for each aggregate. For example, it is not obvious that the intermediate type for LATEST is COMPLEX<PAIR<string, long>>.

Druid is built as a series of procedures for ingest, compaction and querying. Each of those include some notion of column type. Since Druid is not designed to obtain column types from a catalog, there is little value in providing that information: it would be a manual process to ensure that the types specified in each procedure match those in the catalog. (In systems that work out the types automatically, then, of course, there is value in centralizing the schema information.)

And, as noted, there is little value in locking down the types since Druid is designed to work with varying types: Druid faithfully copies the type from input into segments. Thus, it is not clear what the user gains by taking the time to specify a physical schema: they get no value or work savings, it is just another task in addition to the existing tasks, and the user must take care that all the other tasks agree with the specified schema. The physical modeling approach would seem to offer very little value.

Process Modeling

Druid is not a traditional data warehouse. Druid is not typically used for ETL. More often, Druid is a caching layer that loads existing data into a form that accelerates querying. In this light, one could argue that data modeling decisions were already made when producing the source data: they don’t have to be made again in Druid. Since each Druid datasource represents a single input source, Druid serves the user best when it faithfully reproduces the original data in segment form. This is, in fact, the gist of the nested data and schemaless projects: to ensure a faithful reproduction.

If the original input source is messy, or if the user has to enrich that input source, or has to map multiple input sources to a single datasource, then one could argue that many tools exist to assist: Spark, Flink, and more. Thus, the input source presented to Druid represents the final, clean, result of a data pipeline. Again, any data modeling decisions have already been made within that data pipeline.

In this form, the type of data is already determined by the input source: Druid simply has to mirror that type. Since Druid's type system is flexible, the user can be assured that even if Druid guesses wrong, the data can still act like the accepted type. (For example, if a value is really a double, but a particular ingest has only integers, there is no harm in storing that particular set of data as long: the data will be converted to double later when needed.)

Of course, Druid provides expressions in all of its ingestion mechanisms: the user can adjust types and names as part of the ingestion procedure. Given that, there is no need to also specify names and types in a catalog.

In this view, there is little value in letting the user pick column types: types are implementation details. Instead, the value is in helping the user perform the operations needed to work with the data: ingestion, compaction, loading data onto the cluster, aging the data, and finally expiring the data.

As it turns out, Druid has many of these operations already, as a collection of REST operations. So, perhaps the catalog is actually a mechanism to unify these processes and fill in any gaps. For example, provide the ability to store an MSQ ingest statement in Druid itself so the user can invoke it simply by providing the input file names. For example:

CALL myIngestion(ARRAY["file1.csv", "file2.csv"])

The target schema is directly represented by the MSQ statement itself. That statement defines column names and types, says whether to do rollup, and which aggregation functions to use for each metric. Later, compaction specs either repeat this information, or choose to revise it (to, say, perform further rollup as data ages.) The focus is the procedure: the schema is just the result of the procedure.
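To show the shape of the idea, here is a toy Python stand-in for storing an ingest statement and invoking it with file names. Everything in it is hypothetical: Druid has no such procedure store today, the {files} placeholder is an invented convention, and my_files(...) is a made-up table function rather than a Druid API.

```python
# Hypothetical in-memory stand-in for procedures stored inside Druid.
_PROCEDURES: dict[str, str] = {}

def create_procedure(name: str, sql_template: str) -> None:
    """Store an ingest statement; '{files}' marks where the file list goes."""
    _PROCEDURES[name] = sql_template

def call_procedure(name: str, files: list[str]) -> str:
    """Expand the stored statement for one batch of input files."""
    # Quote each name as a SQL string literal, doubling embedded single quotes.
    quoted = ", ".join("'" + f.replace("'", "''") + "'" for f in files)
    return _PROCEDURES[name].replace("{files}", quoted)

create_procedure(
    "myIngestion",
    # my_files(...) is a made-up table function, not a Druid API.
    "INSERT INTO dst SELECT a, SUM(x) AS m "
    "FROM TABLE(my_files(ARRAY[{files}])) GROUP BY a",
)
```

Calling call_procedure("myIngestion", ["file1.csv", "file2.csv"]) returns the full statement with the file list filled in; the dimensions, metrics, and rollup choice all live in the stored text rather than in table metadata.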

The catalog, then, is not about the datasource schema, but rather about tasks performed on the datasource. The schema emerges as the result of these tasks. In this view, the user need not know or care about the schema.

The catalog, then, is a set of tasks that might include:

The catalog, via SQL DDL statements, is simply a wrapper around these existing concepts. The one new bit might be the ability to store an MSQ query within Druid, as suggested above.

Schema, in this model, is a physical detail which emerges from the procedures the user chooses to run. If the user wants to influence the schema, he/she does so by adjusting a task: adding a CAST to an MSQ ingestion, etc. This is, in fact, how Druid works today, without the catalog, so all we’re doing is recognizing we were already correct and nothing need change regarding schema.

Applying this concept to rollup datasources, a datasource has rollup enabled simply because the ingestion or compaction procedure created the segments with rollup. The columns that store intermediate values do so simply because the user used aggregate functions in their ingestion procedure. This view recognizes that rollup is an implementation detail and may change over time. It is the process that is key, not the shape of the segments.

Since queries are written based on a knowledge of the input data source, queries reflect the decisions made when creating that input source. In fact, ideally, the input source structure already reflects how that data should be queried: again all Druid has to do is preserve that structure. Druid already tracks dimensions and metrics, so queries already have that information available. There is no need to introduce another representation of a user-defined schema.