confluentinc / ksql

The database purpose-built for stream processing applications.
https://ksqldb.io
Other
128 stars 1.04k forks source link

Default timestamp extractor cannot be replaced #9093

Open thecrazymonkey opened 2 years ago

thecrazymonkey commented 2 years ago

Describe the bug Trying to use a different timestamp extractor to cater for data where the TS may be missing so tried to replace it in the config:

ksql.streams.default.timestamp.extractor=org.apache.kafka.streams.processor.UsePartitionTimeOnInvalidTimestamp

however when the parser runs I get the default one instead :

[_confluent-ksql-default_query_CTAS_CLS_AGG_TABLE_53-d0e11f01-20ca-4e0f-b9c1-7e145a2ad4a2-StreamThread-1] WARN org.apache.kafka.streams.processor.internals.RecordQueue - stream-thread [_confluent-ksql-default_query_CTAS_CLS_AGG_TABLE_53-d0e11f01-20ca-4e0f-b9c1-7e145a2ad4a2-StreamThread-1] task [0_0] Skipping record due to negative extracted timestamp. topic=[metrics] partition=[0] offset=[141] extractedTimestamp=[-1] extractor=[io.confluent.ksql.execution.streams.timestamp.LoggingTimestampExtractor]

also tried :

`default.timestamp.extractor=org.apache.kafka.streams.processor.UsePartitionTimeOnInvalidTimestamp

it actually complains if i misconfigure the class name but does not seem to pick up the new value.
` To Reproduce

Steps to reproduce the behavior, include:

Server is CLI v7.1.1, Server v7.1.1

Sample record: {"cf_app_name":"_mst0__10","cf_org_name":"org"}

Used statement: CREATE or REPLACE STREAM CLS_METRICS (CF_APP_NAME STRING, CF_ORG_NAME STRING, TIMESTAMP BIGINT) WITH (KAFKA_TOPIC='metrics', KEY_FORMAT='KAFKA', VALUE_FORMAT='JSON', TIMESTAMP='timestamp');

Expected behavior

With the replaced TS extractor I'd expect the TS populated and record processed.

Actual behaviour

Instead, I'm getting message showing the wrong extractor used and record is skipped. See log above.

Would be great if the extractor can be configured per statement using the WITH clause.

mjsax commented 2 years ago

Seems ksqlDB uses a hard-coded TS-extractor and overwrite the config set by the user.

Given that ksqlDB wants some control over the TS-extractor (for example for logging purpose, think "processing log"), we should not change this behavior and should not support setting the streams config (maybe we need to document this).

Instead, we could extend the WITH clause with a new property to enable the "user partition-time as surrogate TS in case the record does not contain a valid TS" to allow people to get the same behavior as the KS TS-extractor UsePartitionTimeOnInvalidTimestamp. So we extend the WITH clause plus enhance the ksqlDB TS-extractor accordingly to implement this feature.