confluentinc / ksql

The database purpose-built for stream processing applications.
https://ksqldb.io

Table-table join fails to create table with key and hangs on select. Unclear how to solve. #1710

Open codestoned1 opened 5 years ago

codestoned1 commented 5 years ago

Hi guys, I'm running into quite an irritating problem and would be really grateful for your help. I am attempting to join two windowed tables (which both have rowkeys) into one table to calculate the clickthrough rate for certain pages. When I run the join as a plain select it succeeds and displays the data. However, when I persist the result with create table table_name as select ... and then select from the created table, the query simply hangs and returns no data. I believe this is because no key is being set for the created table. Here are the queries I ran, in order.

CREATE STREAM event_and_page_name_where_outclicked AS SELECT data->context['pageName'] AS page_name FROM data_stream WHERE data->event['eventName']='outclick';

CREATE STREAM event_and_page_name_where_viewed AS SELECT data->context['pageName'] AS page_name FROM data_stream WHERE data->event['eventName']='page';

CREATE TABLE user_clicks_per_10_seconds WITH (KAFKA_TOPIC='user_clicks_per_10_seconds',VALUE_FORMAT='avro') AS SELECT page_name, count(*) AS count FROM event_and_page_name_where_outclicked WINDOW TUMBLING (size 10 seconds) GROUP BY page_name;

CREATE TABLE user_views_per_10_seconds WITH (KAFKA_TOPIC='user_views_per_10_seconds',VALUE_FORMAT='json') AS SELECT page_name, count(*) AS count FROM event_and_page_name_where_viewed WINDOW TUMBLING (size 10 seconds) GROUP BY page_name;

CREATE TABLE clickthrough_rate_per_page AS SELECT *, cast(o.count as double) / cast(v.count as double) AS clickthrough_rate FROM user_clicks_per_10_seconds o LEFT JOIN user_views_per_10_seconds v ON o.rowkey=v.rowkey;

In addition, I do not see a way to use GROUP BY here, since it requires an aggregation, and I simply want to divide the number of outclicks by the number of views over a specific period of time. Any ideas what could be wrong? I can post more information if need be.
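One way to check the key suspicion (a diagnostic sketch, not something from the original report) is to print the aggregate's backing topic and look at the keys directly:

-- Diagnostic sketch: inspect the raw keys on the windowed aggregate's output
-- topic. For a TUMBLING aggregate the key is a Windowed wrapper around
-- page_name rather than a plain string, which is why downstream consumers
-- expecting a string key misbehave.
PRINT 'user_clicks_per_10_seconds' FROM BEGINNING;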

Logs from KSQL (these errors are repeated many times):

Caused by: org.apache.kafka.streams.errors.StreamsException: A serializer (key: org.apache.kafka.common.serialization.StringSerializer / value: io.confluent.ksql.serde.json.KsqlJsonSerializer) is not compatible to the actual key or value type (key type: org.apache.kafka.streams.kstream.Windowed / value type: io.confluent.ksql.GenericRow). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
    at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:94)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
    at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:50)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
    at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:50)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
    at org.apache.kafka.streams.kstream.internals.KTableMapValues$KTableMapValuesProcessor.process(KTableMapValues.java:106)
    at org.apache.kafka.streams.kstream.internals.KTableMapValues$KTableMapValuesProcessor.process(KTableMapValues.java:83)
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:50)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
    at org.apache.kafka.streams.kstream.internals.KTableKTableJoinMerger$KTableKTableJoinMergeProcessor.process(KTableKTableJoinMerger.java:103)
    at org.apache.kafka.streams.kstream.internals.KTableKTableJoinMerger$KTableKTableJoinMergeProcessor.process(KTableKTableJoinMerger.java:81)
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:50)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
    at org.apache.kafka.streams.kstream.internals.KTableKTableInnerJoin$KTableKTableJoinProcessor.process(KTableKTableInnerJoin.java:110)
    at org.apache.kafka.streams.kstream.internals.KTableKTableInnerJoin$KTableKTableJoinProcessor.process(KTableKTableInnerJoin.java:66)
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:50)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
    at org.apache.kafka.streams.kstream.internals.ForwardingCacheFlushListener.apply(ForwardingCacheFlushListener.java:42)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:101)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.access$000(CachingKeyValueStore.java:38)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore$1.apply(CachingKeyValueStore.java:83)
    at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:141)
    at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:99)
    at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:125)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(CachingKeyValueStore.java:123)
    at org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.flush(InnerMeteredKeyValueStore.java:284)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.flush(MeteredKeyValueBytesStore.java:149)
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:239)
    ... 12 more
Caused by: java.lang.ClassCastException
Caused by: java.lang.ClassCastException
Exception in thread "_confluent-ksql-1234567query_CTAS_JS_CLICKS_AND_VIEWS_96-d5918170-b8c1-4833-93b6-2212d1563836-StreamThread-1348" org.apache.kafka.streams.errors.ProcessorStateException: task [0_0] Failed to flush state store KSTREAM-AGGREGATE-STATE-STORE-0000000004
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:242)
    at org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:198)
    at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:406)
    at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:380)
    at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:368)
    at org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:67)
    at org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:362)
    at org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:352)
    at org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:401)
    at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1035)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:845)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:767)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:736)
codestoned1 commented 5 years ago

I believe the main problem here is assigning a key to the messages written to the clickthrough_rate_per_page table. I don't see a way to assign the key I want to each message (I only need the key to contain the page_name and the window start time, the latter of which should be stored in the rowtime).
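For what it's worth, depending on the KSQL version, the window start can at least be surfaced as a value column, even if it can't be made part of the key. A sketch, assuming a version that exposes window bounds in the projection (WindowStart() in older KSQL; the WINDOWSTART pseudo column in later ksqlDB); the _ws table name is hypothetical:

-- Sketch: materialize the window start time as an ordinary value column
-- next to page_name. This does not change the key of the table.
CREATE TABLE user_clicks_per_10_seconds_ws AS
  SELECT page_name, WindowStart() AS window_start, count(*) AS count
  FROM event_and_page_name_where_outclicked
  WINDOW TUMBLING (size 10 seconds)
  GROUP BY page_name;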

apurvam commented 5 years ago

To be clear, when you do SELECT *, cast(o.count as double) / cast(v.count as double) FROM user_clicks_per_10_seconds o LEFT JOIN user_views_per_10_seconds v ON o.rowkey=v.rowkey you get results?

apurvam commented 5 years ago

I think the underlying problem is that the messages which are output from the windowed aggregates have binary keys, which can't be used downstream. See https://docs.confluent.io/current/ksql/docs/syntax-reference.html#key-requirements for details.

I am not sure how the plain SELECT * works.
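For reference, the re-keying pattern from that key-requirements doc looks like the sketch below. Note that PARTITION BY re-keys a stream, so it helps when a value column needs to become the key, but it cannot unwrap the binary Windowed keys a windowed aggregate produces; the stream name here is hypothetical:

-- Sketch of the re-keying pattern from the linked doc: PARTITION BY sets
-- a value column as the message key of the output topic.
CREATE STREAM outclicks_rekeyed AS
  SELECT * FROM event_and_page_name_where_outclicked
  PARTITION BY page_name;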

codestoned1 commented 5 years ago

@apurvam yep, I get results when just using the select.

codestoned1 commented 5 years ago

That seems like odd, kind of buggy behavior: rejecting a key that KSQL itself created through a supported feature (table-table joins). Also, I should point out that running describe extended clickthrough_rate_per_page shows there is actually no key field for the table, which leads me to believe no key is being assigned at all because of the join.

apurvam commented 5 years ago

@zlex7 the lack of key for the output of the join is a bug and is being fixed by #1697

That said, I really don't think you are hitting that here, though you could try building from that branch to see if it fixes your problem.

rodesai commented 5 years ago

This is a different bug. We're not setting the serde correctly for the output when the tables in the join are windowed. It's being set to StringSerde, but the key is of type Windowed.

apurvam commented 5 years ago

That makes sense, @rodesai. The reason the select * worked is that we are not serializing the output to a topic. Marking this as a bug.
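To restate that distinction with the statements from this thread (nothing new here, just comments):

-- Transient query: rows stream back to the CLI only; the joined key is never
-- serialized to a Kafka topic, so the serde mismatch never surfaces.
SELECT *, cast(o.count as double) / cast(v.count as double)
FROM user_clicks_per_10_seconds o
LEFT JOIN user_views_per_10_seconds v ON o.rowkey = v.rowkey;

-- Persistent query: the same join, but the results are written to a topic,
-- which forces key serialization -- the Windowed key hits the configured
-- StringSerializer and the StreamsException above is thrown.
CREATE TABLE clickthrough_rate_per_page AS
SELECT *, cast(o.count as double) / cast(v.count as double)
FROM user_clicks_per_10_seconds o
LEFT JOIN user_views_per_10_seconds v ON o.rowkey = v.rowkey;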

big-andy-coates commented 4 years ago

Related to https://github.com/confluentinc/ksql/issues/4396?