
Provide changefeed operation type along with changefeed data #129661

Open daniel-crlabs opened 2 weeks ago

daniel-crlabs commented 2 weeks ago

Is your feature request related to a problem? Please describe.

The customer would like changefeeds to show the kind of operation that produced the data, i.e., insert, update, etc.

Describe the solution you'd like

When someone inserts a row, currently they see the following output in Kafka:

{"after": {"id": 4, "id1": 4}, "key": [4]}

Instead, the customer would like the following:

{"after": {"id": 4, "id1": 4}, "key": [4]}  OP: I
{"after": {"id": 4, "id1": 4}, "key": [4]}  OP: U
{"after": {"id": 4, "id1": 4}, "key": [4]}  OP: D

Describe alternatives you've considered

No alternative exists, as currently only the following is available (sample output from the CockroachDB documentation):

{"after": {"city": "seattle", "end_address": null, "end_time": null, "id": "f6c02fe0-a4e0-476d-a3b7-91934d15dce2", "revenue": 25.00, "rider_id": "14067022-6e9b-427b-bd74-5ef48e93da1f", "start_address": "2 Michael Field", "start_time": "2023-06-02T15:14:20.790155", "vehicle_city": "seattle", "vehicle_id": "55555555-5555-4400-8000-000000000005"}, "before": {"city": "seattle", "end_address": null, "end_time": null, "id": "f6c02fe0-a4e0-476d-a3b7-91934d15dce2", "revenue": 25.00, "rider_id": "14067022-6e9b-427b-bd74-5ef48e93da1f", "start_address": "5 Michael Field", "start_time": "2023-06-02T15:14:20.790155", "vehicle_city": "seattle", "vehicle_id": "55555555-5555-4400-8000-000000000005"}, "key": ["seattle", "f6c02fe0-a4e0-476d-a3b7-91934d15dce2"]}

Additional context

The user would like the output to be more readable, making it easier to identify which operation type was performed to modify the data.

Jira issue: CRDB-41650

blathers-crl[bot] commented 2 weeks ago

cc @cockroachdb/cdc

andyyang890 commented 2 weeks ago

Would the event_op() function already supported by CDC queries work for their use case? https://www.cockroachlabs.com/docs/stable/cdc-queries#cdc-query-function-support

daniel-crlabs commented 2 weeks ago

Thank you for the link. I've looked at our docs and there is no example of what the output would look like. Could you provide an example, so we can tell whether this is enough and whether the customer is OK with it? The customer expects something similar to this:

{"after": {"id": 4, "id1": 4}, "key": [4]}  OP: I
{"after": {"id": 4, "id1": 4}, "key": [4]}  OP: U
{"after": {"id": 4, "id1": 4}, "key": [4]}  OP: D
andyyang890 commented 2 weeks ago

I just tried it out and here's an example of the output:

root@127.0.0.1:26257/demoapp/movr> create table t (x int);                                                                  
CREATE TABLE

Time: 5ms total (execution 5ms / network 0ms)

root@127.0.0.1:26257/demoapp/movr> insert into t values (1), (2), (3);                                                      
INSERT 0 3

Time: 9ms total (execution 8ms / network 0ms)

root@127.0.0.1:26257/demoapp/movr> create changefeed with diff as select *, event_op() from t;                              
{"key":"[998216125680254977]","table":"t","value":"{\"event_op\": \"insert\", \"x\": 1}"}
{"key":"[998216125680320513]","table":"t","value":"{\"event_op\": \"insert\", \"x\": 2}"}
{"key":"[998216125680353281]","table":"t","value":"{\"event_op\": \"insert\", \"x\": 3}"}
daniel-crlabs commented 2 weeks ago

Thank you for the example; unfortunately this is not what the customer is asking for, and the operation type isn't shown in Kafka. Here is a working example using the same workflow as the customer; note that there is nothing in the Kafka feed indicating whether the operation was an insert or an update:

  1. Created the changefeed:
CREATE CHANGEFEED FOR TABLE world INTO 'kafka://192.168.88.101:30092' WITH OPTIONS (diff, envelope = 'wrapped')
  2. Show its status:
root@lab-k8s:30777/cdc_demo> select * from [show changefeed jobs] where job_id = 998443989744910339;
        job_id       |                                                   description                                                   | user_name | status  |              running_status              |          created           |          started           | finished |          modified          |      high_water_timestamp      | error |           sink_uri           |    full_table_names     | topics | format
---------------------+-----------------------------------------------------------------------------------------------------------------+-----------+---------+------------------------------------------+----------------------------+----------------------------+----------+----------------------------+--------------------------------+-------+------------------------------+-------------------------+--------+---------
  998443989744910339 | CREATE CHANGEFEED FOR TABLE world INTO 'kafka://192.168.88.101:30092' WITH OPTIONS (diff, envelope = 'wrapped') | root      | running | running: resolved=1724775637.506198332,0 | 2024-08-27 15:08:44.604789 | 2024-08-27 15:08:44.697771 | NULL     | 2024-08-27 16:21:11.064008 | 1724775667649944153.0000000000 |       | kafka://192.168.88.101:30092 | {cdc_demo.public.world} | world  | json
(1 row)

Time: 2.103s total (execution 2.072s / network 0.031s)
  3. Added some records:
root@lab-k8s:30777/cdc_demo> insert into world values (1,'earth'),(2,'mars'),(3,'mercury');
INSERT 0 3

Time: 15ms total (execution 14ms / network 1ms)
  4. Added another and updated it:
root@lab-k8s:30777/cdc_demo> insert into world values (4,'venus');
INSERT 0 1

Time: 27ms total (execution 21ms / network 6ms)
root@lab-k8s:30777/cdc_demo> update world set name = 'jupiter' where id = 4;
UPDATE 1

Time: 18ms total (execution 6ms / network 12ms)
  5. Running the Kafka consumer shows all of the operations above; however, there is no indication as to whether the op was an insert or an update, which is what the customer is requesting in this feature request:
[appuser@kafka-0 ~]$ kafka-console-consumer --topic world --from-beginning --bootstrap-server kafka-0.kafka:9092
{"after": {"id": 1, "name": "earth"}, "before": null}
{"after": {"id": 2, "name": "mars"}, "before": null}
{"after": {"id": 3, "name": "mercury"}, "before": null}
{"after": {"id": 4, "name": "venus"}, "before": null}
{"after": {"id": 4, "name": "jupiter"}, "before": {"id": 4, "name": "venus"}}
  6. The customer's request and expectation is to have an extra field providing the type of operation that created/updated the records. Using our example above, this is what the customer would like to see:
{"after": {"id": 1, "name": "earth"}, "before": null}, "OP": "I"
{"after": {"id": 2, "name": "mars"}, "before": null}, "OP": "I"
{"after": {"id": 3, "name": "mercury"}, "before": null}, "OP": "I"
{"after": {"id": 4, "name": "venus"}, "before": null}, "OP": "I"
{"after": {"id": 4, "name": "jupiter"}, "before": {"id": 4, "name": "venus"}}, "OP": "U"
rharding6373 commented 2 weeks ago

Hi @daniel-crlabs, as @andyyang890 said, CDC only supports emitting event types for CDC queries. They would have to modify the create changefeed statement to something like CREATE CHANGEFEED world INTO 'kafka://192.168.88.101:30092' WITH OPTIONS (diff, envelope = 'wrapped') AS SELECT *, event_op() FROM world.

We'll let @rohan-joshi evaluate this feature request for a non-CDC query solution against other requests that we have in the TReq process.

daniel-crlabs commented 2 weeks ago

Thank you for the clarification. I'll let them know this is an alternative, and they'll monitor the feature request. The CREATE statement, however, needs to be modified, otherwise it gives an error; the correct syntax and examples of what the output looks like are shown below:

root@lab-k8s:30777/cdc_demo> CREATE CHANGEFEED INTO 'kafka://192.168.88.101:30092' WITH diff, envelope = 'wrapped' AS SELECT *, event_op() FROM world;
        job_id
----------------------
  998477303124099075
(1 row)

NOTICE: changefeed will emit to topic world
Time: 112ms total (execution 111ms / network 1ms)

Notice how the syntax can be confusing, since the job description shown below differs from what was actually typed. Specifically, a CREATE statement using this syntax ---> WITH OPTIONS (diff, envelope = 'wrapped') will fail because that syntax is wrong; it should be written as shown above instead:

root@lab-k8s:30777/cdc_demo> select * from [show changefeed jobs] where job_id = 998477303124099075;
        job_id       |                                                            description                                                             | user_name | status  |              running_status              |          created           |          started           | finished |          modified          |      high_water_timestamp      | error |           sink_uri           |    full_table_names     | topics | format
---------------------+------------------------------------------------------------------------------------------------------------------------------------+-----------+---------+------------------------------------------+----------------------------+----------------------------+----------+----------------------------+--------------------------------+-------+------------------------------+-------------------------+--------+---------
  998477303124099075 | CREATE CHANGEFEED INTO 'kafka://192.168.88.101:30092' WITH OPTIONS (diff, envelope = 'wrapped') AS SELECT *, event_op() FROM world | root      | running | running: resolved=1724782777.825150656,0 | 2024-08-27 17:58:11.041312 | 2024-08-27 17:58:11.162161 | NULL     | 2024-08-27 18:20:11.640054 | 1724782808526414463.0000000000 |       | kafka://192.168.88.101:30092 | {cdc_demo.public.world} | world  | json
(1 row)

Time: 299ms total (execution 289ms / network 10ms)
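
To make the difference concrete, here is a minimal side-by-side sketch using the same sink URI and table as above. Per the note above, the first form (copied verbatim from the job description) errors when typed literally, while the second is the form that was actually accepted:

-- Form as printed in the job description; per the observation above,
-- typing this literally results in a syntax error:
CREATE CHANGEFEED INTO 'kafka://192.168.88.101:30092'
  WITH OPTIONS (diff, envelope = 'wrapped')
  AS SELECT *, event_op() FROM world;

-- Form that was actually accepted above:
CREATE CHANGEFEED INTO 'kafka://192.168.88.101:30092'
  WITH diff, envelope = 'wrapped'
  AS SELECT *, event_op() FROM world;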

Regardless, we can now see how this works. Here are some changes:

root@lab-k8s:30777/cdc_demo> update world set name = 'saturn' where id = 4;
UPDATE 1

Time: 13ms total (execution 11ms / network 1ms)

root@lab-k8s:30777/cdc_demo> delete from world where id = 3;
DELETE 1

Time: 7ms total (execution 5ms / network 2ms)

and the output in Kafka:

[appuser@kafka-0 ~]$ kafka-console-consumer --topic world --from-beginning --bootstrap-server kafka-0.kafka:9092
{"after": {"event_op": "insert", "id": 1, "name": "earth"}, "before": null}
{"after": {"event_op": "insert", "id": 2, "name": "mars"}, "before": null}
{"after": {"event_op": "insert", "id": 3, "name": "mercury"}, "before": null}
{"after": {"event_op": "insert", "id": 4, "name": "jupiter"}, "before": null}
{"after": {"event_op": "update", "id": 4, "name": "saturn"}, "before": {"id": 4, "name": "jupiter"}}
{"after": null, "before": {"id": 3, "name": "mercury"}}

The sad part is that only update and insert events include an "event_op"; deletes do not. I'll let the customer know this is an alternative until their feature request has been approved and worked on.
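
In the meantime, one possible partial workaround, sketched here only as an idea and not verified: assuming event_op() is also allowed in a CDC query's WHERE clause (the function-support table linked earlier suggests it is), a second changefeed dedicated to delete events could be created, so that anything arriving on that feed is known to be a delete:

-- Sketch only: a delete-only feed, assuming event_op() works as a predicate;
-- uses the same sink URI and table as the examples above.
CREATE CHANGEFEED INTO 'kafka://192.168.88.101:30092'
  WITH diff, envelope = 'wrapped'
  AS SELECT * FROM world
  WHERE event_op() = 'delete';

Alternatively, since the wrapped envelope output above already distinguishes the cases ("before": null for inserts, both sides present for updates, "after": null for deletes), a consumer could infer the operation type downstream without any changefeed changes.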

rytaft commented 3 days ago

Applying the P-3 label, as this is an enhancement request; we will determine the priority of this feature during planning.