confluentinc / ksql

The database purpose-built for stream processing applications.
https://ksqldb.io

ksqlDB headless deployment throws a reserved-column error about ROWTIME when creating a table from a rekeyed stream, although ROWTIME is not defined in the schema at all. The CLI does not. #9489

Open pri-naik5 opened 2 years ago

pri-naik5 commented 2 years ago

Describe the bug A ksqlDB headless deployment fails with a misleading error saying that 'ROWTIME' is a reserved column name and cannot be used as a column name, when creating a table on top of a rekeyed stream. There is no ROWTIME anywhere in the stream's topic schema or in the table definition. The same table creation works fine when run from the CLI.

To Reproduce Steps to reproduce the behavior, include:

  1. The version of KSQL: 0.27.2, 0.26.0
  2. Sample source data: the queries.sql file:

    CREATE STREAM test_stream (id String, field1 String, field2 String)
    WITH (KAFKA_TOPIC='test_stream', VALUE_FORMAT='avro', partitions=1);

    INSERT INTO test_stream (id, field1, field2) VALUES ('1', 'test_value1', 'test1_field2_value');
    INSERT INTO test_stream (id, field1, field2) VALUES ('2', 'test_value2', 'test2_field2_value');

    CREATE OR REPLACE STREAM test_stream_rekeyed
    WITH (KAFKA_TOPIC='test_stream_rekeyed', VALUE_FORMAT='avro', KEY_FORMAT='avro')
    AS SELECT * FROM test_stream PARTITION BY id;

    CREATE TABLE test_table (id String primary key)
    WITH (KAFKA_TOPIC='test_stream_rekeyed', VALUE_FORMAT='avro', KEY_FORMAT='avro');
  3. Any SQL statements you ran

Expected behavior I expect the table/view to be created on top of the rekeyed stream when the statements run in the headless app, just as it is from the CLI.

Actual behaviour A clear and concise description of what actually happens, including:

  1. CLI output

    ksql> CREATE STREAM test_stream (id String, field1 String, field2 String)
    >WITH (KAFKA_TOPIC='test_stream', VALUE_FORMAT='avro', partitions=1);
    
    Message        
    ----------------
    Stream created 
    ----------------
    ksql> INSERT INTO test_stream (id, field1, field2) VALUES ('1', 'test_value1', 'test1_field2_value');
    ksql> INSERT INTO test_stream (id, field1, field2) VALUES ('2', 'test_value2', 'test2_field2_value');
    ksql> CREATE OR REPLACE STREAM test_stream_rekeyed
    >WITH (KAFKA_TOPIC='test_stream_rekeyed', VALUE_FORMAT='avro', KEY_FORMAT='avro')
    >AS SELECT * FROM test_stream PARTITION BY id;
    
    Message                                            
    ----------------------------------------------------
    Created query with ID CSAS_TEST_STREAM_REKEYED_371 
    ----------------------------------------------------
    ksql> CREATE TABLE test_table (id String primary key)
    >WITH (KAFKA_TOPIC='test_stream_rekeyed', VALUE_FORMAT='avro', KEY_FORMAT='avro');
    
    Message       
    ---------------
    Table created 
    ---------------
  2. Error messages: 'ROWTIME' is a reserved column name. You cannot use it as a name for a column.

  3. KSQL logs

    reason:  (io.confluent.ksql.rest.server.resources.KsqlResource:341)
    'ROWTIME' is a reserved column name. You cannot use it as a name for a column.
    Statement: <retracted>
    at io.confluent.ksql.engine.EngineExecutor.plan(EngineExecutor.java:700)
    at io.confluent.ksql.engine.SandboxedExecutionContext.plan(SandboxedExecutionContext.java:158)
    at io.confluent.ksql.rest.server.computation.ValidatedCommandFactory.createForPlannedQuery(ValidatedCommandFactory.java:216)
    at io.confluent.ksql.rest.server.computation.ValidatedCommandFactory.createCommand(ValidatedCommandFactory.java:130)
    at io.confluent.ksql.rest.server.computation.ValidatedCommandFactory.create(ValidatedCommandFactory.java:85)
    at io.confluent.ksql.rest.server.validation.RequestValidator.validate(RequestValidator.java:172)
    at io.confluent.ksql.rest.server.validation.RequestValidator.validate(RequestValidator.java:128)
    at io.confluent.ksql.rest.server.resources.KsqlResource.handleKsqlStatements(KsqlResource.java:297)
    at io.confluent.ksql.rest.server.KsqlServerEndpoints.lambda$executeKsqlRequest$2(KsqlServerEndpoints.java:183)
    at io.confluent.ksql.rest.server.KsqlServerEndpoints.lambda$executeOldApiEndpointOnWorker$23(KsqlServerEndpoints.java:341)
    at io.confluent.ksql.rest.server.KsqlServerEndpoints.lambda$executeOnWorker$22(KsqlServerEndpoints.java:327)
    at io.vertx.core.impl.ContextImpl.lambda$executeBlocking$2(ContextImpl.java:313)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Thread.java:829)
    Caused by: io.confluent.ksql.util.KsqlException: 'ROWTIME' is a reserved column name. You cannot use it as a name for a column.
    at io.confluent.ksql.ddl.commands.CreateSourceFactory.lambda$buildSchema$1(CreateSourceFactory.java:263)
    at java.base/java.lang.Iterable.forEach(Iterable.java:75)
    at io.confluent.ksql.ddl.commands.CreateSourceFactory.buildSchema(CreateSourceFactory.java:261)
    at io.confluent.ksql.ddl.commands.CreateSourceFactory.createTableCommand(CreateSourceFactory.java:166)
    at io.confluent.ksql.ddl.commands.CommandFactories.handleCreateTable(CommandFactories.java:139)
    at io.confluent.ksql.util.HandlerMaps$BuilderR2.lambda$castHandler2$2(HandlerMaps.java:840)
    at io.confluent.ksql.ddl.commands.CommandFactories.create(CommandFactories.java:110)
    at io.confluent.ksql.engine.EngineContext.createDdlCommand(EngineContext.java:232)
    at io.confluent.ksql.engine.EngineExecutor.plan(EngineExecutor.java:646)
    ... 15 more
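
Reading the trace, the exception is raised from a per-column check inside `CreateSourceFactory.buildSchema` (the `Iterable.forEach` frame). The following is a minimal Python sketch of what such a check presumably looks like. It is not ksqlDB's actual code: `RESERVED_COLUMN_NAMES`, `build_schema`, and the `KsqlException` stand-in are hypothetical names, and only `ROWTIME` is taken from the error message. The point it illustrates is that this message should only appear if a column literally named ROWTIME reaches the check, which the declared `id` column alone would not cause.

```python
# Sketch only: mimics the kind of reserved-name validation the stack trace
# points to. Names here are hypothetical, not ksqlDB's real API.

# Assumption: only ROWTIME is listed because it is the only name confirmed
# by the error message in this report.
RESERVED_COLUMN_NAMES = {"ROWTIME"}


class KsqlException(Exception):
    """Stand-in for io.confluent.ksql.util.KsqlException."""


def build_schema(column_names):
    """Reject any column whose name collides with a reserved name."""
    for name in column_names:
        if name.upper() in RESERVED_COLUMN_NAMES:
            raise KsqlException(
                f"'{name}' is a reserved column name. "
                "You cannot use it as a name for a column."
            )
    return list(column_names)


if __name__ == "__main__":
    # The table as declared in queries.sql has a single column, which passes.
    print(build_schema(["ID"]))

    # The error only reproduces if ROWTIME is somehow part of the column list.
    try:
        build_schema(["ID", "ROWTIME"])
    except KsqlException as exc:
        print(exc)
```

If the real check behaves like this, the column list seen in the failing path must differ from the one seen via the CLI, since the statement text is the same in both cases.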

Additional context Add any other context about the problem here.
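
One way to double-check that ROWTIME is absent from the topic schema is to inspect the Avro schema registered for the rekeyed topic. Below is a small Python sketch under stated assumptions: a Schema Registry reachable at a URL such as `http://localhost:8081` (not given in this report), the default subject naming of `<topic>-value`, and an illustrative sample schema that was not captured from the actual registry. The helper names `reserved_fields` and `fetch_latest_schema` are made up for this example.

```python
import json
import urllib.request

# Assumption: only ROWTIME is checked because it is the name in the error.
RESERVED = {"ROWTIME"}


def reserved_fields(avro_schema_json, reserved=RESERVED):
    """Return top-level Avro record field names that collide with reserved names."""
    schema = json.loads(avro_schema_json)
    if not isinstance(schema, dict):
        # Primitive schemas such as "string" have no fields.
        return []
    names = [field["name"] for field in schema.get("fields", [])]
    return [name for name in names if name.upper() in reserved]


def fetch_latest_schema(registry_url, subject):
    """Fetch the latest schema string for a subject from Schema Registry."""
    url = f"{registry_url}/subjects/{subject}/versions/latest"
    with urllib.request.urlopen(url) as resp:
        return json.load(resp)["schema"]


# Illustrative value schema only; the real one should be fetched, for example:
#   fetch_latest_schema("http://localhost:8081", "test_stream_rekeyed-value")
SAMPLE_VALUE_SCHEMA = json.dumps({
    "type": "record",
    "name": "KsqlDataSourceSchema",
    "namespace": "io.confluent.ksql.avro_schemas",
    "fields": [
        {"name": "FIELD1", "type": ["null", "string"], "default": None},
        {"name": "FIELD2", "type": ["null", "string"], "default": None},
    ],
})

if __name__ == "__main__":
    print(reserved_fields(SAMPLE_VALUE_SCHEMA))
```

An empty result for both the `-key` and `-value` subjects would support the statement that the topic schema itself does not contain ROWTIME.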

suhas-satish commented 2 years ago

@pri-naik5, do you know whether this worked for you in earlier versions, before you hit this issue in 0.26.0?

pri-naik5 commented 2 years ago

I have not tried earlier versions with this script.