Closed bobvawter closed 3 weeks ago
Hi Bob, just want to make sure I understand the ask here clearly. I did a quick search on the changefeed queries docs and found this (link):
The snippet above seems to imply that if we are using a webhook sink, this is already passed by default. Is the ask here then to unmarshal the primary key array into an appropriate format (maybe `*ident.Map[int]`) and use that in lieu of the current `getPrimaryKey`? From an initial look at the codebase, it looks like I need to update the `query_payload` processors to pull this data out of the source CDC message when `key_in_value` is specified.
Also, what would be the easiest way to dump the incoming raw JSON that's coming in from the changefeed message?
Made some progress here, I'm at least able to dump out the received JSON coming out of the request body from the webhook. I see that the data now shows the key in the message:
Received JSON: map[length:1 payload:[map[after:map[id:1.204560795e+09 t:yo] before:<nil> key:[1.204560795e+09] updated:1728948380857547000.0000000000]]]
This is from an `INSERT` into `tbl1`, which has a simple schema:
root@127.0.0.1:26257/molt> SHOW CREATE TABLE tbl1;
  table_name |                create_statement
-------------+------------------------------------------------
  tbl1       | CREATE TABLE public.tbl1 (
             |     id INT8 NOT NULL,
             |     t STRING NULL,
             |     CONSTRAINT tbl1_pkey PRIMARY KEY (id ASC)
             | )
(1 row)
Problem
One thing to note here is that the `key` field carries the values of the key columns, not their names. Given that `getPrimaryKey` populates a map keyed by ident with integer values, it doesn't seem like the webhook payload gives us what we need. We'll still need to introspect the table to get the column names.
I've tried this with both 23.2 and 24.1, and also with composite PKs:
Received JSON: map[length:1 payload:[map[after:map[id:3 id2:302 t:<nil>] before:<nil> key:[3 302] updated:1728950170607052977.0000000000]]]
Assuming we get the correct data from the webhook (I can check whether another setting switches this to column names)
Given that we modify the values of `req.keys` via reference, I'm thinking we can embed the logic here to key off of the "key" field and get that data into the `ident.Map`. Alternatively, we can do this after `getPrimaryKey` is called in the webhook handler.
The documentation is not correct when it comes to changefeed queries; it seems to be necessary to request `key_in_value` explicitly. The columns don't matter. All that's needed is a unique replication identity for the mutation. That's currently provided by `getPrimaryKey()`, but it doesn't need to be in this case.
Yep, you're right, let me bring this docs inconsistency to the CDC team's attention. I tested locally and needed to pass in `key_in_value` explicitly.
Ok got it, if you need a unique replication identity for the mutation, then we get that from the `key` field in the map. But I'm curious how `getPrimaryKey` does that right now, since it seems to map each column name to its index rather than to the actual PK value.
Where does this get used downstream, and who actually needs this unique ID?
The output map from `getPrimaryKey()` winds up being used here when the payload line is decoded: https://github.com/cockroachdb/replicator/blob/eb575e31b247da8ba15e7bb2a5a1173f02c2e1f2/internal/util/cdcjson/query_payload.go#L129-L138
The `types.Mutation.Key` field is used when it's necessary to identify if two mutations should be considered to affect the same record, be it a tabular row, document id, etc.
Ok yeah, I get what you're saying now. The PK columns are a means to the end of getting the unique ID for a mutation. But we skip all that because the source changefeed gives us that data now.
So we just need to use the provided `key` values for the ID if the changefeed supplies them (when `key_in_value` is specified). Otherwise, in the default case where the payload has no `key` array, we fall back to the existing back-processing.
As presently implemented, we perform a bit of work when parsing changefeed query payloads to create a replication key based on columns in the target table (https://github.com/cockroachdb/replicator/blob/master/internal/source/cdc/primary_key.go#L26). Due to https://github.com/cockroachdb/cockroach/issues/131856 we may not have the information necessary to do so when processing deletes, when the target primary key requires a projected value.
If changefeed queries can support the `key_in_value` option, this back-parsing would be unnecessary. The source changefeed would simply provide a replica identity (the PK of the row that triggered the message). Given the changes to support `REPLICA IDENTITY FULL`, the replica key does not require any relationship to the target PK.