This PR reworks several details of Arroyo's updating SQL support to improve the UX and set up for supporting more complex updating queries.
Internally, Arroyo represents updates as a flat event with a "retract" field that may be true or false. However, Debezium-formatted data (whether consumed or produced) has a nested structure with "before", "after", and "op" fields. When we consume Debezium data, we first need to "unroll" it into our flattened format, so each update becomes a sequence of deletes and creates.
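The unrolling step can be sketched roughly like this (a minimal illustration of the idea, not Arroyo's actual implementation; the field names follow the Debezium envelope):

```python
def unroll_debezium(event):
    """Flatten one Debezium envelope into Arroyo-style flat rows with a
    'retract' flag. Ops: 'c' = create, 'd' = delete, 'u' = update."""
    op = event["op"]
    if op == "c":
        return [{**event["after"], "retract": False}]
    if op == "d":
        return [{**event["before"], "retract": True}]
    if op == "u":
        # An update becomes a retraction of the old row plus the new row.
        return [
            {**event["before"], "retract": True},
            {**event["after"], "retract": False},
        ]
    raise ValueError(f"unknown op: {op}")

# A single update unrolls into a delete/create pair:
rows = unroll_debezium({"op": "u", "before": {"count": 1}, "after": {"count": 2}})
```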
Similarly, when an updating operator (like an updating aggregate) produces an update for a row, it cannot emit a single update event (we have no native way to represent one); instead it emits a retract row followed by a create row.
Currently, a query like
SELECT count(*) FROM table;
will produce a Debezium result stream of delete/create pairs, reflecting the underlying representation.
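As an illustration (the exact field layout in the output may differ), a single change to the count appears as two events:

```json
{"before": {"count": 1}, "after": null, "op": "d"}
{"before": null, "after": {"count": 2}, "op": "c"}
```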
However, this is undesirable—we've turned atomic updates into non-atomic delete/create pairs, leading to potential data loss in the consuming system if a delete is consumed but its corresponding create is not. It also doubles the event count and adds cognitive load for what is conceptually a single update.
With this PR, we merge each delete/create pair back into a single Debezium update event.
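The merging direction can be sketched in the same simplified style (again an illustration, not the actual implementation): a retract immediately followed by a create is folded into one Debezium-style update.

```python
def merge_retract_create(rows):
    """Fold a retract followed by a create into a single Debezium-style
    update event ('u'); emit unpaired rows as 'd' or 'c' events."""
    out = []
    i = 0
    while i < len(rows):
        row = dict(rows[i])
        retract = row.pop("retract")
        if retract and i + 1 < len(rows) and not rows[i + 1]["retract"]:
            # retract + create pair: emit one atomic update.
            nxt = dict(rows[i + 1])
            nxt.pop("retract")
            out.append({"before": row, "after": nxt, "op": "u"})
            i += 2
        elif retract:
            out.append({"before": row, "after": None, "op": "d"})
            i += 1
        else:
            out.append({"before": None, "after": row, "op": "c"})
            i += 1
    return out

events = merge_retract_create([
    {"count": 1, "retract": True},
    {"count": 2, "retract": False},
])
```

So instead of two events per update, consumers see a single atomic `"op": "u"` event.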
There is also a small breaking change: to support more efficient updating operations on Debezium sources, we now require that sources be annotated with at least one PRIMARY KEY field:
CREATE TABLE debezium_source (
    id INT PRIMARY KEY,
    customer_id INT,
    price FLOAT,
    order_date TIMESTAMP,
    status TEXT
) WITH (
    connector = 'kafka',
    format = 'debezium_json',
    type = 'source',
    ...
);
The declared primary key must reflect the primary keys of the underlying tables.