confluentinc / ksql

The database purpose-built for stream processing applications.
https://ksqldb.io
Other
127 stars 1.04k forks source link

Support window bounds columns on windowed group by #4397

Open big-andy-coates opened 4 years ago

big-andy-coates commented 4 years ago

KSQL currently lets you take a non-windowed stream and perform a windowed group by:

CREATE TABLE T as SELECT stuff FROM S WINDOW TUMBLING (SIZE 1 SECOND) group by something;

Which is essentially grouping by not just something, but also implicitly by the window bounds.

This might be more correctly written with a Tumbling table function:

CREATE TABLE T as SELECT stuff FROM Tumbling(S, SIZE 1 SECOND) group by something, windowstart, windowend;

Where the Tumbling table function returns one row for each row in S, with the addition of the windowstart and windowend columns. (Note: Hopping and session table functions are also possible, though in the case of the latter the table function would also emit retractions).

In a correct SQL model windowstart and windowend would therefore be available as fields within the selection, e.g.

CREATE TABLE T as SELECT windowstart, windowend, something, count() FROM Tumbling(S, SIZE 1 SECOND) group by something, windowstart, windowend;

This would allow us to do away with windowStart() and windowEnd() udafs!!!!!

Unfortunately, this is not currently possible. Using the window bounds columns results in an unknown column error.

Nor are the columns available to UDAFs, e.g.

QTT test:

{
      "name": "non-windowed stream source using window bounds columns",
      "statements": [
        "CREATE STREAM TEST (ROWKEY INT KEY, IGNORED INT) WITH (kafka_topic='test_topic', value_format='JSON');",
        "CREATE TABLE S2 as SELECT count(1) as count, min(windowstart) as wstart, min(windowend) as wend FROM test WINDOW TUMBLING (SIZE 1 SECOND) group by ROWKEY;"
      ],
      "inputs": [
        {"topic": "test_topic", "key": 0, "value": {}, "timestamp": 10345},
        {"topic": "test_topic", "key": 0, "value": {}, "timestamp": 13251}
      ],
      "outputs": [
        {"topic": "S2", "key": 0,"value": {"COUNT":1}, "timestamp": 10345, "window": {"start": 10000, "end": 11000, "type": "time"}},
        {"topic": "S2", "key": 0,"value": {"COUNT":1}, "timestamp": 13251, "window": {"start": 13000, "end": 14000, "type": "time"}}
      ],
      "post": {
        "sources": [
          {
            "name": "S2",
            "type": "stream",
            "keyFormat": {"format": "KAFKA", "windowType": "HOPPING", "windowSize": 30000},
            "schema": "ROWKEY INT KEY, ID BIGINT, VALUE BIGINT, KEY INT"
          }
        ]
      }
    }

Results in error:


io.confluent.ksql.util.KsqlStatementException: Column 'TEST.WINDOWSTART' cannot be resolved.
Statement: CREATE TABLE S2 AS SELECT
  COUNT(1) COUNT,
  MIN(TEST.WINDOWSTART) WSTART,
  MIN(TEST.WINDOWEND) WEND
FROM TEST TEST
WINDOW TUMBLING ( SIZE 1 SECONDS ) 
GROUP BY TEST.ROWKEY
EMIT CHANGES
Statement: CREATE TABLE S2 as SELECT count(1) as count, min(windowstart) as wstart, min(windowend) as wend FROM test WINDOW TUMBLING (SIZE 1 SECOND) group by ROWKEY;

    at io.confluent.ksql.test.tools.TestExecutorUtil$PlannedStatementIterator.planStatement(TestExecutorUtil.java:407)
    at io.confluent.ksql.test.tools.TestExecutorUtil$PlannedStatementIterator.hasNext(TestExecutorUtil.java:352)
    at io.confluent.ksql.test.tools.TestExecutorUtil.execute(TestExecutorUtil.java:259)
    at io.confluent.ksql.test.tools.TestExecutorUtil.doBuildQueries(TestExecutorUtil.java:208)
    at io.confluent.ksql.test.tools.TestExecutorUtil.buildStreamsTopologyTestDrivers(TestExecutorUtil.java:89)
    at io.confluent.ksql.test.tools.TestExecutor.buildAndExecuteQuery(TestExecutor.java:134)
    at io.confluent.ksql.test.EndToEndEngineTestUtil.shouldBuildAndExecuteQuery(EndToEndEngineTestUtil.java:46)
    at io.confluent.ksql.test.QueryTranslationTest.shouldBuildAndExecuteQueries(QueryTranslationTest.java:83)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
    at org.junit.runners.Suite.runChild(Suite.java:128)
    at org.junit.runners.Suite.runChild(Suite.java:27)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
    at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
    at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
    at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
    at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
    at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)
Caused by: io.confluent.ksql.util.KsqlException: Column 'TEST.WINDOWSTART' cannot be resolved.
    at io.confluent.ksql.analyzer.ExpressionAnalyzer$SourceExtractor.getSource(ExpressionAnalyzer.java:88)
    at io.confluent.ksql.analyzer.ExpressionAnalyzer$SourceExtractor.visitQualifiedColumnReference(ExpressionAnalyzer.java:75)
    at io.confluent.ksql.analyzer.ExpressionAnalyzer$SourceExtractor.visitQualifiedColumnReference(ExpressionAnalyzer.java:52)
    at io.confluent.ksql.execution.expression.tree.QualifiedColumnReferenceExp.accept(QualifiedColumnReferenceExp.java:46)
    at io.confluent.ksql.execution.expression.tree.ExpressionVisitor.process(ExpressionVisitor.java:23)
    at io.confluent.ksql.execution.expression.tree.TraversalExpressionVisitor.visitFunctionCall(TraversalExpressionVisitor.java:96)
    at io.confluent.ksql.execution.expression.tree.TraversalExpressionVisitor.visitFunctionCall(TraversalExpressionVisitor.java:24)
    at io.confluent.ksql.execution.expression.tree.FunctionCall.accept(FunctionCall.java:58)
    at io.confluent.ksql.execution.expression.tree.ExpressionVisitor.process(ExpressionVisitor.java:23)
    at io.confluent.ksql.analyzer.ExpressionAnalyzer.analyzeExpression(ExpressionAnalyzer.java:48)
    at io.confluent.ksql.analyzer.Analyzer$Visitor.throwOnUnknownColumnReference(Analyzer.java:316)
    at io.confluent.ksql.analyzer.Analyzer$Visitor.visitQuery(Analyzer.java:305)
    at io.confluent.ksql.analyzer.Analyzer$Visitor.visitQuery(Analyzer.java:158)
    at io.confluent.ksql.parser.tree.Query.accept(Query.java:117)
    at io.confluent.ksql.parser.tree.AstVisitor.process(AstVisitor.java:33)
    at io.confluent.ksql.analyzer.Analyzer.analyze(Analyzer.java:148)
    at io.confluent.ksql.analyzer.QueryAnalyzer.analyze(QueryAnalyzer.java:81)
    at io.confluent.ksql.engine.QueryEngine.buildQueryLogicalPlan(QueryEngine.java:80)
    at io.confluent.ksql.engine.EngineExecutor.planQuery(EngineExecutor.java:197)
    at io.confluent.ksql.engine.EngineExecutor.plan(EngineExecutor.java:157)
    at io.confluent.ksql.engine.KsqlEngine.plan(KsqlEngine.java:176)
    at io.confluent.ksql.test.tools.TestExecutorUtil$PlannedStatementIterator.planStatement(TestExecutorUtil.java:397)
    ... 37 more
big-andy-coates commented 4 years ago

While it should be possible to make WINDOWSTART and WINDOWEND available within:

The following places can not so easily be supported:

big-andy-coates commented 4 years ago

Partial fix, (for non-aggregate select expressions), available in https://github.com/confluentinc/ksql/pull/4450

big-andy-coates commented 4 years ago

Removing 0.7 release tags as the remaining work is not needed for 0.7 and, likely, requires a Streams change.

vvcephei commented 4 years ago

To document the related Streams work:

Personally, I feel we first need to finish up:

So my personal plan is to finish up KIP-478 and then tackle KIP-300.

@big-andy-coates , do you think this captures the relevant needs, or do we need to create a different AK Jira issue?

big-andy-coates commented 4 years ago

Hey @vvcephei, I'm not sure to be honest.

KIP-300 seems to me to be fixing the limitation that its not possible to build a windowed table from a windowed changelog. So I don't see how that's related.

KAFKA-7777 also doesn't seem to be on-topic.

What's needed is:

  1. the ability to access a windowed key from within the Aggregator.apply call. The key passed is currently the unwindowed key. what is needed is the windowed key.

For example, KStreamSessionWindowAggregate actually creates the windowed key in the line after the aggregate call. But this sessionKey is exactly what we need to be passed to the aggregate call.

https://github.com/apache/kafka/blob/19681f6b95a5076af7020475f46d0bbe9f6015e1/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java#L176-L177

The second thing we'd ideally need is a filter call available on KGroupedStream and its table equivalent. However, we can hack around this if we need to, though its not pretty.

It doesn't seem any of the listed items above address either of these items, but I may be missing something.

vvcephei commented 4 years ago

Aha! Thanks for the clarification, @big-andy-coates . That is indeed different than what I was thinking.

Do you want to create a Kafka Jira ticket to track this desire?

muneebshahid commented 4 years ago

So after the upgrade we aren't able to use the UDAfs as they have been removed, and also the the column names aren't accessible in GROUPBY, WHERE, or HAVING. Is there a workaround for this? Also seems like the fix won't be part of 0.11?

big-andy-coates commented 4 years ago

Hi @muneebshahid,

The UDAFs, when they existing, wouldn't have been usable in GROUP BY or WHERE, and I can't think of a use-case for using the old windowStart() and windowEnd() UDAFs in the HAVING clause, if that was supported.

Are you saying you were using the UDAFs in the HAVING clause and have no lost this functionality? Sorry if that is the case. Can you explain your use-case and provide example SQL so we can understand what you're trying to achieve?

Thanks.

muneebshahid commented 4 years ago

Hey @big-andy-coates thank you for the response. And yes we are using them in "HAVING" clause.

Our use case is that we are tracking changes in the last x minutes. For this Hopping window was the most suitable one. And since the messages mostly arrive in order, we are mainly interested in the very first window (the window that extends most in the past from the current moment).

For example, have a look at this script ts is the timestamp

SELECT
    TIMESTAMPTOSTRING(MAX(ts), 'HH:mm:ss') AS MTS,
    TIMESTAMPTOSTRING(WINDOWSTART(), 'HH:mm:ss') AS WS,
    TIMESTAMPTOSTRING(WINDOWEND(),'HH:mm:ss') AS WE,
    SUM(faults) as total
FROM
    my
WINDOW HOPPING (SIZE 10 SECONDS, ADVANCE BY 2 SECONDS)
GROUP BY machine

then these five windows are emitted for a message.

| MTS | WS | WE | Total 1 | 08:19:00 | 08:18:52 | 08:19:02 | 4 2 | 08:19:00 | 08:18:54 | 08:19:04 | 4 3 | 08:19:00 | 08:18:56 | 08:19:06 | 4 4 | 08:19:00 | 08:18:58 | 08:19:08 | 4 5 | 08:19:00 | 08:19:00 | 08:19:10 | 4

To retain just the first one we are using

HAVING
    WINDOWEND() <= MTS + 2*1000;

2*1000 to account for the window advance.

One way for us would be to handle this outside ksql, but that means a lot of useless messages will be consumed, particularly for larger window sizes which then need to be filtered out.

If you have some suggestions then please let me know. Thank you.

big-andy-coates commented 4 years ago

Thanks for details of the use-case @muneebshahid.

Sorry this functionality has been temporarily lost. I can appreciate this must be frustrating. Please be assured we are working towards reinstating such functionality with our work towards ksqlDB supporting structured keys.

guozhangwang commented 4 years ago

Hey @big-andy-coates , could you create an AK streams ticket for the needed changes so that we would not forget about it?

big-andy-coates commented 4 years ago

+1 from community post requiring HAVING support for window bounds: https://stackoverflow.com/questions/54231314/how-to-only-keep-the-latest-window-in-ksql

guzeloglusoner commented 3 years ago

Hi all,

We are experiencing the same restriction on Financial Services domain. Is there any update on the issue? Should we expect a support in short notice?

Thanks.

guzeloglusoner commented 3 years ago

+1 from community: https://stackoverflow.com/questions/60034151/ksql-hopping-window-any-way-to-get-only-one-record-in-response

Harish-Sridhar commented 1 year ago

Any update on when this feature will be added?

mgbfgit commented 1 year ago

Also looking for update on this feature.

suhas-satish commented 1 year ago

Not currently planned on short term road map

DikshaMunjal7 commented 3 months ago

Any update on when this feature will be added?