confluentinc / ksql

The database purpose-built for stream processing applications.
https://ksqldb.io

Rollback from ksqldb version 6.1.0 to 5.5.2 throwing serialization exception for newly created streams with ksqldb-6.1.0 #7310

Open manpreet1992 opened 3 years ago

manpreet1992 commented 3 years ago

Describe the bug We are facing a serialization exception while rolling back from version 6.1.0 to 5.5.2. Initially, we had installed ksqlDB 5.5.2 with a stream named "stream2" (Avro). We then upgraded to 6.1.0 and created a new stream "S4" (JSON). Both streams worked fine. Finally, we rolled back from 6.1.0 to 5.5.2 and started seeing the serialization exception for stream "S4".

To Reproduce Steps to reproduce the behavior, include:

  1. Install ksqlDB version 5.5.2.
  2. Create the Avro stream "stream2" with: CREATE STREAM stream2 WITH (kafka_topic='test2', value_format='AVRO');
  3. Upgrade to ksqlDB version 6.1.0.
  4. Create the JSON stream "S4" with: CREATE STREAM s4 (c1 VARCHAR, c2 INTEGER) WITH (kafka_topic='c4', value_format='json');
  5. Roll back ksqlDB to version 5.5.2.

Actual behaviour The ksqlDB server goes into CrashLoopBackOff.

  1. Error messages
    ERROR Failed to start KSQL (io.confluent.ksql.rest.server.KsqlServerMain:63)
    org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition _confluent-ksql-manpreetksql_command_topic-0 at offset 4. If needed, please seek past the record to continue consumption.
    Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.databind.exc.MismatchedInputException: Missing required creator property 'options' (index 2)
    at [Source: (byte[])"{"statement":"CREATE STREAM S4 (C1 STRING, C2 INTEGER) WITH (KAFKA_TOPIC='c4', KEY_FORMAT='KAFKA', VALUE_FORMAT='JSON');","originalProperties":{"ksql.extension.dir":"ext","ksql.streams.cache.max.bytes.buffering":"10000000","ksql.transient.prefix":"transient_","ksql.query.status.running.threshold.seconds":"300","ksql.streams.default.deserialization.exception.handler":"io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler","ksql.query.pull.enable.standby.reads":"false","ksql.persistence.de"[truncated 2587 bytes]; line: 1, column: 3054] (through reference chain: io.confluent.ksql.rest.server.computation.Command["plan"]->io.confluent.ksql.engine.KsqlPlanV1["ddlCommand"]->io.confluent.ksql.execution.ddl.commands.CreateStreamCommand["formats"]->io.confluent.ksql.execution.plan.Formats["options"])
    Caused by: com.fasterxml.jackson.databind.exc.MismatchedInputException: Missing required creator property 'options' (index 2)
    at [Source: (byte[])"{"statement":"CREATE STREAM S4 (C1 STRING, C2 INTEGER) WITH (KAFKA_TOPIC='c4', KEY_FORMAT='KAFKA', VALUE_FORMAT='JSON');","originalProperties":{"ksql.extension.dir":"ext","ksql.streams.cache.max.bytes.buffering":"10000000","ksql.transient.prefix":"transient_","ksql.query.status.running.threshold.seconds":"300","ksql.streams.default.deserialization.exception.handler":"io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler","ksql.query.pull.enable.standby.reads":"false","ksql.persistence.de"[truncated 2587 bytes]; line: 1, column: 3054] (through reference chain: io.confluent.ksql.rest.server.computation.Command["plan"]->io.confluent.ksql.engine.KsqlPlanV1["ddlCommand"]->io.confluent.ksql.execution.ddl.commands.CreateStreamCommand["formats"]->io.confluent.ksql.execution.plan.Formats["options"])
  2. KSQL logs
    The server log repeats the error above; the stack trace of the innermost MismatchedInputException follows:
        at com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:59)
        at com.fasterxml.jackson.databind.DeserializationContext.reportInputMismatch(DeserializationContext.java:1459)
        at com.fasterxml.jackson.databind.deser.impl.PropertyValueBuffer._findMissing(PropertyValueBuffer.java:194)
        at com.fasterxml.jackson.databind.deser.impl.PropertyValueBuffer.getParameters(PropertyValueBuffer.java:160)
        at com.fasterxml.jackson.databind.deser.ValueInstantiator.createFromObjectWith(ValueInstantiator.java:229)
        at com.fasterxml.jackson.databind.deser.impl.PropertyBasedCreator.build(PropertyBasedCreator.java:198)
        at com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeUsingPropertyBased(BeanDeserializer.java:488)
        at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromObjectUsingNonDefault(BeanDeserializerBase.java:1292)
        at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserializeFromObject(BeanDeserializer.java:326)
        at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:159)
        at com.fasterxml.jackson.databind.deser.SettableBeanProperty.deserialize(SettableBeanProperty.java:530)
        at com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeWithErrorWrapping(BeanDeserializer.java:528)
        at com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeUsingPropertyBased(BeanDeserializer.java:417)
        at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromObjectUsingNonDefault(BeanDeserializerBase.java:1292)
        at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserializeFromObject(BeanDeserializer.java:326)
        at com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeOther(BeanDeserializer.java:194)
        at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:161)
        at com.fasterxml.jackson.databind.jsontype.impl.AsPropertyTypeDeserializer._deserializeTypedForId(AsPropertyTypeDeserializer.java:130)
        at com.fasterxml.jackson.databind.jsontype.impl.AsPropertyTypeDeserializer.deserializeTypedFromObject(AsPropertyTypeDeserializer.java:97)
        at com.fasterxml.jackson.databind.deser.AbstractDeserializer.deserializeWithType(AbstractDeserializer.java:254)
        at com.fasterxml.jackson.databind.deser.std.ReferenceTypeDeserializer.deserialize(ReferenceTypeDeserializer.java:186)
        at com.fasterxml.jackson.databind.deser.SettableBeanProperty.deserialize(SettableBeanProperty.java:530)
        at com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeWithErrorWrapping(BeanDeserializer.java:528)
        at com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeUsingPropertyBased(BeanDeserializer.java:417)
        at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromObjectUsingNonDefault(BeanDeserializerBase.java:1292)
        at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserializeFromObject(BeanDeserializer.java:326)
        at com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeOther(BeanDeserializer.java:194)
        at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:161)
        at com.fasterxml.jackson.databind.jsontype.impl.AsPropertyTypeDeserializer._deserializeTypedForId(AsPropertyTypeDeserializer.java:130)
        at com.fasterxml.jackson.databind.jsontype.impl.AsPropertyTypeDeserializer.deserializeTypedFromObject(AsPropertyTypeDeserializer.java:97)
        at com.fasterxml.jackson.databind.deser.AbstractDeserializer.deserializeWithType(AbstractDeserializer.java:254)
        at com.fasterxml.jackson.databind.deser.std.ReferenceTypeDeserializer.deserialize(ReferenceTypeDeserializer.java:186)
        at com.fasterxml.jackson.databind.deser.SettableBeanProperty.deserialize(SettableBeanProperty.java:530)
        at com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeWithErrorWrapping(BeanDeserializer.java:528)
        at com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeUsingPropertyBased(BeanDeserializer.java:417)
        at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromObjectUsingNonDefault(BeanDeserializerBase.java:1292)
        at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserializeFromObject(BeanDeserializer.java:326)
        at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:159)
        at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4218)
        at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3275)
        at io.confluent.ksql.rest.server.computation.InternalTopicSerdes$InternalTopicDeserializer.deserialize(InternalTopicSerdes.java:59)
        at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
        at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1324)
        at org.apache.kafka.clients.consumer.internals.Fetcher.access$3400(Fetcher.java:129)
        at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1555)
        at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1391)
        at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:683)
        at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:634)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1289)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1243)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
        at io.confluent.ksql.rest.server.CommandTopic.getRestoreCommands(CommandTopic.java:102)
        at io.confluent.ksql.rest.server.computation.CommandStore.getRestoreCommands(CommandStore.java:213)
        at io.confluent.ksql.rest.server.computation.CommandRunner.processPriorCommands(CommandRunner.java:155)
        at io.confluent.ksql.rest.server.KsqlRestApplication.initialize(KsqlRestApplication.java:342)
        at io.confluent.ksql.rest.server.KsqlRestApplication.startKsql(KsqlRestApplication.java:275)
        at io.confluent.ksql.rest.server.KsqlRestApplication.startAsync(KsqlRestApplication.java:260)
        at io.confluent.ksql.rest.server.ExecutableServer.startAsync(ExecutableServer.java:47)
        at io.confluent.ksql.rest.server.KsqlServerMain.tryStartApp(KsqlServerMain.java:75)
        at io.confluent.ksql.rest.server.KsqlServerMain.main(KsqlServerMain.java:61)

    Additional context If this is expected behaviour, is there any workaround? We tried two approaches to get around the problem. First, we dropped stream "s4" before rolling back to 5.5.2, but because the command topic still contained both the "create stream s4" and "drop stream s4" commands, the rollback failed with the same serialization issue. Second, we deleted the command topic before rolling back. That made the rollback succeed, but we lost all our streams.
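    For reference, a minimal sketch of that second workaround (deleting the command topic with the Kafka AdminClient before rolling back). The bootstrap server below is a placeholder and the topic name is taken from the error messages above:

        // Sketch only: delete the ksqlDB command topic via the Kafka AdminClient.
        // Run with the ksqlDB server stopped.
        import java.util.Collections;
        import java.util.Properties;

        import org.apache.kafka.clients.admin.AdminClient;
        import org.apache.kafka.clients.admin.AdminClientConfig;

        public class DeleteCommandTopic {
          public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder

            try (AdminClient admin = AdminClient.create(props)) {
              // ksqlDB rebuilds its state from the command topic, which is why
              // deleting it loses every existing stream and table.
              admin.deleteTopics(
                  Collections.singletonList("_confluent-ksql-manpreetksql_command_topic"))
                  .all()
                  .get();
            }
          }
        }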

@apurvam

rodesai commented 3 years ago

@manpreet1992 Unfortunately this type of rollback is not supported, because older ksql versions may not understand the commands written out by the newer version. Recent versions of ksqlDB detect this condition (a "newer" command in the command topic) and put the server into "degraded" mode, in which the server is still usable and the queries up to that point in the command topic keep running. If you absolutely need the same streams and tables, and you still have the old command topic contents around, you can rewrite the command topic, copying its contents up to (but not including) the entry that added "s4".
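A minimal sketch of such a command-topic rewrite, assuming the old command topic contents were first copied to a backup topic named "_confluent-ksql-manpreetksql_command_topic_backup" (a hypothetical name), that the command topic has been re-created empty with a single partition, and that offset 4, taken from the deserialization error above, is the first command the 5.5.2 server cannot read:

    // Sketch only: with ksqlDB stopped, republish the backed-up command-topic
    // records that precede the offending offset into the re-created command
    // topic, preserving keys and values byte-for-byte.
    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;
    import org.apache.kafka.common.serialization.ByteArraySerializer;

    public class CopyCommandTopic {
      public static void main(String[] args) throws Exception {
        String bootstrap = "localhost:9092";                                  // placeholder
        String source = "_confluent-ksql-manpreetksql_command_topic_backup";  // hypothetical backup topic
        String target = "_confluent-ksql-manpreetksql_command_topic";         // re-created, empty command topic
        long cutoffOffset = 4L;  // offset of the CREATE STREAM S4 command, from the error above

        Properties cProps = new Properties();
        cProps.put("bootstrap.servers", bootstrap);
        cProps.put("group.id", "command-topic-copy");
        cProps.put("enable.auto.commit", "false");

        Properties pProps = new Properties();
        pProps.put("bootstrap.servers", bootstrap);
        pProps.put("acks", "all");

        // The command topic has a single partition.
        TopicPartition tp = new TopicPartition(source, 0);

        try (KafkaConsumer<byte[], byte[]> consumer =
                 new KafkaConsumer<>(cProps, new ByteArrayDeserializer(), new ByteArrayDeserializer());
             KafkaProducer<byte[], byte[]> producer =
                 new KafkaProducer<>(pProps, new ByteArraySerializer(), new ByteArraySerializer())) {

          consumer.assign(Collections.singletonList(tp));
          consumer.seekToBeginning(Collections.singletonList(tp));

          boolean done = false;
          while (!done) {
            ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(5));
            if (records.isEmpty()) {
              break;  // reached the end of the backup topic
            }
            for (ConsumerRecord<byte[], byte[]> rec : records.records(tp)) {
              if (rec.offset() >= cutoffOffset) {
                done = true;  // stop before the command that 5.5.2 cannot deserialize
                break;
              }
              // Re-publish key and value unchanged so 5.5.2 replays the original commands.
              producer.send(new ProducerRecord<>(target, 0, rec.key(), rec.value())).get();
            }
          }
        }
      }
    }

We have not verified that a rolled-back server accepts a command topic rebuilt this way, so treat this as a starting point rather than a tested procedure.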