bryanyang0528 / ksql-python

A python wrapper for the KSQL REST API.
MIT License
159 stars 67 forks source link

Create a KSQLdb table from a stream #100

Open GiuseppePegasus opened 2 years ago

GiuseppePegasus commented 2 years ago

I try to create a table with ksql python client. My query is:

CREATE TABLE AVG5MINTEMP WITH (KAFKA_TOPIC='AVG5MINTEMP', KEY_FORMAT='JSON', PARTITIONS=1, REPLICAS=1) AS SELECT METEO_RAW.TYPE TYPE, METEO_RAW.NOME_UNITA NOME_UNITA, AVG(METEO_RAW.VALORE) AVG_VALUE FROM METEO_RAW METEO_RAW WINDOW TUMBLING ( SIZE 5 MINUTES ) GROUP BY METEO_RAW.TYPE, METEO_RAW.NOME_UNITA where METEO_RAW.NOME_UNITA = 'Unita 1' AND METEO_RAW.ID_SENSORE = 2 EMIT CHANGES;

where METEO_RAW is a stream from a Kafka topic. In the UI KSQLdb interface it works fine. So I try with the sample command to submit a query:

client.create_stream_as(table_name="AVG5MINTEMP", select_columns=["METEO_RAW.TYPE TYPE", "METEO_RAW.NOME_UNITA NOME_UNITA", "AVG(NOME_UNITA.VALORE) AVG_VALUE"], src_table="METEO_RAW", kafka_topic="AVG5MINTEMP", value_format = 'JSON', conditions="WINDOW TUMBLING ( SIZE 5 MINUTES ) GROUP BY TYPE \ WHERE NOME_UNITA = 'Unita 1' AND METEO_RAW.ID_SENSORE=2 EMIT CHANGES; ") but I receive the error:

DEBUG:root:content: {'@type': 'statement_error', 'error_code': 40001, 'message': "line 1:199: mismatched input 'WINDOW' expecting {'(', 'EMIT', 'CHANGES', 'FINAL', 'NOT', 'ESCAPE', 'NULL', 'TRUE', 'FALSE', 'INTEGER', 'DATE', 'TIME', 'TIMESTAMP', 'INTERVAL', 'YEAR', 'MONTH', 'DAY', 'HOUR', 'MINUTE', 'SECOND', 'ZONE', 'CASE', 'PARTITION', 'STRUCT', 'EXPLAIN', 'ANALYZE', 'TYPE', 'TYPES', 'CAST', 'SHOW', 'TABLES', 'COLUMNS', 'COLUMN', 'PARTITIONS', 'FUNCTIONS', 'FUNCTION', 'ARRAY', 'MAP', 'SET', 'RESET', 'SESSION', 'KEY', 'SINK', 'SOURCE', 'PRIMARY', 'REPLACE', 'ASSERT', 'ADD', 'ALTER', 'IF', '+', '-', STRING, INTEGER_VALUE, DECIMAL_VALUE, FLOATING_POINT_VALUE, IDENTIFIER, DIGIT_IDENTIFIER, QUOTED_IDENTIFIER, BACKQUOTED_IDENTIFIER, VARIABLE}", 'statementText': "CREATE stream AVG5MINTEMP WITH (kafka_topic='AVG5MINTEMP', value_format='JSON') AS SELECT METEO_RAW.TYPE TYPE, METEO_RAW.NOME_UNITA NOME_UNITA, AVG(NOME_UNITA.VALORE) AVG_VALUE FROM METEO_RAW where WINDOW TUMBLING ( SIZE 5 MINUTES ) GROUP BY TYPE WHERE NOME_UNITA = 'Unita 1' AND METEO_RAW.ID_SENSORE=2 EMIT CHANGES;", 'entities': []}

I try to use create_stream_as in this manner:

client.create_stream_as(table_name="AVG5MINTEMP", select_columns=["METEO_RAW.TYPE TYPE", "METEO_RAW.NOME_UNITA NOME_UNITA", "AVG(METEO_RAW.VALORE) AVG_VALUE"], src_table="METEO_RAW METEO RAW", kafka_topic="AVG5MINTEMP", value_format = 'JSON', conditions="WINDOW TUMBLING ( SIZE 5 MINUTES ) METEO_RAW.NOME_UNITA = 'Unita 1' AND METEO_RAW.ID_SENSORE=2 EMIT CHANGES; ")

but I receive the error:

DEBUG:root:KSQL generated: CREATE stream AVG5MINTEMP WITH (kafka_topic='AVG5MINTEMP', value_format='JSON') AS SELECT METEO_RAW.TYPE TYPE, METEO_RAW.NOME_UNITA NOME_UNITA, AVG(METEO_RAW.VALORE) AVG_VALUE FROM METEO_RAW METEO RAW where WINDOW TUMBLING ( SIZE 5 MINUTES ) METEO_RAW.NOME_UNITA = 'Unita 1' AND METEO_RAW.ID_SENSORE=2 EMIT CHANGES; DEBUG:root:content: {'@type': 'statement_error', 'error_code': 40001, 'message': "line 1:198: mismatched input 'RAW' expecting {';', 'EMIT', 'WHERE', 'WINDOW', 'GROUP', 'HAVING', 'LIMIT', 'PARTITION'}", 'statementText': "CREATE stream AVG5MINTEMP WITH (kafka_topic='AVG5MINTEMP', value_format='JSON') AS SELECT METEO_RAW.TYPE TYPE, METEO_RAW.NOME_UNITA NOME_UNITA, AVG(METEO_RAW.VALORE) AVG_VALUE FROM METEO_RAW METEO RAW where WINDOW TUMBLING ( SIZE 5 MINUTES ) METEO_RAW.NOME_UNITA = 'Unita 1' AND METEO_RAW.ID_SENSORE=2 EMIT CHANGES;", 'entities': []}

Please can support in this case how can create a table from a stream? Ths.