confluentinc / ksql

The database purpose-built for stream processing applications.
https://ksqldb.io

In interactive mode, apply "ksql.streams.*" configuration if possible #2925

Open badaiaqrandista opened 5 years ago

badaiaqrandista commented 5 years ago

Sometimes, after all your queries are running in interactive mode, you want to modify the default configuration of the existing streams. For example, you may want to skip records with invalid timestamps by setting "default.timestamp.extractor" to "org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp".

Currently, to do this you have to drop all queries in the KSQL CLI, execute "set 'default.timestamp.extractor'='org.apache.kafka.streams.processor.UsePreviousTimeOnInvalidTimestamp';", and then resubmit all queries.

It would be great if the KSQL server, in interactive mode, applied all "ksql.streams.*" configuration entries to override the configuration stored in the command topic. That way, you would not have to drop all queries and resubmit them.

Of course, not every configuration entry can be overridden, so KSQL should print in the startup log whether or not it applied each "ksql.streams.*" entry.
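To make the request concrete, here is a minimal Python sketch of the behaviour I have in mind. It is not taken from the KSQL code base: the function name, the allow-list of overridable settings, and the assumption that the configuration stored in the command topic uses bare Kafka Streams names (without the "ksql.streams." prefix) are all hypothetical and only illustrate the idea.

```python
# Hypothetical sketch of the proposed startup behaviour; none of these names
# exist in KSQL itself.

PREFIX = "ksql.streams."

# Assumed allow-list of Streams settings that could safely be changed for
# already-running queries. The real list would have to be decided by KSQL.
OVERRIDABLE = {"default.timestamp.extractor", "commit.interval.ms"}


def apply_server_overrides(stored, server_config, overridable=OVERRIDABLE):
    """Merge server-level "ksql.streams.*" entries into a stored query config.

    stored        -- config saved with the query (assumed to use bare Streams names)
    server_config -- the server's properties, including "ksql.streams.*" entries
    Returns (effective_config, log_lines); the inputs are not modified.
    """
    effective = dict(stored)
    log_lines = []
    for key in sorted(server_config):
        if not key.startswith(PREFIX):
            continue  # only "ksql.streams.*" entries are candidates
        name = key[len(PREFIX):]
        value = server_config[key]
        if name in overridable:
            effective[name] = value
            log_lines.append(f"APPLIED {key}={value}")
        else:
            log_lines.append(
                f"NOT APPLIED {key}={value} (cannot be overridden for existing queries)"
            )
    return effective, log_lines


if __name__ == "__main__":
    stored = {
        "default.timestamp.extractor":
            "org.apache.kafka.streams.processor.FailOnInvalidTimestamp",
        "num.stream.threads": "4",
    }
    server = {
        "ksql.streams.default.timestamp.extractor":
            "org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp",
        "ksql.streams.num.stream.threads": "8",
    }
    effective, log_lines = apply_server_overrides(stored, server)
    for line in log_lines:
        print(line)
```

The point of returning the log lines alongside the merged configuration is that every "ksql.streams.*" entry gets an explicit applied or not-applied message at startup, so nothing is silently ignored.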

vkodi commented 5 years ago

Hi @badaiaqrandista, how did you get this statement to work: set 'default.timestamp.extractor'='org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp'; When I run it, I get the error below:

Error issuing POST to KSQL server. path:ksql
Caused by: com.fasterxml.jackson.databind.JsonMappingException: Failed to set 'default.timestamp.extractor' to 'class org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp' (through reference chain: io.confluent.ksql.rest.entity.KsqlRequest["streamsProperties"])
Caused by: Failed to set 'default.timestamp.extractor' to 'class org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp' (through reference chain: io.confluent.ksql.rest.entity.KsqlRequest["streamsProperties"])
Caused by: Failed to set 'default.timestamp.extractor' to 'class org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp'
Caused by: Invalid value class org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp for configuration default.timestamp.extractor: Class class org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp could not be found.

I am running KSQL version 5.1.0 in a Docker container. Your input would be really appreciated.