confluentinc / ksql

The database purpose-built for stream processing applications.
https://ksqldb.io

ALTER or 'CREATE OR REPLACE' syntax does not support adding a field to the existing struct field of the stream #7930

Open kumar-subashp opened 3 years ago

kumar-subashp commented 3 years ago

Describe the bug: The ALTER or 'CREATE OR REPLACE' syntax does not support adding a field to an existing struct field of a stream.

To Reproduce: Steps to reproduce the behavior, include:

  1. The versions of KSQL:

ksqldb-server: 0.17.0, 0.19.0, and 0.29.0
ksqldb-cli: 0.17.0, 0.19.0, and 0.29.0

  2. SQL statements:

CREATE STREAM TRANSACTION_STREAM ( id VARCHAR, transaction STRUCT<num_shares INT, amount DOUBLE>) WITH (KAFKA_TOPIC='POC.FINANCIAL.TXNS.DEV', VALUE_FORMAT='JSON', PARTITIONS=1);

Trying to add a TXN_TS field to the transaction struct field:

CREATE OR REPLACE STREAM TRANSACTION_STREAM ( id VARCHAR, transaction STRUCT<NUM_SHARES INTEGER, AMOUNT DOUBLE, TXN_TS VARCHAR>) WITH (KAFKA_TOPIC='POC.FINANCIAL.TXNS.DEV', VALUE_FORMAT='JSON');

Expected behavior: Adding a field to an existing struct column should be supported.

Actual behaviour A clear and concise description of what actually happens, including:

  1. CLI output Cannot upgrade data source: DataSource 'TRANSACTION_STREAM' has schema = ID STRING, TRANSACTION STRUCT<NUM_SHARES INTEGER, AMOUNT DOUBLE> which is not upgradeable to ID STRING, TRANSACTION STRUCT<NUM_SHARES INTEGER, AMOUNT DOUBLE, TXN_TS STRING>. (The following columns are changed, missing or reordered: [TRANSACTION STRUCT<NUM_SHARES INTEGER, AMOUNT DOUBLE>])

  2. KSQL server logs

ksqldb-server | [2021-07-31 08:17:52,707] INFO Processed unsuccessfully: KsqlRequest{ksql='CREATE OR REPLACE STREAM TRANSACTION_STREAM (
ksqldb-server | id VARCHAR,
ksqldb-server | transaction STRUCT<NUM_SHARES INTEGER,
ksqldb-server | AMOUNT DOUBLE,
ksqldb-server | TXN_TS VARCHAR
ksqldb-server | >)
ksqldb-server | WITH (KAFKA_TOPIC='POC.FINANCIAL.TXNS.DEV',
ksqldb-server | VALUE_FORMAT='JSON',
ksqldb-server | PARTITIONS=1);', configOverrides={}, requestProperties={}, sessionVariables={}, commandSequenceNumber=Optional[2]}, reason: (io.confluent.ksql.rest.server.resources.KsqlResource:325)
ksqldb-server | io.confluent.ksql.util.KsqlStatementException: Cannot upgrade data source: DataSource 'TRANSACTION_STREAM' has schema = ID STRING, TRANSACTION STRUCT<NUM_SHARES INTEGER, AMOUNT DOUBLE> which is not upgradeable to ID STRING, TRANSACTION STRUCT<NUM_SHARES INTEGER, AMOUNT DOUBLE, TXN_TS STRING>. (The following columns are changed, missing or reordered: [TRANSACTION STRUCT<NUM_SHARES INTEGER, AMOUNT DOUBLE>])
ksqldb-server | Statement: CREATE OR REPLACE STREAM TRANSACTION_STREAM (ID STRING, TRANSACTION STRUCT<NUM_SHARES INTEGER, AMOUNT DOUBLE, TXN_TS STRING>) WITH (KAFKA_TOPIC='POC.FINANCIAL.TXNS.DEV', KEY_FORMAT='KAFKA', PARTITIONS=1, VALUE_FORMAT='JSON');
ksqldb-server |     at io.confluent.ksql.engine.EngineExecutor.executeDdl(EngineExecutor.java:531)
ksqldb-server |     at io.confluent.ksql.engine.EngineExecutor.lambda$execute$0(EngineExecutor.java:126)
ksqldb-server |     at java.base/java.util.Optional.map(Optional.java:265)
ksqldb-server |     at io.confluent.ksql.engine.EngineExecutor.execute(EngineExecutor.java:126)
ksqldb-server |     at io.confluent.ksql.engine.SandboxedExecutionContext.execute(SandboxedExecutionContext.java:135)
ksqldb-server |     at io.confluent.ksql.rest.server.computation.ValidatedCommandFactory.createForPlannedQuery(ValidatedCommandFactory.java:139)
ksqldb-server |     at io.confluent.ksql.rest.server.computation.ValidatedCommandFactory.createCommand(ValidatedCommandFactory.java:106)
ksqldb-server |     at io.confluent.ksql.rest.server.computation.ValidatedCommandFactory.create(ValidatedCommandFactory.java:66)
ksqldb-server |     at io.confluent.ksql.rest.server.validation.RequestValidator.validate(RequestValidator.java:164)
ksqldb-server |     at io.confluent.ksql.rest.server.validation.RequestValidator.validate(RequestValidator.java:130)
ksqldb-server |     at io.confluent.ksql.rest.server.resources.KsqlResource.handleKsqlStatements(KsqlResource.java:281)
ksqldb-server |     at io.confluent.ksql.rest.server.KsqlServerEndpoints.lambda$executeKsqlRequest$2(KsqlServerEndpoints.java:191)
ksqldb-server |     at io.confluent.ksql.rest.server.KsqlServerEndpoints.lambda$executeOldApiEndpointOnWorker$23(KsqlServerEndpoints.java:342)
ksqldb-server |     at io.confluent.ksql.rest.server.KsqlServerEndpoints.lambda$executeOnWorker$22(KsqlServerEndpoints.java:328)
ksqldb-server |     at io.vertx.core.impl.ContextImpl.lambda$executeBlocking$2(ContextImpl.java:313)
ksqldb-server |     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
ksqldb-server |     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
ksqldb-server |     at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
ksqldb-server |     at java.base/java.lang.Thread.run(Thread.java:829)
ksqldb-server | Caused by: io.confluent.ksql.util.KsqlException: Cannot upgrade data source: DataSource 'TRANSACTION_STREAM' has schema = ID STRING, TRANSACTION STRUCT<NUM_SHARES INTEGER, AMOUNT DOUBLE> which is not upgradeable to ID STRING, TRANSACTION STRUCT<NUM_SHARES INTEGER, AMOUNT DOUBLE, TXN_TS STRING>. (The following columns are changed, missing or reordered: [TRANSACTION STRUCT<NUM_SHARES INTEGER, AMOUNT DOUBLE>])
ksqldb-server |     at io.confluent.ksql.metastore.MetaStoreImpl.lambda$putSource$3(MetaStoreImpl.java:103)
ksqldb-server |     at java.base/java.util.Optional.ifPresent(Optional.java:183)
ksqldb-server |     at io.confluent.ksql.metastore.MetaStoreImpl.putSource(MetaStoreImpl.java:102)
ksqldb-server |     at io.confluent.ksql.ddl.commands.DdlCommandExec$Executor.executeCreateStream(DdlCommandExec.java:104)
ksqldb-server |     at io.confluent.ksql.execution.ddl.commands.CreateStreamCommand.execute(CreateStreamCommand.java:55)
ksqldb-server |     at io.confluent.ksql.execution.ddl.commands.Executor.execute(Executor.java:20)
ksqldb-server |     at io.confluent.ksql.ddl.commands.DdlCommandExec.execute(DdlCommandExec.java:64)
ksqldb-server |     at io.confluent.ksql.engine.EngineContext.executeDdl(EngineContext.java:227)
ksqldb-server |     at io.confluent.ksql.engine.EngineExecutor.executeDdl(EngineExecutor.java:527)
ksqldb-server |     ... 18 more
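
For contrast, the error reports the whole TRANSACTION column as "changed", which suggests the upgrade check compares top-level columns as units and does not look inside a struct. The sketch below shows the kind of change that, as far as the ksqlDB documentation describes, is accepted: appending a new top-level column. The column name TXN_SOURCE is hypothetical and is not part of the original report. With VALUE_FORMAT='JSON' it would be read from a top-level JSON field, not from inside the transaction object, so it does not solve the nested case reported here.

```sql
-- Assumption: appending a top-level column is an accepted in-place upgrade.
-- TXN_SOURCE is a hypothetical column name used only for illustration.
ALTER STREAM TRANSACTION_STREAM ADD COLUMN TXN_SOURCE VARCHAR;

-- Check the resulting schema; TRANSACTION itself is unchanged.
DESCRIBE TRANSACTION_STREAM;
```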

kumar-subashp commented 2 years ago

Any updates on this, please?

kumar-subashp commented 2 years ago

Hi team, I'm eagerly waiting for this fix, which would let us alter existing streams while retaining the same consumer group. Could you please prioritize this issue?

kumar-subashp commented 1 year ago

I did check version 0.29.0, but it seems that this feature has not been taken into consideration. Please let me know if you have any alternatives to handle this scenario.
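
One possible alternative, offered as a sketch and not as official guidance: because TRANSACTION_STREAM is declared over an existing topic, a second stream can be registered over the same topic with the extended struct. The name TRANSACTION_STREAM_V2 is hypothetical. Existing persistent queries keep reading from the original stream and do not see TXN_TS, and new queries on the second stream would most likely start their own consumer groups, so this does not meet the requirement of retaining the same consumer group.

```sql
-- Hypothetical second stream over the same topic, with TXN_TS added to the struct.
-- PARTITIONS is omitted because the topic already exists.
CREATE STREAM TRANSACTION_STREAM_V2 (
  id VARCHAR,
  transaction STRUCT<NUM_SHARES INTEGER, AMOUNT DOUBLE, TXN_TS VARCHAR>
) WITH (KAFKA_TOPIC='POC.FINANCIAL.TXNS.DEV', VALUE_FORMAT='JSON');

-- Nested fields are read with the -> operator.
SELECT id, transaction->TXN_TS AS txn_ts
FROM TRANSACTION_STREAM_V2
EMIT CHANGES;
```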

akosasante commented 1 year ago

I'm also interested in knowing what the expected workarounds or general guidance are for this use case. For now I'm just passing the data as a JSON string (VARCHAR), but that means it's up to the consumer to re-encode it as JSON on their end, which doesn't work for situations where this ksql stream isn't the only thing publishing to that topic :cry:
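
For consumers that are themselves ksqlDB queries, the VARCHAR approach described above can be sketched as follows. This is only an illustration under stated assumptions: the stream name TRANSACTION_RAW and the topic name are hypothetical, and the transaction field is assumed to hold JSON text. EXTRACTJSONFIELD returns a VARCHAR, so numeric fields need an explicit CAST. Consumers outside ksqlDB would still have to parse the string themselves.

```sql
-- Hypothetical stream in which the nested payload is carried as a JSON string.
CREATE STREAM TRANSACTION_RAW (
  id VARCHAR,
  transaction VARCHAR
) WITH (KAFKA_TOPIC='transactions-raw', VALUE_FORMAT='JSON');

-- Fields are pulled out by JSON path, so a new field such as txn_ts
-- needs no schema change on the stream.
SELECT
  id,
  CAST(EXTRACTJSONFIELD(transaction, '$.num_shares') AS INTEGER) AS num_shares,
  EXTRACTJSONFIELD(transaction, '$.txn_ts') AS txn_ts
FROM TRANSACTION_RAW
EMIT CHANGES;
```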