timeplus-io / proton

A stream processing engine and database, and a fast and lightweight alternative to ksqlDB and Apache Flink, 🚀 powered by ClickHouse
https://timeplus.com
Apache License 2.0
1.59k stars 70 forks source link

Join Kafka stream with slowly changing dimension table in SQL Server #588

Open kali786516 opened 9 months ago

kali786516 commented 9 months ago

Use case Imagine a Kafka stream of transaction data streaming in which need to join with a data changing dimension table in SQL Server or a file or some JDBC OLTP table, left outer join the dimension table does have new inserts ,updates and deletes every 10 minutes can Proton support this use case ?

Describe the solution you'd like insert into target_stream_table select * from transaction_stream as a left outer join dim_address as b on a.id = b.id ;

transaction_stream :- is a a real time stream data dim_address :- changes data every 5 minutes but not a stream , in Flink or Spark usually we convert this data to a Map (Update State By Key)

jovezhong commented 9 months ago

Thank you @kali786516 for sharing the use case and opening the ticket. Proton currently doesn't support querying external table with JDBC, or get the update every 5-10 minutes. While we design the solution for you, one workaround will be loading the external table into Proton with CDC/Debezium.

kali786516 commented 9 months ago

cdc will also do , this problem is not a firm consider this task as an typical example use case , bare in mind the dimension table should always have full data so the use case is connect stream with dimension batch table.

1 stream record --> joining (left outer join) with 5TB dimension table , this 5TB dimension table can get new inserts and updates every 5 minutes or often.

So the CDC should insert or update the dimension static table if there is a new change, in spark I can do foreach batch and then inside the foreach batch I can read and join the dimension table for every batch or use update state by key and update the map with new records from cdc.

chenziliang commented 8 months ago

Thanks @kali786516 for raising up this interesting question.

In Proton, you can join a stream against a live versioned-kv stream which is lively updated and every time there is update to the kv_stream, Proton will picked it up automatically and lively.

For example

CREATE STREAM left_stream (i int, k string);
CREATE STREAM kv_stream(d string, k string) PRIMARY KEY k SETTINGS mode='versioned_kv';

SELECT * FROM left_stream LEFT OUTER JOIN kv_stream ON left_stream.k = kv_stream.k; 

Would like to understand a bit more details regarding your requirements in the meanwhile.

  1. Do you prefer keeping the 5TB dimension table in the SQL server without moving data out ?
  2. Do you prefer every time a join happened, we do a remote lookup of the remote SQL server lively ?
  3. In Spark, did you talk to remote SQL server lively for each join or buffer the dimension data in Spark cluster which is refreshed periodically if there is no CDC ?
  4. Is there join latency requirement ?
  5. What's the eps (event per second) for the left stream usually ?
kali786516 commented 8 months ago

Nice to hear from you, so the kv stream is a table which has all the data correct whenever a new stream tries to join.

In our case kv stream has 5 TB of history data + new CDC changes ?

Answers to your questions:-

anwhile.

  1. Do you prefer keeping the 5TB dimension table in the SQL server without moving data out ? Can move out do 1 time full load into proton and then from later on use some CDC to get only changes into proton
  2. Do you prefer every time a join happened, we do a remote lookup of the remote SQL server lively ? No need remote lookup
  3. In Spark, did you talk to remote SQL server lively for each join or buffer the dimension data in Spark cluster which is refreshed periodically if there is no CDC ? So it's CDC based the data gets written to s3 if there is a change in SQL server
  4. Is there join latency requirement ? 5 minutes to process a record from start to finish
  5. What's the eps (event per second) for the left stream usually ? 100 events per second for example

— On Thu, Mar 7, 2024, 10:32 PM Ken Chen @.***> wrote:

Thanks @kali786516 https://github.com/kali786516 for raising up this interesting question.

In Proton, you can join a stream against a live versioned-kv stream which is lively updated and every time there is update to the kv_stream, Proton will picked it up automatically and lively.

For example

CREATE STREAM left_stream (i int, k string); CREATE STREAM kv_stream(d string, k string) PRIMARY KEY k SETTINGS mode='versioned_kv';

SELECT * FROM left_stream LEFT OUTER JOIN kv_stream ON left_stream.k = kv_stream.k;

Would like to understand a bit more details regarding your requirements in the meanwhile.

  1. Do you prefer keeping the 5TB dimension table in the SQL server without moving data out ?
  2. Do you prefer every time a join happened, we do a remote lookup of the remote SQL server lively ?
  3. In Spark, did you talk to remote SQL server lively for each join or buffer the dimension data in Spark cluster which is refreshed periodically if there is no CDC ?
  4. Is there join latency requirement ?
  5. What's the eps (event per second) for the left stream usually ?

— Reply to this email directly, view it on GitHub https://github.com/timeplus-io/proton/issues/588#issuecomment-1984012806, or unsubscribe https://github.com/notifications/unsubscribe-auth/AC3JZVOHC6KIDDR2DNLTN73YXCMTDAVCNFSM6AAAAABEIL6HUOVHI2DSMVQWIX3LMV43OSLTON2WKQ3PNVWWK3TUHMYTSOBUGAYTEOBQGY . You are receiving this because you were mentioned.Message ID: @.***>

chenziliang commented 8 months ago

@kali786516, Awesome. Thanks for your detailed reply. Yes, for kv stream, it is designed to handle the baseline and apply the CDC upsert changes.

May I ask how many unique keys for the 5TB data ? And what's is your future growth for this dimension table and left stream, like can it grow to 10TB, 1000 eps for the left stream in the next 2 ~ 3 years ? After join, where do you like to persist the join results and what would you like to do with the joined results ?

If you could share the following details, that will be great since I would really like to test the emulation out and come back to you.

  1. The schema of the stream and the schema of the dimension table (don't need to share the exact column names if that is sensitive, column types and number of columns etc matter)
  2. The typical join you like to execute (is the join always on the primary key?)
  3. SQL Server CDC format / example to update the baseline if you have

Thanks again !

chenziliang commented 8 months ago

Hello @kali786516 , may I ask if you got a chance to read the follow up questions which will be very helpful to us for internal testing ? Thanks a lot

kali786516 commented 8 months ago

This use cases is of 2016 Citi UK pretty old use case, unique would be 4TB can grow up to 1TB in next 5 years.

On Wed, Mar 27, 2024, 3:42 AM Ken Chen @.***> wrote:

Hello @kali786516 https://github.com/kali786516 , may I ask if you got a chance to read the follow up questions which will be very helpful to us for internal testing ? Thanks a lot

— Reply to this email directly, view it on GitHub https://github.com/timeplus-io/proton/issues/588#issuecomment-2021559210, or unsubscribe https://github.com/notifications/unsubscribe-auth/AC3JZVJYVPDVQFUZTKOL5MDY2HXGRAVCNFSM6AAAAABEIL6HUOVHI2DSMVQWIX3LMV43OSLTON2WKQ3PNVWWK3TUHMZDAMRRGU2TSMRRGA . You are receiving this because you were mentioned.Message ID: @.***>

kali786516 commented 8 months ago

Schema I don't remember in Citi we used to call it as reference data where we use our trade transaction data traded,trade date,version and lookup column to lookup reference record

On Wed, Mar 27, 2024, 7:05 AM sri hari kali charan Tummala < @.***> wrote:

This use cases is of 2016 Citi UK pretty old use case, unique would be 4TB can grow up to 1TB in next 5 years.

On Wed, Mar 27, 2024, 3:42 AM Ken Chen @.***> wrote:

Hello @kali786516 https://github.com/kali786516 , may I ask if you got a chance to read the follow up questions which will be very helpful to us for internal testing ? Thanks a lot

— Reply to this email directly, view it on GitHub https://github.com/timeplus-io/proton/issues/588#issuecomment-2021559210, or unsubscribe https://github.com/notifications/unsubscribe-auth/AC3JZVJYVPDVQFUZTKOL5MDY2HXGRAVCNFSM6AAAAABEIL6HUOVHI2DSMVQWIX3LMV43OSLTON2WKQ3PNVWWK3TUHMZDAMRRGU2TSMRRGA . You are receiving this because you were mentioned.Message ID: @.***>

chenziliang commented 6 days ago

Hi @kali786516 , recently we have done some significant improvements in Timeplus Enteprise which solves

  1. Mutable data streaming processing aggregation and join
  2. Large scale lots of concurrent mutable streams join and large scale keys (~200millions) mutable stream aggregation since we have implemented hybrid on disk aggregation and join

Would like to chat more if your use cases are still there and understand how Timeplus can help here.

Thanks !