confluentinc / kafka-tutorials

Tutorials and Recipes for Apache Kafka
https://developer.confluent.io/tutorials
Apache License 2.0

Suggestions to improve payment-status-check recipe #1050

Open mjsax opened 2 years ago

mjsax commented 2 years ago

In https://confluentinc.github.io/ksqldb-recipes/real-time-analytics/payment-status-check/ I think we don't need to create enriched_payments or payments_with_status. Could we merge those two queries into payments_final?

ybyzek commented 2 years ago

@mjsax I'm not following the improvement suggestion. Can you please clarify? Please note payment_statuses is a combination of 2 streams, so that intermediate step looks necessary.
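
For context, a combined stream like payment_statuses is usually built in ksqlDB by creating it from one source stream and then inserting rows from the other. A minimal sketch of that pattern follows. The source stream names aml_status and funds_status, their payment_id and status columns, and the 'AML' / 'FUNDS' labels are assumptions for illustration and are not taken from this thread:

```sql
-- Sketch only: aml_status and funds_status are assumed source streams,
-- each with payment_id and status columns.
CREATE STREAM payment_statuses AS
  SELECT payment_id, status, 'AML' AS source_system
  FROM aml_status;

-- Append the second source to the same stream.
INSERT INTO payment_statuses
  SELECT payment_id, status, 'FUNDS' AS source_system
  FROM funds_status;
```

Because the two sources have to land in one stream before they can be joined against payments, this step stays separate whichever way the later joins are written.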

mjsax commented 2 years ago

> Please note payment_statuses is a combination of 2 streams, so that intermediate step looks necessary.

Agree. My comment was about the following:

We first join payments and customers:

CREATE STREAM enriched_payments AS SELECT
  [...]
FROM payments p LEFT JOIN customers c ON p.custid = c.id;

Next we join the result with payment_statuses:

CREATE STREAM payments_with_status AS SELECT
  [...]
FROM enriched_payments ep LEFT JOIN payment_statuses ps WITHIN 1 HOUR ON ep.payment_id = ps.payment_id;

We can do both joins in one query:

CREATE STREAM payments_with_status AS SELECT
  [...]
FROM payments p LEFT JOIN customers c ON p.custid = c.id
     LEFT JOIN payment_statuses ps WITHIN 1 HOUR ON p.payment_id = ps.payment_id;

Next we aggregate the result:

CREATE TABLE payments_final AS SELECT
  payment_id,
  HISTOGRAM(status) AS status_counts,
  COLLECT_LIST('{ "system" : "' + source_system + '", "status" : "' + STATUS + '"}') AS service_status_list
FROM payments_with_status
WHERE status IS NOT NULL
GROUP BY payment_id;

But we can do the aggregation in the join query directly:

CREATE TABLE payments_final AS SELECT
  p.payment_id AS payment_id,
  HISTOGRAM(status) AS status_counts,
  COLLECT_LIST('{ "system" : "' + source_system + '", "status" : "' + STATUS + '"}') AS service_status_list
FROM payments p LEFT JOIN customers c ON p.custid = c.id
     LEFT JOIN payment_statuses ps WITHIN 1 HOUR ON p.payment_id = ps.payment_id
WHERE status IS NOT NULL
GROUP BY p.payment_id;
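
One way to sanity-check the merged version is to inspect payments_final after it is created. This is only a sketch and assumes the statement above has been run in a ksqlDB CLI session against the recipe's data. The column names are the ones defined in the payments_final select list:

```sql
-- Push query: emits rows from payments_final as status events arrive,
-- stopping after five rows.
SELECT payment_id, status_counts, service_status_list
FROM payments_final
EMIT CHANGES
LIMIT 5;

-- Lists the running persistent queries; the merged version should show
-- one query writing payments_final instead of one per intermediate stream.
SHOW QUERIES;
```

Each CREATE ... AS SELECT statement starts its own persistent query and writes its own sink topic, so folding the joins and the aggregation into one statement should remove the enriched_payments and payments_with_status topics along with their queries.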