confluentinc / ksql

The database purpose-built for stream processing applications.
https://ksqldb.io
Other
127 stars 1.04k forks source link

KSQL consistency #2225

Open pablocastilla opened 5 years ago

pablocastilla commented 5 years ago

Hi!

I am doing a PoC using dotnet and ksql. https://github.com/pablocastilla/kafkiano/

The overall idea is to see if I can implement business logic using KSQL. In the example I introduce devices in the stock and make orders from it. The example consists in this: Two main streams:

With those streams I create two tables:

After those two tables I create another table with the difference between the orders and the products in the inventory, just to know if there are products left.

With a join in that last table and the order stream I can have the stock left when that order is processed.

I am introducing the events using the productname as key. So far it works well in my machine, but my question is: is this consistent in a big production environment? I would like to know restrictions about when the consistency is broken when a lot of events are received in parallel.

Thanks

KSQL:
//INVENTORY STREAMS
CREATE STREAM InventoryEventsStream (ProductName VARCHAR, Quantity INT) WITH (kafka_topic='INVENTORYEVENTS', key='ProductName', value_format='json');

//TABLE GROUPING BY PRODUCT
CREATE TABLE  ProductsStock as select ProductName,sum(Quantity) as Stock from InventoryEventsStream group by ProductName;

// ORDERS STREAMS
CREATE STREAM OrdersCreatedStream (ProductName VARCHAR,Quantity INT, OrderId VARCHAR, User VARCHAR) WITH (kafka_topic='ORDERSEVENTS', key='ProductName', value_format='json');
//TABLE GROUPING BY PRODUCT
CREATE TABLE ProductsOrdered as select ProductName as ProductName,sum(Quantity) as Orders from  ORDERSCREATEDSTREAM group by ProductName;

// join with the difference
CREATE TABLE StockByProductTable  AS  SELECT ps.ProductName as ProductName,ps.Stock - op.Orders as Stock FROM PRODUCTSORDERED op JOIN ProductsStock ps ON op.ProductName = ps.ProductName;

//logic: I want the stock left when I make an order
SELECT ocs.OrderId,ocs.User,sbpt.Stock FROM OrdersCreatedStream ocs JOIN  StockByProductTable sbpt ON sbpt.ProductName = ocs.ProductName;
kjayashr commented 5 years ago

@pablocastilla I am replicating the issue in a clustered setup and testing it. May I know your sample load and your production config ? How many nodes?

pablocastilla commented 5 years ago

Hi! Thanks for answering.

It is not a real issue I have seen, it is just doubts I have regarding KSQL.

I understand that if you produce events using a key they will be processed sequentially (one by one in the whole cluster). But I have a doubt:

I have a few queries for each event: when one event triggers the ksql queries it processes every query before the next event enters in the "ksql query chain"? Or it just happens in the first query and one can overtake another in the second query?

In which order are the KSQLs queries executed? In the order that they are created in the system? It would be different if you count before doing the latest join or after it.

My problem is that I don't have a clear understanding of the guarantees provided by the system and how to use it properly.

Thanks!

kjayashr commented 5 years ago

I get your question. A minimal answer for your question is, it executes as soon as your message is available in the stream.

A good analogy would be a machinery which is always running. Whenever a payload enters inside, it will just process it. Now it comes to you on the chaining part. Are you inserting some payload to a new record stream after processing? Then yes, you can call it 'chaining'. Once you run / execute CTAS/ CSAS statements you see something like 'Table/Stream created and Running' , that is exactly what it means.

You have ignited an always running query!

pablocastilla commented 5 years ago

Ok! So I have a lot of independent queries running in parallel. I understand if I want to do multiple steps it's enough to create tables or streams, that's writing a new record stream.

In my example, the order stream would trigger a write in the ProductsOrdered, that write would trigger a write in the StockByProductTable and that would trigger the select which doesn't write anywhere right now (will do in the future).

Am I right? is my ksql consistent in keeping the inventory consistently?

kjayashr commented 5 years ago

Exactly ! Still, instead of saying a query triggers an other, I would rather say Stream B starts processing a message when it is available. Other than that, Your approach seems correct. KSQL Cookbook by @rmoff is a good place to start/validate.

pablocastilla commented 5 years ago

Perfect answered. Thanks so much!

I will finish the PoC using this information. I like the way we can do the inventory kind of logic with streaming, it is very elegant.

miguno commented 5 years ago

Let us know how it turns out, @pablocastilla !

pablocastilla commented 5 years ago

Hi!

Well, I have switched my solution moving from KSQL to C# plain code, but I think you can help me :). What I am currently trying to do is something like the logic in your inventory service in the microservice example: https://github.com/confluentinc/kafka-streams-examples/blob/3.3.1-post/src/main/java/io/confluent/examples/streams/microservices/InventoryService.java The service keeps the stock of a list of products

I think with KSQL is very difficult to write code with logic and achieve consistency, the queries all run in parallel and I can not write complex logic on them. So I will leave the KSQL for pure streaming for now (something like orders per minute). I need ifs!

My current idea here is to have the inventory in memory with a dictionary with the product and the product stock (Dictionary<string,int>). As the coming events are keyed by product they will be executed one by one so it will be consistent in adding or removing from the stock. When I init the service, detect an error or a partition is changed I reload the dictionary from a compacted topic where I write every change it is made in the stock (let's see those are events for the read model). Although the current .net driver doesn't support transactions let's asume one day they will and all the publications from the services are in a transaction.

What do you think? Am I going in the right direction? Is this the normal way to do it? This way of working remembers me the actor model, where one actor message is only executed in one machine and the actor data is in memory.

Thanks so much! You will have a good .net example, I promise!

miguno commented 5 years ago

I think with KSQL is very difficult to write code with logic and achieve consistency, the queries all run in parallel and I can not write complex logic on them. [...] I need ifs!

Can you elaborate a bit here?

pablocastilla commented 5 years ago

Of course!

The issue is I am writing code with the inventory service logic. That service has three actions:

If I use KSQL to validate the stock when an order is processed I need to implement a few queries (like the ones you see in my first question in this thread): the streams, a table for doing a stock by product, a table for having the orders, a join of the orders stream with the difference between the inventory and the orders...

I understood from the answers that those queries are executed in parallel, so I am not sure about the consistency when I add new stock and have new orders. For example, is the join between the orders stream and the StockByProductTable executed after the later is updated? But also each query (except the streams from a topic) implements an execution of a message which makes my logic slower.

On the other hand if I process all that logic in plain c# code I just have to process the messages and keep the stock in memory, that's easy because if I produce them by key they will be consumed one by one by product, am I right? is this a normal way to do it?

Also I see that implementing logic in KSQL can put me in a situation in which I will have a lot of queries for doing my logic, that remembers me the times when the logic was done in store procedures. Imagine a much more complex logic than the stock one, it would be difficult to understand. I think the business logic is more readable and maintenable in c# code, leaving KSQL for quering the streams a give a good data structure to the commands processing and also for generating read models.

What do you think?