cashapp / pranadb

Apache License 2.0

New CREATE SOURCE syntax #243

Closed mightyguava closed 2 years ago

mightyguava commented 3 years ago

We'll need to extend columnselectors in CREATE SOURCE significantly to support rather complicated topics like ledger_events_2. I'd like to propose a different syntax that brings the final result closer to the SQL spec and is hopefully easier to use.

The existing syntax:

create source raw_ledger_events(
    transaction_id varchar,
    customer_token varchar,
    amount bigint,
    currency_code int,
    occurred_at bigint,
    partition_key varchar,
    primary key (transaction_id)
) with (
    brokername = "testbroker",
    topicname = "ledger_events_2",
    headerencoding = "stringbytes",
    keyencoding = "stringbytes",
    valueencoding = "protobuf:squareup.cash.ledgering.LedgerEvent",
    columnselectors = (
        "v.transaction.token",
        "v.customer_token",
        "v.transaction.total_balance_impact.amount",
        "v.transaction.total_balance_impact.currency_code",
        "v.occurred_at",
        "k",
));

A few issues I have with it are, in no particular order:

  1. v., k., t., and h. prefixes for selecting value, key, timestamp, and header are arbitrary and unintuitive
  2. each column's selector is a string evaluated by a separate parser rather than part of the create source syntax
  3. columnselector is embedded as part of the topic configuration, when it should probably be standalone
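To make issues 1 and 2 concrete, here is a minimal sketch of what a separate selector parser has to do with each string. This is not Prana's actual parser; the names PREFIXES and parse_selector are hypothetical, and only the four prefixes listed above are assumed.

```python
from typing import List, Tuple

# Hypothetical mapping of the one-letter prefixes described above.
PREFIXES = {"v": "value", "k": "key", "t": "timestamp", "h": "header"}


def parse_selector(selector: str) -> Tuple[str, List[str]]:
    """Split a selector such as "v.transaction.token" into (source, path)."""
    prefix, _, rest = selector.partition(".")
    if prefix not in PREFIXES:
        raise ValueError(f"unknown selector prefix: {prefix!r}")
    # A bare prefix like "k" selects the whole key, so the path is empty.
    path = rest.split(".") if rest else []
    return PREFIXES[prefix], path
```

The point of the sketch is that the meaning of the first path element is only known to this second parser, which is what makes the prefixes feel arbitrary from the SQL side.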

Here's a draft syntax that tries to solve these problems:

create source raw_ledger_events(
    transaction_id varchar,
    customer_token varchar,
    amount bigint,
    currency_code int,
    occurred_at bigint,
    partition_key varchar,
    primary key (transaction_id)
) as select 
    transaction.token,
    customer_token,
    transaction.total_balance_impact.amount,
    transaction.total_balance_impact.currency_code,
    occurred_at,
    meta('key')
from kafka(
    brokername = "testbroker",
    topicname = "ledger_events_2",
    headerencoding = "stringbytes",
    keyencoding = "stringbytes",
    valueencoding = "protobuf:squareup.cash.ledgering.LedgerEvent"
);

This borrows from the CREATE TABLE <tablename> AS SELECT .... FROM <tablename> syntax, already used for CREATE MATERIALIZED VIEW, and common to most popular dialects like MySQL and PostgreSQL. In reference to the previous issues mentioned:

1. v., k., t., and h. prefixes for selecting value, key, timestamp, and header are arbitrary and unintuitive

The "value" (message body) is going to be the most commonly selected from. It is promoted to the top level here, allowing top-level fields to be selected without the v. prefix. Keys, headers, and message timestamp will be provided by the special meta function, using meta('key'), meta('headers'), and meta('timestamp'). Function name suggestions welcome.
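As a rough illustration of the proposed resolution rule, the sketch below treats a bare path as a lookup in the message value and routes meta('...') to the other parts of the message. It assumes a message modelled as a plain dict with "value", "key", "headers", and "timestamp" entries; resolve and META_CALL are hypothetical names, not anything in Prana.

```python
import re

# Matches only the three meta fields named in the proposal.
META_CALL = re.compile(r"^meta\('(key|headers|timestamp)'\)$")


def resolve(projection, message):
    """Resolve one projection against a message modelled as a dict."""
    match = META_CALL.match(projection)
    if match:
        # meta('key'), meta('headers'), meta('timestamp')
        return message[match.group(1)]
    # Anything else is a dotted path into the message body.
    node = message["value"]
    for field in projection.split("."):
        node = node[field]
    return node
```

For example, with a message whose value is {"transaction": {"token": "T1"}} and whose key is "abc", resolve("transaction.token", msg) yields "T1" and resolve("meta('key')", msg) yields "abc".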

On Cash, these meta fields are unlikely to ever get used. key is usually some arbitrary partition key (sometimes random/incoherent) that is always available in the message body. We don't use headers AFAIK. And finally, the timestamp of the message itself is usually unimportant, while the timestamp of the underlying triggering event is usually in the message body, like occurred_at above.

2. each column's selector is a string evaluated by a separate parser

Column selectors are now just projections in the SELECT statement, becoming part of the main grammar. This is important as now we can reuse SQL functions and other language features here. For the ledger_events_2 topic specifically, we can use the CASE statement, e.g.

create source raw_ledger_events(
    transaction_id varchar,
    customer_token varchar,
    event_type varchar,
    amount bigint,
    occurred_at bigint,
    partition_key varchar,
    primary key (transaction_id)
) as select 
    transaction.token,
    customer_token,
    which_one_of(transaction.type),
    case which_one_of(transaction.type)
        when "btc_payment" then transaction.total_balance_impact.amount
        when "p2p_payment" then transaction.total_balance_impact.amount
        when "stored_balance" then transaction.stored_balance.snapshot.amount
        else null
    end,
    occurred_at,
    meta('key')
from kafka(
    brokername = "testbroker",
    topicname = "ledger_events_2",
    headerencoding = "stringbytes",
    keyencoding = "stringbytes",
    valueencoding = "protobuf:squareup.cash.ledgering.LedgerEvent"
);

In the above, which_one_of is a function that returns the name of the populated proto oneof field. We use a standard SQL CASE expression to switch on the result of which_one_of to get the amount. (This is just for demonstration purposes; I have no idea what the fields in the message actually mean.)
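A sketch of the intended semantics, under loud assumptions: the decoded transaction is modelled as a dict, the oneof named type is assumed to have exactly the three members used in the CASE above, and an unset member is simply absent or None. ONEOF_TYPE, which_one_of, and amount_for are hypothetical helpers for illustration only.

```python
from typing import Optional, Sequence

# Assumed members of the `type` oneof, taken from the CASE branches above.
ONEOF_TYPE = ("btc_payment", "p2p_payment", "stored_balance")


def which_one_of(message: dict, members: Sequence[str]) -> Optional[str]:
    """Return the name of the populated oneof member, or None if unset."""
    for name in members:
        if message.get(name) is not None:
            return name
    return None


def amount_for(transaction: dict) -> Optional[int]:
    """Mirror of the CASE expression: pick the amount by populated member."""
    kind = which_one_of(transaction, ONEOF_TYPE)
    if kind in ("btc_payment", "p2p_payment"):
        return transaction["total_balance_impact"]["amount"]
    if kind == "stored_balance":
        return transaction["stored_balance"]["snapshot"]["amount"]
    return None  # the `else null` branch
```

Because a oneof has at most one member set, returning the first populated name is enough here.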

We may have quite a lot of uses for CASE in Prana. It's limiting that it can only return a single column. We may want to extend it to support returning multiple columns, for example:

case which_one_of(transaction.type)
    when "btc_payment" then (transaction.total_balance_impact.amount, transaction.total_balance_impact.currency_code)
    when "p2p_payment" then (transaction.total_balance_impact.amount, transaction.total_balance_impact.currency_code)
    when "stored_balance" then (transaction.total_balance_impact.amount, transaction.total_balance_impact.currency_code)
    else (null, null)
end

Aside: this gets quite wordy, so it might be useful in the future to support defining temporary variables or multiple select pipeline stages, like def x = transaction.total_balance_impact, or jq-like filters.

3. columnselector is embedded as part of the topic configuration

Topic configuration now goes after the FROM keyword, as the kafka function, which returns a "virtual table" that is the source of the data. This feels cleaner, and we can have other functions for defining different data sources orthogonal to the SELECT syntax... though SQL functions aren't supposed to have a named argument syntax, so that needs some rethinking.
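For a sense of how little the named-argument form needs, here is a sketch that turns the body of a kafka(...) call into a config mapping. It assumes every argument has the shape name = "value" with no escaped quotes, which matches the examples above but is otherwise an assumption; parse_named_args is a hypothetical helper, not part of Prana.

```python
import re

# One `name = "value"` pair; values are assumed to contain no escaped quotes.
PAIR = re.compile(r'(\w+)\s*=\s*"([^"]*)"')


def parse_named_args(body: str) -> dict:
    """Collect name = "value" pairs from the text between the parentheses."""
    return dict(PAIR.findall(body))
```

Whether this lives inside a function call or a WITH (...) clause, the resulting mapping is the same, so the open question is purely about surface syntax.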

alecthomas commented 3 years ago

  1. v., k., t., and h. prefixes for selecting value, key, timestamp, and header are arbitrary and unintuitive

I agree very much with this.

  2. each column's selector is a string evaluated by a separate parser

This specific issue could be solved by pulling that sub-parser into the CREATE SOURCE parser directly, e.g.

    columnselectors = (
        v.transaction.token,
        v.customer_token,
        v.transaction.total_balance_impact.amount,
        v.transaction.total_balance_impact.currency_code,
        v.occurred_at,
        k
    );

We may have quite a lot of uses for CASE in Prana. It's limiting that it can only return a single column.

I think this is a good point, particularly with oneofs as you've illustrated.


A few additional data points to consider:

  1. Participle is powerful enough to parse SQL, but writing a full SQL SELECT parser would be a significant amount of work, though I suspect that might not be necessary or even desirable for this use case?
  2. Making Participle delegate to the TiDB parser between SELECT and FROM is possible and may be preferable?

alecthomas commented 3 years ago

I do wonder if a more domain specific language might be better than a normal SELECT, for the reasons you've stated, but I don't know what that would be.

mightyguava commented 3 years ago

One issue I noticed while merging the parsers is that SQL is case insensitive, while the selector is necessarily case sensitive.
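One way to picture the conflict: keywords can be normalized before comparison, but field paths have to keep their original spelling because protobuf field names are case sensitive. The sketch below is only an illustration of that split; KEYWORDS is a small assumed subset and classify is a hypothetical helper, not how the merged parser actually works.

```python
# A small assumed subset of keywords, for illustration only.
KEYWORDS = {"create", "source", "as", "select", "from",
            "case", "when", "then", "else", "end"}


def classify(word: str):
    """Fold case for keywords only; identifiers keep their exact spelling."""
    lowered = word.lower()
    if lowered in KEYWORDS:
        return ("keyword", lowered)
    return ("ident", word)
```

With this split, SELECT and select are the same token, while customerToken and CustomerToken remain two different selectors.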

chadhq commented 3 years ago

LGTM. For what it's worth, CashEventing does implement the CloudEvent idea, so a few header fields are populated. Some teams extract these fields from the payload, or let them default to auto-generated values 😬. All that said, I can't really imagine writing a prana query based on the headers that we have.

mightyguava commented 3 years ago

Items 1 and 2 are complete. I'm going to leave 3 alone for now, leaving this ticket open.

purplefox commented 2 years ago

Closing this, as I don't think there's a strong case for making major changes to the create source syntax at this point.