tomasfabian / ksqlDB.RestApi.Client-DotNet

ksqlDb.RestApi.Client is a C# LINQ-enabled client API for issuing and consuming ksqlDB push and pull queries and executing statements.
MIT License

Create Pull query with GroupBy clause #13

Closed vijaymandave closed 2 years ago

vijaymandave commented 2 years ago

Hi,

I am using the NuGet package Kafka.DotNet.ksqlDB, version 2.0.0.

Kafka table

CREATE TABLE kafka_table (rowkey varchar PRIMARY KEY, id INT, eventtime BIGINT, price ARRAY<double>, quantity ARRAY<double>) WITH (KAFKA_TOPIC='kafka_topic_order', FORMAT='PROTOBUF', PARTITIONS=1, REPLICAS=1);
CREATE TABLE QUERYABLE_kafka_table AS SELECT * FROM kafka_table;

How can I create a pull query in C# that fetches data with a GROUP BY clause?

Please suggest.

tomasfabian commented 2 years ago

AFAIK ksqlDB pull queries don't support GROUP BY clauses.

vijaymandave commented 2 years ago

Hi @tomasfabian, thank you. Are there any other options besides pull queries to achieve the same result?

tomasfabian commented 2 years ago

Hi @vijaymandave, I would suggest using a more classical database such as SQL Server, or a document database such as MongoDB, for fetching aggregated data. You would also gain rich indexing capabilities. You can use Kafka Connect with a source connector to replicate MongoDB's oplog into a Kafka topic and then create push queries on that source with ksqlDB. Alternatively, you can use a sink connector to push your data from a Kafka topic into a database table or document collection. This will definitely give you more options, but it will also introduce new challenges.

Here is a simple example of how you can take advantage of the stream-table duality. If you can use SQL Server, I also implemented a package named SqlServer.Connector, which should help you connect SQL Server with Kafka.

Regards Tomas.

vijaymandave commented 2 years ago

Thank you @tomasfabian.

tomasfabian commented 2 years ago

Not at all. I'm not sure how you would like to group your data, but you could also try this approach: materialize the aggregation in a table first, then run the pull query against that table.

CREATE TABLE QUERYABLE_kafka_table AS SELECT id, COUNT(*) AS total FROM kafka_table GROUP BY id;

SELECT * FROM QUERYABLE_kafka_table;
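
For the C# side, here is a minimal sketch of sending such a pull query to the ksqlDB REST `/query` endpoint with a plain `HttpClient`, without going through the client library. The class and method names (`KsqlPullQuery`, `BuildBody`, `RunAsync`) are hypothetical, and the server address in the usage note and the assumption that the materialized table is keyed by `id` are mine, not taken from this thread.

```csharp
using System;
using System.Collections.Generic;
using System.Net.Http;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;

// Hypothetical helper, not part of ksqlDB.RestApi.Client.
public static class KsqlPullQuery
{
    // Builds the JSON body for the /query endpoint:
    // a "ksql" statement plus an (empty) "streamsProperties" object.
    public static string BuildBody(string ksql) =>
        JsonSerializer.Serialize(new
        {
            ksql,
            streamsProperties = new Dictionary<string, string>()
        });

    // Posts the pull query and returns the raw JSON response as a string.
    public static async Task<string> RunAsync(HttpClient http, Uri server, string ksql)
    {
        using var content = new StringContent(
            BuildBody(ksql), Encoding.UTF8, "application/vnd.ksql.v1+json");
        using var response = await http.PostAsync(new Uri(server, "/query"), content);
        response.EnsureSuccessStatusCode();
        return await response.Content.ReadAsStringAsync();
    }
}
```

Assuming a ksqlDB server listening on http://localhost:8088, a call could look like `await KsqlPullQuery.RunAsync(new HttpClient(), new Uri("http://localhost:8088"), "SELECT * FROM QUERYABLE_kafka_table WHERE id = 1;");`. The grouping itself happens in the CREATE TABLE ... GROUP BY statement; the pull query only looks up the already aggregated rows.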

Regards Tomas