apache / druid

Apache Druid: a high performance real-time analytics database.
https://druid.apache.org/
Apache License 2.0
13.5k stars 3.7k forks source link

Druid Catalog Proposal #12546

Open paul-rogers opened 2 years ago

paul-rogers commented 2 years ago

Druid is a powerful database optimized for time series at extreme scale. Druid provides features beyond those of a typical RDBMS: a flexible schema, ability to ingest from external data sources, support for long-running ingest jobs, and more. Users coming from a traditional database can be overwhelmed by the many choices. Any given application or datasource uses a subset of those features; it would be convenient for Druid, rather than the user, to remember the feature choices which the user made.

For example, Druid provides many different segment granularities, yet any given datasource tends to prefer one of them on ingest. Druid allows each segment to have a distinct schema, but many datasources want to ensure that at least some minimal set of “key” columns exist. Most datasources use the same metric definitions on each ingest. And so on.

Traditional RDBMS systems use a catalog to record the schema of tables, the structure of indexes, entity-relationships between tables and so on. In such systems, the catalog is an essential part of the system: it is the only way to interpret the layout of the binary table data, say, or to know which indexes relate to which tables. Druid is much different: each segment is self-contained: it has its own “mini-catalog.”

Still, as Druid adds more SQL functionality, we believe it will be convenient for users to have an optional catalog of table (datasource) definitions to avoid the need to repeat common table properties. This is especially useful for the proposed multi-stage ingest project.

Proposal Summary

Proposed is an add-on metadata catalog that allows the user to record data shape decisions in Druid and reuse them. The catalog contains:

Technically, the proposal envisions the following:

Motivation

With the catalog, a user can define an ingestion input source separate from a SQL INSERT statement. This is handy as the current EXTERN syntax requires that the user write out the input definition in JSON within a SQL statement.

The user first defines the input table, using the REST API or (eventually) the SQL DDL statements. Then, the user references the input table as if it were a SQL table. An example of one of the CalciteInsertDmlTest cases using an input table definition:

INSERT INTO dst SELECT *
FROM "input"."inline"
PARTITIONED BY ALL TIME

Here input is a schema that contains input table definition, while inline is a user-defined table that is an in-line CSV input source.

Similarly, when using SQL to ingest into a datasource, the user can define things like segment granularity in the catalog rather than manually including it in each SQL statement.

We expect to support additional use cases over time: the above should provide a sense of how the catalog can be used.

Catalog as "Hints"

Druid has gotten by this long without a catalog, so the use of the catalog is entirely optional: use it if it is convenient, specify things explicitly if that is more convenient. For this reason, the catalog can be seen as a set of hints. The "hint" idea contrasts with the traditional RDBMS (or the Hive) model in which the catalog is required.

External Tables

Unlike query tools such as Druid, Impala or Presto, Druid never reads the same input twice: each read ingests a distinct set of input files. The external table definition provides a way to parameterize the actual set of files: perhaps the S3 bucket, or HDFS location is the same, the file layout is the same, but the specific files differ on each run.

Resolve the Chicken-and-Egg Dilemma

We noted above that segments are their own "mini-catalogs" and provide the information needed for compaction and native queries to do their job. The problem is, however, creating segments, especially the first ones: there is no "mini-catalog" to consult: the user has to spell out the details. The catalog resolves this dilemma by allowing the metadata to exist before the first segment. As a bonus, once a table (datasource) is defined in the catalog, it can be queried, though the query will obviously return no rows. A SELECT * will return, the defined schema. Similarly, if a user adds a column to the table, that column is immediately available for querying, even it returns all NULL values. This makes the Druid experience just a bit simpler as the user need not remember when a datasource (or column) will appear (after the first ingestion of non-null data.)

Query Column Governance

Druid allows columns to contain can kind of data: you might start with a long (BIGINT) column, later ingest double (DOUBLE) values, and even later decide to make the column a string (VARCHAR). The SQL layer uses the latest segment type to define the one type which SQL uses. The catalog lets the user specify this type: if the catalog defines a type for a column, then all values are cast to that type. This means that, even if a column is all-null (or never ingested), SQL still knows the type.

Cold Storage

Druid caches segments locally in Historical nodes. Historicals report the schema of each segment to the Broker, which uses them, as described above, to work out the "SQL schema" for a datasource. But, what if Druid were to provide a "cold tier" mode in which seldom-used data resides only in cold storage? No Historical would load the segment, so the Broker would be unaware of the schema. The catalog resolves this issue by letting the user define the schema separately from the segments that make up the datasource.

Components

The major components of the metadata system follow along the lines of similar mechanisms within Druid: basic authentication, segment publish state, etc. There appears to be no single Druid sync framework to keep nodes synchronized with the Coordinator, so we adopt bits and pieces from each.

Metadata DB Extension

Defines a new table, perversely named "tables", that holds the metadata for a "table." A datasource is a table, but so is a view or an input source. The metadata DB extension is modeled after many others: it provides the basic CRUD semantics. It also maintains a simple version (timestamp) to catch concurrent updates.

REST Endpoint

Provides the usual CRUD operations via REST calls as operations on the Coordinator, proxied through the Router. Security in these endpoints is simple: it is based on security of the underlying object: view, datasource, etc.

DB Synchronization

Keeps Broker nodes updated to the latest state of the catalog DB. Patterned after the mechanism in the basic auth extension, but with a delta update feature borrowed from an extension that has that feature.

Planner Integration

Primary focus on this project is using catalog metadata for SQL statements, and, in particular, INSERT and REPLACE statements. Input tables replace the need for the EXERN macro; datasource metadata replaces the need to spell out partitioning and clustering.

SQL DDL Statements

As Druid extends its SQL support, an obvious part of this catalog proposal would be DDL statements such as CREATE/ALTER/DROP TABLE, etc. This support is considered a lower priority because:

Rollup Datasources

NOTE: This section is now out of scope and is no longer planned. Leaving this here to spur future discussion.

The main challenge is around rollup datasources. In rollup, the datasource performs aggregation. It is easy to think that ingestion does the aggregation, but consider this example: ingest a set of files, each with one row. You'll get a set of, day, dozens of single-row segments, each with the "aggregation" of a single row. The compaction mechanism then combines these segments to produce one with overall totals. This process continues if we add more segments in the same time interval and compact again.

This little example points out that compaction knows how to further aggregate segments: even those with a single row. Of course, ingestion can do the same trick, if there happen to be rows with the same dimensions. But, since compaction can also do it, we know that there is sufficient state in the one-row "seed" aggregate for further compaction to occur. We want to leverage this insight.

The idea is, in SQL INSERT-style ingestion, the work happens in three parts:

This means that we can convert the following example:

INSERT INTO "kttm_rollup"

WITH kttm_data AS ...

SELECT
  FLOOR(TIME_PARSE("timestamp") TO MINUTE) AS __time,
  session,
  agent_category,
  agent_type,
  browser,
  browser_version,
  MV_TO_ARRAY("language") AS "language", -- Multi-value string dimension
  os,
  city,
  country,
  forwarded_for AS ip_address,

  COUNT(*) AS "cnt",
  SUM(session_length) AS session_length,
  APPROX_COUNT_DISTINCT_DS_HLL(event_type) AS unique_event_types
FROM kttm_data
WHERE os = 'iOS'
GROUP BY 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11
PARTITIONED BY HOUR
CLUSTERED BY browser, session

To this form:

INSERT INTO "kttm_rollup"
SELECT
  TIME_PARSE("timestamp") AS __time,
  session,
  agent_category,
  agent_type,
  browser,
  browser_version,
  language, -- Multi-value string dimension
  os,
  city,
  country,
  forwarded_for AS ip_address,

  session_length,
  event_type AS unique_event_types
FROM "input".kttm_data
WHERE os = 'iOS'

Here:

The column-level rules operate much line the built-in type coercion rules which SQL provides. Instead of simply converting a INT to BIGINT, Druid add rules to implicitly convert a scalar BIGINT to a SUM(BIGINT) column.

Extensions

A possible feature is to allow an external service to provide the catalog via an extension. An example of this is to use the Confluent schema registry, the Hive Metastore, etc. We'll flesh out this option a bit more as we get further along.

Alternatives

The three main alternatives are:

Since the catalog will become core to Druid, we tend to favor creating one focused on the rather unique needs which Druid has.

Development Phases

This project has multiple parts. A basic plan is:

Backward Compatibility

Existing Druid installations will create the new catalog table upon upgrade. It will start empty. If a datasource has no metadata then Druid will behave exactly as it did before the upgrade.

If a version with the catalog is downgraded, the old Druid version will simply ignore the catalog and the user must explicitly provide the properties formerly provided by the catalog.

paul-rogers commented 2 years ago

Let's add additional design details as notes to keep the description light.

Security

The catalog is implemented as another table in Druid's metastore DB. Operations are via the REST API. When we support SQL DDL statements, the implementation of those statements will use the same REST API (with some for of user impersonation.)

In other systems, there are separate permissions for table metadata and table data. Druid only has read and write access to each datasource, so we work within those limitations.

The basic security rules for the catalog are:

Druid allows granting permissions on via a regular expression. So, the admin could cobble together a form of private/temporary tables by allowing, say, write access to all tables of the form "-.*" such as "paul-test", "paul-temp-metrics" and "paul-temp3".

A future enhancement would be to invent a more advanced security model, but that is seen as a task separate from the catalog itself.

FrankChen021 commented 2 years ago

First, I want to say, this is a great proposal that lays some foundation work to introduce DDL to Druid.

My question is what's the relationship between current INFORMATION_SCHEMA in Druid and the proposed Catalog here?

paul-rogers commented 2 years ago

@FrankChen021, thanks for your encouragement! Yes, I hope that borrowing a bit of RDBMS functionality we can make Druid a bit easier to use by letting Druid keep track of its own details.

You asked about INFORMATION_SCHEMA, so let's address that.

Revised on June 1.

INFORMATION SCHEMA Revisions

Druid's INFORMATION_SCHEMA implements a feature introduced in SQL-92: it is a set of views which presents the same underlying metadata which Druid's SQL engine uses to plan queries. For example, segments for a datasource might have multiple types: long in one segment, string in a newer segment. Druid picks one for use in SQL. In this case, it will pick string (presented to SQL as VARCHAR) since that is what appears in the newest segments.

Since INFORMATION_SCHEMA is a "view", it is immutable: users cannot modify the INFORMATION_SCHEMA tables directly. Instead, modifications are done via Druid's APIs and, with this proposal, via the catalog.

In this proposal, we modify the INFORMATION_SCHEMA results to show the effect of applying catalog information on top of the information gleaned from segments. For example, if the user decided that the above field really should be a long, they'd specify it as the SQL BIGINT type, which would override Druid's type inference rules. The type for the column would then appear as BIGINT in the INFORMATION_SCHEMA.COLUMNS record for that column.

We may add a "hide" option to columns to mark a column that exists in segments, but, for whatever reason, is unwanted. A hidden column would not appear in INFORMATION_SCHEMA.COLUMNS since it is not available to SQL.

INFORMATION_SCHEMA as a Semi-Standard

INFORMATION_SCHEMA originally appeared in SQL-92, based on "repositories" common in the late 80's. Hence, it has a rather dated set of concepts and data types. It appears that most vendors (such as MySQL, Postgres, Apache Drill, etc.) keep the parts they can, modify bits as needed, and add extensions keeping with the 80's vibe of the original design. Postgres goes so far as to define its own logical types that mimic the types used in the SQL-92 spec (such as yes_or_no).

To maintain compatibility, we retain the standard aspects of INFORMATION_SCHEMA while removing the bits not relevant to Druid.

Druid-Specific Security Restrictions

According to the SQL-92 spec referenced earlier, all users should be able to see INFORMATION_SCHEMA entries for all tables, columns and other resources in the database. Druid's security model is more strict: users can only see entries for those tables (or datasources) for which the user has read access. We retain this description, and enforce the same restriction in the catalog APIs.

Clean-up

Since Druid inherited INFORMATION_SCHEMA from Calcite, it picked up some columns that actually have no meaning in Druid. New users have to learn that, though the columns exist, they don't do anything, which is annoying. So, we propose to do a bit of house-cleaning.

Renove Unused `SCHEMATA Columns

Drop the following because Druid always works in UTF-8.

The following two columns may be useful as we work out catalog details:

Remove Unused COLUMNS Columns

In COLUMNS, we propose to remove:

Add Druid-Specific Table Metadata

For Tables, we may want to add additional useful Druid information:

The above are needed so the SQL planner knows how to plan INSERT statements against rollup tables. Since the SQL planner will use this information, INFORMATION_SCHEMA should present it so the user sees what the planner uses.

To handle aging-based changes in datasources, we could introduce another table that provides these rules, but that is out of cope for this project.

Add Druid-Specific Column Metadata

For COLUMNS we may add additional columns to express Druid attributes. This is very preliminary:

The following column changes meaning:

clintropolis commented 2 years ago

drive by comment re INFORMATION_SCHEMA, i think it is a bit of a standard, https://en.wikipedia.org/wiki/Information_schema, so we need to be considerate about how we modify it I think. (I'll try to read and digest the rest of this proposal in the near future)

paul-rogers commented 2 years ago

@clintropolis, thanks for the note on INFORMATION_SCHEMA It appears that it was introduced in SQL 92, and appears to have a very 1980's feel to it in naming and types. Various DBs support varying aspects of the schema. A quick scan of the table-of-contents of the SQL 2016 spec suggests that the information schema still exists, though I'd have to buy the spec to find out how much it has evolved since SQL-92. MySQL for example adds many, many tables on top of the standard SQL-92 set.

The key fact is that INFORMATION_SCHEMA is intended to be a set of views on top of the underlying "definition tables." There are no definition tables in Druid: information is stored in non-table form in multiple places. The proposed catalog adds to those sources of information. Druid's INFORMATION_SCHEMAsimulates the views by creating table contents on the fly. This aspect will remain unchanged in this proposal.

Updated the INFORMATION_SCHEMA section to reflect this information.

paul-rogers commented 2 years ago

Work is far enough along to propose the REST API. This material (with any needed updates) will go into the API docs as part of the PR.

REST API

The catalog adds two sets of APIs: one for users, the other internal for use by Brokers. All APIs are based at /druid/coordinator/v1/catalog. The user APIs are of two forms:

Note that all these APIs are for the metadata catalog entries; not the actual datasource. The table metadata can exist before the underlying datasource. Also, the definition can be deleted without deleting the datasource.

The primary catalog entry is the "table specification" (TableSpec) which holds the user-provided information about a table. Internally, Druid adds additional information (schema, name, create date, update date, state, etc.) to create the full "table metadata" (TableMetadata) stored in the metadata database. Users update the spec, the system maintains the full metadata.

Create or Update Table Specification

POST {base}/tables/{dbSchema}/{name}[?action=New|IfNew|Replace|Force][&version={n}]

Configuration-as-code API to create or update a table definition within the indicated schema. Payload is a TableSpec, defined below.

The schema must be the name of a valid writable Druid to which the user has write access. The valid schemas at present are druid (for datasources) and input (for input table definitions.) We may also support view.

The user must have write access to the underlying datasource, even if the datasource does not yet exist. (That is, the user requires the same permissions as they will require when they first ingest data into the datasource.) For input tables, the user must have the extended INPUT permission on the input source name.

Creates or replaces the table spec for the given table depending on the action:

In all cases, the operation uses the provided spec as is. See the PUT operation below for partial updates.

For the Replace, version updates the table only if it is at the given version number. See below for details.

Update Table Specification

PUT {base}/tables/{dbSchema}/{name}[?version={n}]

Incremental API to update an existing table within the indicated schema, and with the given name. Payload is a TableSpec, defined below. The schema must be as described for Create Table above.

The table spec can be partial and is merged with the existing spec. Merge rules are:

Columns are merged differently for different table types.

The API supports two "levels" of synchronization. By default, the new entry simply overwrites the existing entry. However, if version={n} is included, then the update occurs only if the update timestamp in the current metadata DB record matches that given in the REST call. Using a version provides a form of "optimistic locking": first read the definition, make a change and send the update using the update time from the read. Doing this prevents accidental overwrites.

Read Table

GET {base}/tables/{dbSchema}/{name}

Configuration-as-code API to read the table specification for the table given by a schema and table name. The user must have read access to the table. Returns a 404 (NOT FOUND) if the entry does not exist. Remember: the metadata entry exists independent of the datasource itself. The result is the TableSpec defined below.

Note that the above are defined so that if one does a POST to create a table, followed by a GET, one ends up with the same table spec that one started with.

Drop Table

DELETE {base}/tables/{dbSchema}/{name}[?ifExists=true|false]

Drop the catalog entry for the given table and schema. The schema must exist. The table must also exist, and the user must have write access to the underlying datasource.

The optional ifExists=true parameter provides SQL IF EXISTS semantics: no error is given if the table does not exist.

Read Full Table Metadata

GET {base}/schemas/{dbSchema}/table/{tableName}

Returns the full metadata for a table including system-maintained properties such as name, update time, table spec, and more. Use this form to obtain the update timestamp used for the version optimistic locking feature. The pattern is:

Reorder Column

Columns a SELECT * appear in the same order that they appear in the table spec. The PUT operation above cannot change the order for datasources. To change column order, use the moveColumn API:

POST {base}/tables/{dbSchema}/{name}/moveColumn

The payload is a JSON object of type ColumnOrder that is of the form:

{
   "column": "<name>",
   "where": "first|last|before|after",
   "anchor": "<name>"
}

A column can be moved to the start or end of the list. Or, it can be move to appear before or after some other column. The anchor is ignored for the first or last options.

The operation fails if either the column, or the anchor (if provided) do not exist (which may occur if another writer deleted the column in the meantime.) The columns refer to entires in the catalog schema. A column may exist in the datasource but not in the catalog. Such columns can't be referenced in this API.

Delete Column Metadata

A client may wish to remove a specific datasource column. The PUT operation above can't handle deletions, only additions (because addition is far more common.) Though this operation is primary for datasource, it works for input sources as well.

POST {base}/{base}/tables/{dbSchema}/{name}/dropColumn

The payload is a JSON list of strings that identifies the columns to be dropped.

Note that deleting a column means to remove the metadata entry for the column. This is not the same as hiding the column. This operation does not physically remove the column: if the column still exists in any segment, then the column will appear in the merged schema. Use this operation for the case that a metadata column entry was added by mistake, or if all instances of a previously-existing physical column have expired out of the segments.

Dropping a column does not drop the column from the hidden columns list. It is expected that, if a column is deleted, it would likely appear in the hidden columns list until all old segments with that column expire out of the system.

Hide (and Unhide) Datasource Columns

Datasources provide a "hide" operation for columns. Segments may contain columns which are no longer needed. To avoid the need to rewrite segments, the catalog can simply "hide" existing columns. The PUT operation can append new hidden columns. This operation is a bit simpler, and can "unhide" an already-hidden column. The hidden column normally will not appear in the list of columns in the table spec: the name usually references a column that exists in segments, but is not actually needed.

The Payload is a HideColumns object of the form:

{
  "hide": [ "a", "b", "c" ],
  "unhide": [ "d", "e" ]
}

List Schema Names

GET {base}/list/schemas/names

Retrieves the list of the names of schemas known to the catalog, which includes the same set of schemas in the INFORMATION_SCHEMA.SCHEMATA table. Note that, in the present version, the catalog does not support user-defined schemas.

The list is not filtered by permissions as Druid does not have schema-level permissions. All users can see all schemas (but not necessarily the contents of the schemas.)

List All Table Names

GET {base}/list/tables/names

Retrieves the list of the names all tables known to the catalog across all schemas. Only some schemas allow definitions, only definitions appear. This is not a list of actual datasources or system tables: only a list of definitions.

The list is filtered based on user permissions: the list will omit tables for which the user does not have read access.

List Tables Names in a Schema

GET {base}/schemas/{dbSchema}/names

Retrieves the list of the names of tables within the given schema. This list contains only those tables for which metadata entries appear, and is thus a subset of those returned by INFORMATION_SCHEMA.TABLES. The list contains only those tables for which the user has read access.

List Tables in a Schema

GET {base}/schemas/{dbSchema}/tables

Returns the list of all tables within the given schema for which the user has access. The return value is a list of the same objects returned from GET {base}/tables/{dbSchema}/{name}.

Flush Cache

POST {base}/flush

Causes the catalog to invalidate any caches. Available on both the Coordinator and the Broker. This API is required only if the catalog DB changes outside of Druid, and is primarily for testing.

Synchronize Table (Internal)

GET {base}/tables/{dbSchema}/{name}/sync

Retrieve the entry for a single table within the given schema as a TableSpec object. The user is assumed to be the Druid super-user. This API is primarily for use by the Broker node. Currently does the same as GET {base}/tables/{dbSchema}/{name}, but this is subject to change as this is an internal API.

Synchronize Schema (Internal)

Returns the list of all table metadata, as TableSpec objects, within the given schema. The user is assumed to be the Druid super-user. This API is primarily for use by the Broker node. Currently does the same as GET {base}/schemas/{dbSchema}/sync, but this is subject to change as this is an internal API.

GET {base}/schemas/{dbSchema}/tables

paul-rogers commented 2 years ago

Extended Table Functions

The present version of Druid uses a Calcite feature to specify an ingest input table:

INSERT INTO dst SELECT *
FROM TABLE(extern(
  '{
    "type": "inline",
    "data": "a,b,1\nc,d,2\n"
   }',
  '{
    "type": "csv",
    "columns": ["x","y","z"],
    "listDelimiter": null,
    "findColumnsFromHeader": false,
    "skipHeaderRows": 0
  }',
  '[
    {"name": "x", "type": "STRING"},
    {"name": "y", "type": "STRING"},
    {"name": "z", "type": "LONG"}
  ]'
))
PARTITIONED BY ALL TIME

As it turns out, SQL (and Calcite) allow the use of named parameters. We can rewrite the above as follows. Notice the name => value syntax:

INSERT INTO dst SELECT *
FROM TABLE(extern(
  inputSource => '{
    "type": "inline",
    "data": "a,b,1\nc,d,2\n"
  }',
  inputFormat => '{
    "type": "csv",
    "columns": ["x","y","z"],
    "listDelimiter": null,
    "findColumnsFromHeader": false,
    "skipHeaderRows": 0
  }',
  signature => '[
    {"name": "x", "type": "STRING"},
    {"name": "y", "type": "STRING"},
    {"name": "z", "type": "LONG"}
  ]'
))
PARTITIONED BY ALL TIME

The above is great, but can be a bit awkward: we have to encode JSON in SQL (which, when we send via the REST API, we encode again in JSON.) Let's how we can use SQL named parameters to streamline the syntax (and set ourselves up for the catalog.) SQL requires that parameter names be "simple identifiers": that is, no dots. So, we can't just say:

  "inputSource.type" => "inline"

Instead, we have to "flatten" the names. That is, define SQL names that, internally, we map the the JSON names. The mapping is just code, so we omit the details here. Suppose we do the mapping. We now have a different set of arguments, so we need a different function. For now, let's call it staged.

We also need a way to specify the input table schema. Here we borrow another bit of Calcite functionality, the EXTEND clause which was added for Apache Phoenix. We modify the syntax a bit to fit our needs. The result:

INSERT INTO dst SELECT *
FROM TABLE(staged(
  source => 'inline',
  data => 'a,b,1
c,d,2
',
  format => 'csv'
  ))
  (x VARCHAR NOT NULL, y VARCHAR NOT NULL, z BIGINT NOT NULL)
PARTITIONED BY ALL TIME

Notice how the keywords in the staged function arguments match the properties defined in the REST call in the prior section. That is not an accident. That sets us up to merge the two ideas in the next update.

Table "Templates"

Using the above, the catalog allows the definition of a template table. To motivate this, let's start with a complete input table:

{
  "type": "input",
  "properties": {
    "source": "local",
    "file": "wikipedia.csv",
    "format": "csv",
   },
   "columns": ...
}

The above can be run by referencing the name, say myWiki:

SELECT * FROM `input`.`myWiki`

Druid, however, never ingests the same data twice: we want to read different files each time. Say wiki-2015-06-01.csv one day, wiki-2016-06-02.csv the next day. So, we simply omit the file property above, converting the catalog entry to a template table:

{
  "type": "input",
  "properties": {
    "source": "local",
    "format": "csv",
   },
   "columns": ...
}

We have to parameterize the template to run it, using a table function with the same name as the input table:

SELECT * FROM (TABLE(`input`.myWiki`(file => `wiki-2016-06-02.csv`))

The result of the table function is a complete table, ready to run as if it was fully defined in the catalog.

Notice that how three pieces come together:

paul-rogers commented 2 years ago

Primary Partition

The catalog allows the user to define the ingest partitioning:

{
   "dbSchema": "druid",
   "name": "fiveMinDs",
   "spec": {
      "type": "datasource",
      "segmentGranularity": "PT5M"
   }
}

By doing so, the user can drop the PARTITIONED BY clause in the INSERT statement as shown above.

Supported values include:

Only the standard Druid values are supported: providing a non-standard interval will raise an error.

Secondary Partitioning (Clustering)

The multi-stage ingest engine allows the user to specify secondary partitioning, expressed as the CLUSTERED BY clause. The clause includes a list of cluster keys, each of which is an expression and an optional "sort sense" (ASC or DESC). The expression is typically a column name, but can be an expression.

The catalog models this with a list of JSON objects:

{
   "dbSchema": "druid",
   "name": "fiveMinDs",
   "spec": {
      "type": "datasource",
      "segmentGranularity": "PT5M",
      "clusterKeys": [
        {"column": "x"},
        {"column": "y", "desc": true} ]
   }
}

At present, the catalog supports only column names: additional work is needed in the SQL query layer to support expressions. (There is also some debate about whether the optimizer can correctly use expression columns, and so whether we actually need them.)

When this information is present in the catalog, the user can omit the CLUSTERED BY clause from an INSERT statement.

Target Segment Row Count

The multi-stage engine allows the user to specify the desired number of rows per output segment. This is presently done as a context setting. With the catalog, it can be specified in table metadata:

{
   "dbSchema": "druid",
   "name": "fiveMinDs",
   "spec": {
      "type": "datasource",
      "segmentGranularity": "PT5M",
      "targetSegmentRows": 4000000
   }
}

A user-provided context setting takes precedence. If unset, the value is the default set by the multi-stage engine, which is currently 3 million rows.

paul-rogers commented 2 years ago

Input Formats

This section describes Input format properties. It will make more sense if you read the following "Metadata structure" comment first. The joys of using issue comments for documentation...)

Every input table definition (or staged(.) function call) must include the format property, if the input source requires a format.

Property Type JSON Name Description
format VARCHAR type Input format type

CSV

Defines a CSV input format.

Property Type JSON Name Description
format VARCHAR type Must be'csv'
listDelimiter VARCHAR listDelimiter Delimiter for list values in a list. Defaults to CTRL-A
skipRows INTEGER skipHeaderRows Number of rows to skip

Only the format property is required: Druid provides defaults for the other values.

TODO: Need a way to escape special characters for the listDelimiter field.

When used in an input table, Druid does not yet support the ability to infer column names from the input file, so the findColumnsFromHeader is not supported here.

CSV requires a list of columns. Provide these in the columns section of the input table definition in the catalog, or using the extended function notation in SQL:

SELECT *
FROM TABLE(staged(
  source => 'inline',
  data => 'a,b,1
c,d,2
',
  format => 'csv'
  ))
  (x VARCHAR NOT NULL, y VARCHAR NOT NULL, z BIGINT NOT NULL)

Delimited Text

Defines a Delimited text input format as a generalization of the CSV format. Properties default to provide a TSV (tab-separated values) format.

Property Type JSON Name Description
format VARCHAR type Must be'tsv'
delimiter VARCHAR delimiter A custom delimiter for data values. Defaults to TAB
listDelimiter VARCHAR listDelimiter Delimiter for list values in a list. Defaults to CTRL-A
skipRows INTEGER skipHeaderRows Number of rows to skip

Usage and limitations are the same as for the CSV format above.

JSON

Defines a JSON input format.

Property Type JSON Name Description
format VARCHAR type Must be'json'
keepNulls BOOLEAN keepNulls Optional.

The catalog does not yet support the flattenSpec or featureSpec properties.

Although batch ingestion does not require a list of columns, the multi-stage engine does. Provide columns the same way as described for the CSV format above.

Generic

The generic input source is a direct representation of any arbitrary Druid input source, using the JSON representation of that source. The inputFormatSpec property holds the JSON-serialized form of the input spec.

paul-rogers commented 2 years ago

Metadata Structure

The catalog stores metadata in a generalized format designed to support a number of operations:

The general approach is to divide metadata into top-level objects. At present, only one object is available: tables. Others (connections, secrets, schedules) are envisioned for later. Within each object, there are one or more types. For tables, these are different kinds of datasources, different kinds of input tables, views, and so on. Each object has a set of properties, described as key/value pairs. Tables also have a set of columns.

The term specification (or spec) is used for the JSON object which the application writes into the catalog. The term metadata includes the spec, plus other general information such as a name, timestamp and so on.

Table Metadata

The table metadata object holds two kinds of information: the system-defined metadata about the entry, and the user-provided table specification (TableSpec). All tables, regardless of kind (datasource, input, view, etc.) use the same table metadata object: only the table specification part varies.

Example:

{
   "Id": {
     "schema":"druid",
     "name":"read"
   },
   "creationTime":1654634106432,
   "updateTime":1654634106432,
   "state":"ACTIVE",
   "spec": <TableSpec>
   }
}

Fields:

The user provides the id (schema and name) and the spec; the system maintains the other fields.

Table Specification (TableSpec)

The table specification holds the user-provided information for a catalog entry for the table. The structure of the spec is the same for all table types. The specific properties and columns depends on the type of table.

"spec": {
  "type": "<type>",
  "properties": { ... },
  "columns": [ ... ]
}

Fields:

Column Specification (ColumnSpec)

Columns are of one of three types, depending on the kind of datasource. For a detail (non-rollup) datasource, use the column type. For a roll-up table, use either dimension or measure. The general form is:

{
  "type": "<type>",
  "name": "<name>",
  "sqlType": "<type>",
  "properties": { ... }
}

Fields:

Datasource Table

The datasource table type defines ingestion properties. Many other properties can be added: the current set is the bare minimum to support MSQ ingestion. The catalog describes the columns within the datasource, but not how the columns are used. That is the idea of "rollup" is something that an application may choose to apply to a datasource: it is not a datasource type and the catalog maintains no information about dimensions, measures and aggregates.

Example:

{
   "type":"datasource",
   "properties": {
     "description": "<text>",
     "segmentGranularity": "<period>",
     "targetSegmentRows": <number>,
     "clusterKeys": [ { "column": "<name">, "desc": true|false } ... ],
     "hiddenColumns: [ "<name>" ... ]
   },
   "columns": [ ... ],
}

Properties:

Cluster keys are defined as a list of JSON objects:

"clusterKeys": [ {"column": "state"}, {"column": "population", "desc": true} ]

More properties will come as work proceeds.

The list of columns act as hints:

Datasource Column Specification

Datasource columns are provide a name and an optional SQL data type. The SQL type is optional. If not present, the physical type is used. If there are no segments yet, the SQL type defaults to VARCHAR. In general, if adding a column to the catalog before data exists, it is best practice to specify the column type.

If the __time column appears in the catalog, it must be a dimension in a roll-up table. Its type must be null or TIMESTAMP. Include the__time` column to indicate where it should appear in the table's schema.

External Tables

An external table is a table that resides outside of Druid. (That is, the table is not a datasource.) External tables are the sources used for ingestion using INSERT and SELECT statements. MSQ allows querying of external tables. There is a different external table type for each kind of external table: local file, HTTP, S3 bucket and so on.

An external table is equivalent to an EXTERN definition, but with the definition stored in the catalog. (See a note later about the extended table functions.) Druid defines the input source and format as JSON objects. The existing EXTERN function uses JSON strings to set these values. The catalog uses a different approach: a "flattened" list of names which Druid maps internally to the fields in the JSON structure. This approach allows us to mesh the input source specification with table function arguments.

External tables are typically parameterized, as Druid generally does not ingest the same data twice (except when testing.)

Many external tables support multiple formats, which appear as a format specification within the table specification, as described below. The format properties appear within those for the table. The reason for this will become clear when we discuss parameterized tables and SQL integration.

External tables typically require a schema (though upcoming MSQ enhancements may allow MSQ to infer the schema a runtime.) The schema is provided via the same column specifications as used for datasources, but with a column type of extern.

External tables are an abstraction on top of input sources. However, the catalog tends to represent input source properties somewhat differently than do the JSON objects. See below for details.

Column properties:

Columns must appear in the same order that they appear in the input file.

Example for an inline CSV external table:

{
  "type": "inline",
  "properties": {
    "format": "csv",
    "data": ["a,b,1", "c,d,2" ]
   },
   "columns": [
      {
         "name":"a",
         "sqlType":"varchar"
      },
     ...
   ]
}

Inline Table

The inline table type includes the data in the table definition itself and is primary useful for testing. See Inline input source. It has just one property in addition to the format:

Local Table

The local table type represents a file on the local file system, and is useful primarily for single-machine configurations. See Local input source. Provide a format. The table can be parameterized. Properties:

Provide either a pattern or a list of files. If both appear, (what happens?)

A local table has two parameters: filePattern and files. This means that a SQL statement can supply either the pattern, or a list of files per-query. Parameterization is explained in full elsewhere.

HTTP Table

The http table type allows reading data from an HTTP server. See HTTP input source. Supports formats. The HTTP table can be parameterized. Properties:

When used from SQL, set the template to the common part of the URIs: typically the part that includes the server for which the credentials are valid. Then, the parameter, uris, provides a comma-delimited list of the specific items to read. For example:

Again, parameterization is discussed elsewhere.

Input Formats

As noted above, most external tables allow multiple input formats as described in the Input format docs. The catalog form is, again, a bit different than the JSON form.

The set of formats described here is a "starter" set and will be expanded as the project proceeds.

Indicate the input format using the format property:

Then, include other properties as needed for the selected format. See the note above about the details of the supported formats. The set of formats described here is a "starter" set and will be expanded as the project proceeds.

clintropolis commented 2 years ago

Thanks for the additional details :+1:

sqlType: One of Druid's scalar data types: VARCHAR, FLOAT, DOUBLE or BIGINT. Case-insensitive.

How do you plan to support Druid's complex typed columns? (Such as the recently added COMPLEX<json> columns)? Complex types are currently case sensitive since they are registered internally in a map however they are defined (potentially via extensions), so it would take some work (and potentially be backwards incompatible to make them not be case sensitive).

The reason I'm asking is that i'm still a bit worried about how we are going to cleanly map this to Druids type system. Is it going to be a strict mapping, like exactly 1 SQL type to 1 Druid type? Or will it be permissive? (e.g. INTEGER, BIGINT, etc all just map to most appropriate Druid type, LONG in this case). I guess I wonder if we should also allow using a RowSignature or something here which is defined in Druid's native type system so that these schemas can model all possible schemas that can be created today (and the way internal via segment metadata schemas are currently built) as an alternative to defining types using SQL types since the native types also serialize into simple strings.

paul-rogers commented 2 years ago

@clint, thanks for the head's up on the complex types. Can you point me to documentation on the details of the type? To any SQL support we already have?

One question is whether a COMPLEX<json> column is logically one opaque "blob" (with whatever data appeared on input), or is a compound type where the user defines the fields.

If a JSON column is a blob, then we could look at the Drill MAP type: where a column foo is simply declared as type MAP, which then enables a set of operations and functions, just like any other type. Presumably we'd implement something like the Postgres JSON functions, which are based on the SQL standard.

If the user must declare the structure of a JSON object, then we do have a compound type. In that case, each column is, itself, a record, and can have a nested schema, to however many levels we choose to support. Experience with Drill showed that users ability to deal with schema is limited when it is one level, and rapidly falls to zero when schemas are nested: most of us just don't want to think that hard! Java (and Go, etc.) have language tools to work with nested records, SQL was designed for an earlier era and working with nested data is painful.

Regardless of how we proceed, we can use the Posgres JSON operators to access fields within a JSON blob, but that would require Calcite parser changes. On the other hand, Drill did coax Calcite into allowing references to MAP columns that look like table record references: myTable.myMap.myNestedMap.myValue. Regardless of syntax, these would translate internally into functions, maybe json_get(col, path) or some such. Perhaps you've already implemented these functions?

paul-rogers commented 2 years ago

@clintropolis, you also asked about SQL mapping. My suggestion is to enforce a limited set of types: VARCHAR, BIGINT, FLOAT and DOUBLE, which directly correspond to the Druid storage types. (Other types can be intermediate values.) This way, if Druid were ever to support a 1-byte integer value, we could use TINYINT (or BOOLEAN) to label that type. If we mapped TINYINT to long internally, then we'd have an ambiguous mess later on. We've already got the beginnings of ambiguity with TIMESTAMP: it has a meaning in SQL, but we just work with it as a long. SQL does require rigor in the type system to keep everything straight.

clintropolis commented 2 years ago

thanks for the head's up on the complex types. Can you point me to documentation on the details of the type? To any SQL support we already have?

(heh, I think you tagged the wrong person in your comments, sorry other @clint 😅 ). Nested data columns are described in proposal #12695 and PR #12753. They are wired up to SQL, though I'm mostly just using them as an example. Like all complex types is currently handled in a more or less opaque manner (functions which know how to deal with COMPLEX<json> do things with it, things that aren't aware do not). This was maybe not a great example because I'm considering making this stuff into top level native Druid types, though it would most likely be in the addition of both VARIANT and STRUCT (or MAP or something similar), since if it were done entirely with native types the current COMPLEX<json> is effectively whatever type it encounters (so might be normal scalar types LONG, STRING, etc; a VARIANT type; a nested type STRUCT, arrays of primitives ARRAY<LONG> etc; arrays of objects ARRAY<STRUCT>, nested arrays ARRAY<ARRAY<STRUCT>> and so on).

Complex types can be defined as dimensions or metrics, so we can't count on defining them all in terms of aggregators.

Internally, we currently build the SQL schemas for Calcite with DruidTable which represents the schema with a RowSignature which is defined using Druid native types which it collects from SegmentMetadata queries. Complex types are represented internally in Calcite with ComplexSqlType, whenever it is necessary to represent them as an actual SQL type, though this is a relatively new construct that isn't used everywhere yet (since many of our functions which have complex inputs and outputs that predate this construct at the calcite level will use the ANY and OTHER sql types and defer actual validation that it is the correct complex type until translated to native Druid query which can check against the native Druid types in the RowSignature of the table).

My suggestion is to enforce a limited set of types: VARCHAR, BIGINT, FLOAT and DOUBLE, which directly correspond to the Druid storage types.

This is my main point, these are not the only Druid storage types, the current proposal is only able to model a rather small subset of the types which can appear in Druid segments. The complex type system is extensible, meaning there is potentially a large set of complex types based on what set of extensions is loaded. Internally these are all basically opaque, which is why we have the generic COMPLEX<typeName> json representation of the native type, which we use to extract the typeName and can lookup the handlers for that type. Many of these types are tied to aggregators, but multiple aggregators can make the same type, and many aggregators support ingesting pre-existing (usually binary) format values. I think we need something generic like COMPLEX<typeName> that we use for the native types so that we can retain the typeName so that the functions can perform validation and provide meaningful error messages when using a COMPLEX<thetaSketch> input on a function that expects COMPLEX<HLLSketch> or whatever, and then in the native layer to choose the correct set of handlers for the type. Otherwise every single complex type will need to devise a way for the catalog to recognize it, which sounds like a lot of work for rather low utility.

There will also likely be ARRAY typed columns in the near future, so we'll need to be able to be sure we can model those as well, where I guess if it handles stuff like VARCHAR ARRAY it would be fine as currently proposed, though i've seen other ways of defining array types in the wild (looks at bigquery, though i used the same syntax for the native Druid type representation...) so i'm not sure how hard the standard is here.

paul-rogers commented 2 years ago

Based on advice from others, I've dropped the ideas around rollup tables: there will be no attempt to describe the aggregations for a rollup table. We'll leave that to the user to decide how to structure rollups.

paul-rogers commented 2 years ago

@clintropolis notes:

these are not the only Druid storage types, the current proposal is only able to model a rather small subset of the types which can appear in Druid segment

The intention is that a combination of the column spec and column type provides a description of all possible column types. Sorry if that was not clear: the focus in the aggregate section was on, well, aggregates. I just hadn't gotten far enough to need to deal with the others yet.

One constraint I want us to keep in mind is that we'd like to eventually allow DDL statements something like:

CREATE ROLLUP TABLE foo (
  __time TIMESTAMP,
 a IP_ADDRESS,
 b ARRAY(STRING),
 c SUM(LONG),
 d STRUCT(e STRING, f DOUBLE),
 g VARCHAR WITH COMPACT INDEX
)
PARTITION BY DAY
CLUSTER BY a, g

So, the type names have to be SQL-like and SQL-parsable.

With a bit more research on complex types, it sounds like we have three categories:

My proposal (since withdrawn until we rethink it) is:

There is no good answer for user-visible structures because those are not part of the SQL domain of discourse. There is an ill-fated project, SQL++ that tried to find a solution. Seems it was adopted by Apache Asterix and CouchBase.

In Drill, we handled the types outside of SQL by using (an earlier version of) an Arrow-like format. The current thinking is to adapt that pattern to be more SQL and Druid-like for use in the catalog, and in eventual SQL DDL statements. For example we could invent syntax such as STRUCT(a STRUCT(b BIGINT, c VARCHAR), d DOUBLE).

Array columns can be represented similarly: ARRAY(DOUBLE), say. FWIW, Arrow useslist<double>.

For the first catalog PR, the types are "to be named later": we're just focusing on storing the type names, whatever we decide they are. This gives us time to continue the type name discussion.

The catalog proposes using a different column "kind" for dimensions and measures. (Where "kind" is the Jackson type field in the JSON.) In this way, we know the difference between a complex dimension (such as IP_ADDRESS) and a complex measure (SUM(LONG)). If there are types that can be both a dimension and a measure (are both aggregates and not), then the column "kind" would disambiguate.

The kind, by the way, allows us to specify other per-column-kind information. For example, if there are multiple choices for an index type for dimensions, that would be a dimension-only attribute of a column as suggested in the DDL sketch above.

Anyway, the point is taken: we do need a full design for all column types. I'll work up something.

paul-rogers commented 2 years ago

Updated the proposal to remove the idea of a rollup table. That idea will come as a separate proposal later. The non-spec comments above preserve the discussion: the "spec" comments describe the updated design.

Since column type is now just the storage type, we can use the Druid names and optional SQL aliases. The type used in the catalog is the Druid type, converted to SQL syntax. That is, COMPLEX<FOO> would become COMPLEX(FOO). Complex types are defined in extensions (typically) each such extension can define a type alias, such as just FOO for the above example.