confluentinc / ksql

The database purpose-built for stream processing applications.
https://ksqldb.io

Avro limitation for commands in RUN SCRIPT vs. interactive KSQL usage #1031

Closed: ybyzek closed this 5 years ago

ybyzek commented 6 years ago

When you create a new stream by reading from a Kafka topic with Avro-formatted messages, you don't have to define columns or data types in the CREATE statement, as long as you are interacting via the CLI.

However, if you are putting these commands in a script that you then pass in via queries-file, then I think you do.
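For illustration, the interactive Avro case works because KSQL reads the topic's Avro value schema (from Schema Registry) and turns each record field into a column. The sketch below is a simplified model of that conversion, not KSQL's code: `columns_from_avro` and its type table are hypothetical and only cover a few primitive types plus `["null", T]` unions.

```python
import json

# Simplified, hypothetical mapping from Avro primitive types to KSQL column types.
AVRO_TO_KSQL = {
    "int": "INTEGER",
    "long": "BIGINT",
    "double": "DOUBLE",
    "string": "VARCHAR",
    "boolean": "BOOLEAN",
}


def columns_from_avro(schema_json: str) -> str:
    """Build a KSQL column list from an Avro record schema (illustration only)."""
    schema = json.loads(schema_json)
    if schema.get("type") != "record":
        raise ValueError("expected an Avro record schema")
    columns = []
    for field in schema["fields"]:
        avro_type = field["type"]
        # A ["null", T] union is just an optional T; KSQL columns are nullable anyway.
        if isinstance(avro_type, list):
            non_null = [t for t in avro_type if t != "null"]
            if len(non_null) != 1:
                raise ValueError(f"unsupported union for field {field['name']}")
            avro_type = non_null[0]
        # Complex types (records, arrays, maps) are out of scope for this sketch.
        if not isinstance(avro_type, str) or avro_type not in AVRO_TO_KSQL:
            raise ValueError(f"unsupported type {avro_type!r} for field {field['name']}")
        # Unquoted identifiers are upper-cased, so song_id becomes SONG_ID.
        columns.append(f"{field['name'].upper()} {AVRO_TO_KSQL[avro_type]}")
    return ", ".join(columns)
```

Given a hypothetical `play-events` value schema with `song_id` and `duration` as Avro `long` fields, this returns `SONG_ID BIGINT, DURATION BIGINT`, which is the column list the working RUN SCRIPT example spells out by hand.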

These commands passed in via RUN SCRIPT work:

CREATE STREAM ksqlplayevents (SONG_ID BIGINT, DURATION BIGINT) WITH (KAFKA_TOPIC='play-events', VALUE_FORMAT='AVRO');
CREATE STREAM ksqlplayevents_min_duration AS SELECT * FROM ksqlplayevents WHERE DURATION > 30000;

These commands passed in via RUN SCRIPT do not work:

CREATE STREAM ksqlplayevents WITH (KAFKA_TOPIC='play-events', VALUE_FORMAT='AVRO');
CREATE STREAM ksqlplayevents_min_duration AS SELECT * FROM ksqlplayevents WHERE DURATION > 30000;

Error:

io.confluent.ksql.parser.exception.ParseFailedException: Parsing failed on KsqlEngine msg:Field DURATION is ambiguous.
    at io.confluent.ksql.KsqlEngine.parseQueries(KsqlEngine.java:278)
    at io.confluent.ksql.KsqlEngine.buildMultipleQueries(KsqlEngine.java:182)
    at io.confluent.ksql.rest.server.computation.StatementExecutor.handleRunScript(StatementExecutor.java:273)
    at io.confluent.ksql.rest.server.computation.StatementExecutor.executeStatement(StatementExecutor.java:252)
    at io.confluent.ksql.rest.server.computation.StatementExecutor.handleStatementWithTerminatedQueries(StatementExecutor.java:203)
    at io.confluent.ksql.rest.server.computation.StatementExecutor.handleStatement(StatementExecutor.java:108)
    at io.confluent.ksql.rest.server.computation.CommandRunner.executeStatement(CommandRunner.java:105)
    at io.confluent.ksql.rest.server.computation.CommandRunner.fetchAndRunCommands(CommandRunner.java:88)
    at io.confluent.ksql.rest.server.computation.CommandRunner.run(CommandRunner.java:63)
    at java.lang.Thread.run(Thread.java:745)
Caused by: io.confluent.ksql.parser.AstBuilder$InvalidColumnReferenceException: Field DURATION is ambiguous.
    at io.confluent.ksql.parser.AstBuilder.visitColumnReference(AstBuilder.java:1147)
    at io.confluent.ksql.parser.AstBuilder.visitColumnReference(AstBuilder.java:142)
    at io.confluent.ksql.parser.SqlBaseParser$ColumnReferenceContext.accept(SqlBaseParser.java:5500)
    at org.antlr.v4.runtime.tree.AbstractParseTreeVisitor.visitChildren(AbstractParseTreeVisitor.java:46)
    at io.confluent.ksql.parser.SqlBaseBaseVisitor.visitValueExpressionDefault(SqlBaseBaseVisitor.java:531)
    at io.confluent.ksql.parser.SqlBaseParser$ValueExpressionDefaultContext.accept(SqlBaseParser.java:5054)
    at org.antlr.v4.runtime.tree.AbstractParseTreeVisitor.visit(AbstractParseTreeVisitor.java:18)
    at io.confluent.ksql.parser.AstBuilder.visitPredicated(AstBuilder.java:866)
    at io.confluent.ksql.parser.AstBuilder.visitPredicated(AstBuilder.java:142)
    at io.confluent.ksql.parser.SqlBaseParser$PredicatedContext.accept(SqlBaseParser.java:4616)
    at org.antlr.v4.runtime.tree.AbstractParseTreeVisitor.visitChildren(AbstractParseTreeVisitor.java:46)
    at io.confluent.ksql.parser.SqlBaseBaseVisitor.visitBooleanDefault(SqlBaseBaseVisitor.java:461)
    at io.confluent.ksql.parser.SqlBaseParser$BooleanDefaultContext.accept(SqlBaseParser.java:4355)
    at org.antlr.v4.runtime.tree.AbstractParseTreeVisitor.visitChildren(AbstractParseTreeVisitor.java:46)
    at io.confluent.ksql.parser.SqlBaseBaseVisitor.visitExpression(SqlBaseBaseVisitor.java:447)
    at io.confluent.ksql.parser.SqlBaseParser$ExpressionContext.accept(SqlBaseParser.java:4283)
    at org.antlr.v4.runtime.tree.AbstractParseTreeVisitor.visit(AbstractParseTreeVisitor.java:18)
    at io.confluent.ksql.parser.AstBuilder.visitSelectSingle(AstBuilder.java:559)
    at io.confluent.ksql.parser.AstBuilder.visitSelectSingle(AstBuilder.java:142)
    at io.confluent.ksql.parser.SqlBaseParser$SelectSingleContext.accept(SqlBaseParser.java:3406)
    at org.antlr.v4.runtime.tree.AbstractParseTreeVisitor.visit(AbstractParseTreeVisitor.java:18)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
    at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1374)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
    at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
    at io.confluent.ksql.parser.AstBuilder.visit(AstBuilder.java:1358)
    at io.confluent.ksql.parser.AstBuilder.visitQuerySpecification(AstBuilder.java:362)
    at io.confluent.ksql.parser.AstBuilder.visitQuerySpecification(AstBuilder.java:142)
    at io.confluent.ksql.parser.SqlBaseParser$QuerySpecificationContext.accept(SqlBaseParser.java:2416)
    at org.antlr.v4.runtime.tree.AbstractParseTreeVisitor.visitChildren(AbstractParseTreeVisitor.java:46)
    at io.confluent.ksql.parser.SqlBaseBaseVisitor.visitQueryPrimaryDefault(SqlBaseBaseVisitor.java:251)
    at io.confluent.ksql.parser.SqlBaseParser$QueryPrimaryDefaultContext.accept(SqlBaseParser.java:2234)
    at org.antlr.v4.runtime.tree.AbstractParseTreeVisitor.visitChildren(AbstractParseTreeVisitor.java:46)
    at io.confluent.ksql.parser.SqlBaseBaseVisitor.visitQueryTermDefault(SqlBaseBaseVisitor.java:244)
    at io.confluent.ksql.parser.SqlBaseParser$QueryTermDefaultContext.accept(SqlBaseParser.java:2162)
    at org.antlr.v4.runtime.tree.AbstractParseTreeVisitor.visit(AbstractParseTreeVisitor.java:18)
    at io.confluent.ksql.parser.AstBuilder.visitQueryNoWith(AstBuilder.java:312)
    at io.confluent.ksql.parser.AstBuilder.visitQueryNoWith(AstBuilder.java:142)
    at io.confluent.ksql.parser.SqlBaseParser$QueryNoWithContext.accept(SqlBaseParser.java:2088)
    at org.antlr.v4.runtime.tree.AbstractParseTreeVisitor.visit(AbstractParseTreeVisitor.java:18)
    at io.confluent.ksql.parser.AstBuilder.visitQuery(AstBuilder.java:289)
    at io.confluent.ksql.parser.AstBuilder.visitCreateStreamAs(AstBuilder.java:240)
    at io.confluent.ksql.parser.AstBuilder.visitCreateStreamAs(AstBuilder.java:142)
    at io.confluent.ksql.parser.SqlBaseParser$CreateStreamAsContext.accept(SqlBaseParser.java:710)
    at org.antlr.v4.runtime.tree.AbstractParseTreeVisitor.visit(AbstractParseTreeVisitor.java:18)
    at io.confluent.ksql.parser.AstBuilder.visitSingleStatement(AstBuilder.java:166)
    at io.confluent.ksql.parser.AstBuilder.visitSingleStatement(AstBuilder.java:142)
    at io.confluent.ksql.parser.SqlBaseParser$SingleStatementContext.accept(SqlBaseParser.java:293)
    at org.antlr.v4.runtime.tree.AbstractParseTreeVisitor.visit(AbstractParseTreeVisitor.java:18)
    at io.confluent.ksql.parser.KsqlParser.prepareStatement(KsqlParser.java:82)
    at io.confluent.ksql.KsqlEngine.parseQueries(KsqlEngine.java:257)
bluemonk3y commented 6 years ago

It would be good if the error message explained how to fix the problem.

rmoff commented 6 years ago

@ybyzek do you know if this is still an issue in the latest builds? I can see this is from a few months back.

ybyzek commented 6 years ago

@rmoff yes it is still an issue.

miguno commented 6 years ago

A related issue is https://github.com/confluentinc/ksql/issues/1320 (KSQL in headless mode doesn't do the same validation on input queries as the interactive mode).

Update 2018-11-12: https://github.com/confluentinc/ksql/issues/1320 has been fixed since.

miguno commented 5 years ago

cc @big-andy-coates, this is related to what @apurvam wrote at https://github.com/confluentinc/ksql/issues/1544#issuecomment-437482000:

@big-andy-coates assigning this to you since you are already looking into this problem. It would be good to unify the behavior of multiline statements across modes as well. Right now, I think they are supported in the CLI, but what about in run script and in non-interactive mode?
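Supporting multi-line statements in every mode largely comes down to treating `;`, rather than a newline, as the end of a statement. A minimal sketch of that idea, assuming a hypothetical `split_statements` helper that ignores comments and only understands single-quoted literals (with `''` as an escaped quote); KSQL's actual parser is ANTLR-based and handles far more:

```python
def split_statements(script: str) -> list[str]:
    """Split a script on ';' outside single-quoted literals (hypothetical helper)."""
    statements = []
    current = []
    in_quote = False
    i = 0
    while i < len(script):
        ch = script[i]
        if ch == "'":
            # Inside a literal, '' is an escaped quote and does not end the literal.
            if in_quote and i + 1 < len(script) and script[i + 1] == "'":
                current.append("''")
                i += 2
                continue
            in_quote = not in_quote
        if ch == ";" and not in_quote:
            # End of statement: keep internal newlines, trim surrounding whitespace.
            stmt = "".join(current).strip()
            if stmt:
                statements.append(stmt + ";")
            current = []
        else:
            current.append(ch)
        i += 1
    # Any trailing text without a terminating ';' is returned as-is.
    tail = "".join(current).strip()
    if tail:
        statements.append(tail)
    return statements
```

With this approach a statement split across several lines, or a literal such as `'t;x'`, stays in one piece regardless of whether the text came from the CLI prompt or a script file.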

big-andy-coates commented 5 years ago

With the above PR, the CLI itself handles RUN SCRIPT commands by loading the file contents and posting them to the REST endpoint as a normal (multi-line) request. So the statements are handled like any other statements sent via the CLI.
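A rough sketch of that flow using only the Python standard library: the whole script becomes the `ksql` field of a single `POST /ksql` request. The server address `http://localhost:8088` and the helper name `build_run_script_request` are assumptions for illustration; this is not the CLI's actual code.

```python
import json
import urllib.request


def build_run_script_request(script_text: str,
                             server: str = "http://localhost:8088") -> urllib.request.Request:
    """Wrap an entire script in one POST /ksql request (sketch, not the CLI's code)."""
    body = {
        "ksql": script_text,        # all statements, newlines included, in one string
        "streamsProperties": {},    # no per-request property overrides in this sketch
    }
    return urllib.request.Request(
        url=f"{server}/ksql",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/vnd.ksql.v1+json; charset=utf-8"},
        method="POST",
    )
```

In practice the script text would come from `open(path).read()`, and passing the request to `urllib.request.urlopen` would send it; the server then treats it like any other multi-statement request.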

I've also extended the REST API to support SET and UNSET commands within multi-line requests. These commands only affect properties within the scope of the request itself, i.e. within the scope of the script.
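To make the scoping concrete, here is a hypothetical model (not KSQL's implementation) of request-scoped SET/UNSET: overrides live in a dictionary created per request and are layered over the server's properties, so UNSET falls back to the server value and nothing outlives the script. `run_request` and its `(statement, properties)` return shape are made up for this sketch; `auto.offset.reset` is simply a commonly overridden property.

```python
def run_request(statements, server_props):
    """Model request-scoped SET/UNSET; returns (statement, effective_props) pairs."""
    overrides = {}  # created per request and discarded when the request ends
    executed = []
    for stmt in statements:
        text = stmt.strip().rstrip(";").strip()
        keyword, _, rest = text.partition(" ")
        keyword = keyword.upper()
        if keyword == "SET":
            # e.g. SET 'auto.offset.reset'='earliest'
            key, _, value = rest.partition("=")
            overrides[key.strip().strip("'")] = value.strip().strip("'")
        elif keyword == "UNSET":
            # Removing the override makes the server-level value visible again.
            overrides.pop(rest.strip().strip("'"), None)
        else:
            # Effective properties: server defaults overlaid with this request's overrides.
            executed.append((text, {**server_props, **overrides}))
    return executed
```

In a script this corresponds to `SET 'auto.offset.reset'='earliest';` before the statements that should read from the start of the topic, and `UNSET 'auto.offset.reset';` to return to the server setting; `server_props` itself is never modified.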

@ybyzek, if you have the time, I'd love for you to test this out and make sure you're happy with the functionality and that there aren't any edge cases I haven't thought of.

Thanks,

Andy

ybyzek commented 5 years ago

@big-andy-coates wonderful! I'm happy to validate; let's sync offline on the logistics.