confluentinc / ksql

The database purpose-built for stream processing applications.
https://ksqldb.io

PARTITION BY with table function causes logical and physical schemas to diverge #7215

Open michalstutzmann opened 3 years ago

michalstutzmann commented 3 years ago

Describe the bug

Partitioning by an expression that evaluates to a table (a table function) causes the logical and physical schemas to diverge.

To Reproduce

Version: 0.15.0

Code to reproduce:

CREATE STREAM SOURCE (
    ELEMENTS ARRAY<STRING>
) WITH (
    KAFKA_TOPIC = 'source',
    KEY_FORMAT = 'KAFKA',
    VALUE_FORMAT = 'JSON'
);

CREATE STREAM DERIVED WITH (
    KAFKA_TOPIC ='derived'
) AS
SELECT EXPLODE(ELEMENTS) AS KEY_COLUMN,
               'value' AS VALUE_COLUMN
FROM SOURCE
PARTITION BY EXPLODE(ELEMENTS)
EMIT CHANGES;

Expected behavior

Input:

{
  "inputs": [
    {
      "topic": "source",
      "timestamp": 0,
      "key": null,
      "value": {"ELEMENTS": ["a"]}
    }
  ]
}

Output:

{
  "outputs": [
    {
      "topic": "derived",
      "timestamp": 0,
      "key": "a",
      "value": {"VALUE_COLUMN": "value"}
    }
  ]
}

Actual behaviour

        >>>>> Test failed: Logical and Physical schemas do not match!
Logical : `KEY_COLUMN` STRING, `VALUE_COLUMN` STRING
Physical: `KSQL_COL_0` STRING KEY, `KEY_COLUMN` STRING, `VALUE_COLUMN` STRING
Statement: CREATE STREAM DERIVED WITH (KAFKA_TOPIC='derived') AS SELECT
  EXPLODE(SOURCE.ELEMENTS) KEY_COLUMN,
  'value' VALUE_COLUMN
FROM SOURCE SOURCE
PARTITION BY EXPLODE(SOURCE.ELEMENTS)
EMIT CHANGES

Additional context

Partitioning by a non-table expression works as expected.

colinhicks commented 3 years ago

Table functions like EXPLODE are supported only within the SELECT clause. We could improve the error message here to add clarity.

michalstutzmann commented 3 years ago

PARTITION BY in the above example is used to partition the output of the query. Is there any other way to achieve the same result if a table function is used for the partitioning column in the SELECT clause?

A related question:

The docs say: "The PARTITION BY clause, if supplied, is applied to the source after any JOIN or WHERE clauses, and before the SELECT clause, in much the same way as GROUP BY". Is there a reason why this cannot be done after the SELECT clause?

colinhicks commented 3 years ago

> Is there any other way to achieve the same result if a table function is used for the partitioning column in the SELECT clause?

Yes, you can create another stream (a third stream in your example above) that is partitioned by the column that represents the result of the EXPLODE function.
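
A minimal sketch of that workaround, reusing the SOURCE stream from the report. The stream names EXPLODED and REKEYED and their topic names are hypothetical, chosen only for illustration, and the statements have not been verified against 0.15.0:

```sql
-- Step 1: explode in the SELECT clause only, with no PARTITION BY.
-- EXPLODED is a hypothetical intermediate stream; its records keep the source key.
CREATE STREAM EXPLODED WITH (
    KAFKA_TOPIC = 'exploded'
) AS
SELECT EXPLODE(ELEMENTS) AS KEY_COLUMN,
       'value' AS VALUE_COLUMN
FROM SOURCE
EMIT CHANGES;

-- Step 2: repartition by the plain column that holds the EXPLODE result.
-- REKEYED is a hypothetical name for the third stream.
CREATE STREAM REKEYED WITH (
    KAFKA_TOPIC = 'rekeyed'
) AS
SELECT KEY_COLUMN,
       VALUE_COLUMN
FROM EXPLODED
PARTITION BY KEY_COLUMN
EMIT CHANGES;
```

The second statement partitions by an ordinary column of its source stream, so no table function appears in PARTITION BY. The cost is one extra persistent query and one extra topic.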

> The docs say: "The PARTITION BY clause, if supplied, is applied to the source after any JOIN or WHERE clauses, and before the SELECT clause, in much the same way as GROUP BY". Is there a reason why this cannot be done after the SELECT clause?

The partitioning operation applies to the schema of the source stream. The SELECT clause defines the schema of the result, so the PARTITION BY clause must be applied before SELECT.

This order of operations corresponds to the SQL standard for GROUP BY. There are a few more details in the relevant PR: https://github.com/confluentinc/ksql/pull/3982

michalstutzmann commented 3 years ago

Thanks for the elaborate answer @colinhicks and the links to relevant PR/discussions.

> Yes, you can create another stream (a third stream in your example above) that is partitioned by the column that represents the result of the EXPLODE function.

Yes, this is a workaround. However, creating an additional stream has a significant resource cost (at least one additional topic, plus data transfer to and from Kafka). Are there any plans to support table function expressions in PARTITION BY in future versions?

archy-bold commented 3 years ago

I agree with @michalstutzmann. It seems odd that we can't specify a key column when using a table function such as EXPLODE. This just results in null key columns and a need to create a new topic just to set a key field.