cockroachdb / replicator

replicator is a toolkit for ingesting logical replication feeds into a CockroachDB cluster
Apache License 2.0

Support for v23.1 CDC Queries #307

Closed thailowki closed 1 year ago

thailowki commented 1 year ago

I noticed that CDC queries in 23.1 produce messages in a different format from those in 22.2.x.

for example:

22.2.x

  {"key":"[\"user1\"]",
  "table":"usertable",
  "value":"{\"after\": 
    {
        \"field0\": \"1\", 
        \"field1\": \"2\", 
        \"field2\": \"3\", 
        \"field3\": \"4\", 
        \"field4\": \"5\", 
        \"field5\": \"6\", 
        \"field6\": \"7\", 
        \"field7\": \"8\", 
        \"field8\": \"9\",
        \"field9\": \"10\", 
        \"ycsb_key\": \"user1\"
        }
    }"
}

vs 23.1

{"key":"[\"user4\"]",
  "table":"usertable",
  "value":
    "{\"__crdb__\": 
        {\"updated\": \"1682299912954058000.0000000000\"}, 
    \"cdc_prev\": null, 
    \"field0\": \"1\", 
    \"field1\": \"2\", 
    \"field2\": \"3\", 
    \"field3\": \"4\", 
    \"field4\": \"5\", 
    \"field5\": \"6\", 
    \"field6\": \"7\", 
    \"field7\": \"8\", 
    \"field8\": \"9\", 
    \"field9\": \"10\", 
    \"updated_at\": \"2023-04-24T01:31:50.478314\", 
    \"ycsb_key\": \"user4\"}"}

Will cdc-sink be updated to support 23.1 soon?

Thanks!
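
The difference between the two shapes above can be checked programmatically. The following is a minimal Go sketch, not code from cdc-sink: `envelopeKind` is a hypothetical helper that assumes the inner `value` document has already been extracted as raw JSON, and it only looks at the top-level keys shown in the two samples (`after` for the 22.2.x shape, `__crdb__` for the 23.1 CDC query shape).

```go
package main

import (
	"encoding/json"
	"fmt"
)

// envelopeKind is a hypothetical helper (not part of cdc-sink). It reports
// which of the two message shapes from this issue a value document uses,
// based only on its top-level keys.
func envelopeKind(value []byte) (string, error) {
	// Decode into raw fields so that no particular schema is assumed.
	var fields map[string]json.RawMessage
	if err := json.Unmarshal(value, &fields); err != nil {
		return "", err
	}
	if _, ok := fields["after"]; ok {
		// 22.2.x sample: row data is wrapped in an "after" object.
		return "wrapped", nil
	}
	if _, ok := fields["__crdb__"]; ok {
		// 23.1 CDC query sample: columns sit at the top level next to "__crdb__".
		return "query", nil
	}
	return "unknown", nil
}

func main() {
	oldStyle := []byte(`{"after": {"field0": "1", "ycsb_key": "user1"}}`)
	newStyle := []byte(`{"__crdb__": {"updated": "1682299912954058000.0000000000"}, "cdc_prev": null, "field0": "1", "ycsb_key": "user4"}`)

	for _, doc := range [][]byte{oldStyle, newStyle} {
		kind, err := envelopeKind(doc)
		fmt.Println(kind, err)
	}
}
```

Decoding into `map[string]json.RawMessage` keeps the check independent of the table's columns, which is why it is used here instead of a fixed struct.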

bobvawter commented 1 year ago

v23.1 is part of the test matrix.

Is there something in particular that's not working?

thailowki commented 1 year ago

the webhook does not work:

ERROR  [Apr 25 10:50:28]                                               detail="json: unknown field \"__crdb__\"\ncould not decode payload\ngithub.com/cockroachdb/cdc-sink/internal/source/cdc.(*Handler).webhook\n\t/Users/thaipham/Projects/cdc-sink/internal/source/cdc/webhook.go:80\ngithub.com/cockroachdb/cdc-sink/internal/source/cdc.(*Handler).ServeHTTP\n\t/Users/thaipham/Projects/cdc-sink/internal/source/cdc/handler.go:118\ngithub.com/cockroachdb/cdc-sink/internal/source/server.logWrapper.func1\n\t/Users/thaipham/Projects/cdc-sink/internal/source/server/wrapper.go:99\nnet/http.HandlerFunc.ServeHTTP\n\t/usr/local/go/src/net/http/server.go:2109\nnet/http.(*ServeMux).ServeHTTP\n\t/usr/local/go/src/net/http/server.go:2487\ngolang.org/x/net/http2/h2c.h2cHandler.ServeHTTP\n\t/Users/thaipham/go/pkg/mod/golang.org/x/net@v0.9.0/http2/h2c/h2c.go:125\nnet/http.serverHandler.ServeHTTP\n\t/usr/local/go/src/net/http/server.go:2947\nnet/http.(*conn).serve\n\t/usr/local/go/src/net/http/server.go:1991\nruntime.goexit\n\t/usr/local/go/src/runtime/asm_arm64.s:1165" error="could not decode payload: json: unknown field \"__crdb__\"" uri=/ycsb/public

I guess it's because the value structure has changed in 23.1.
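
For context, the `json: unknown field` message is what Go's `encoding/json` reports when a decoder has `DisallowUnknownFields` enabled and meets a key the target struct does not declare. The sketch below reproduces that behavior in isolation. It is an assumption about the mechanism, not cdc-sink's actual webhook code: `legacyPayload` and `decodeStrict` are hypothetical names, and the struct is modeled only on the 22.2.x sample above.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

// legacyPayload is a hypothetical struct modeled on the 22.2.x message shape,
// where row data is wrapped in an "after" object. It is not cdc-sink's type.
type legacyPayload struct {
	After json.RawMessage `json:"after"`
}

// decodeStrict decodes data into legacyPayload and rejects any JSON key
// that the struct does not declare.
func decodeStrict(data []byte) (legacyPayload, error) {
	var p legacyPayload
	dec := json.NewDecoder(bytes.NewReader(data))
	dec.DisallowUnknownFields()
	err := dec.Decode(&p)
	return p, err
}

func main() {
	oldStyle := []byte(`{"after": {"field0": "1", "ycsb_key": "user1"}}`)
	newStyle := []byte(`{"__crdb__": {"updated": "1682299912954058000.0000000000"}, "cdc_prev": null, "field0": "1", "ycsb_key": "user4"}`)

	// The wrapped shape decodes cleanly.
	_, err := decodeStrict(oldStyle)
	fmt.Println("22.2.x style:", err)

	// The flat CDC query shape carries keys the struct does not know about,
	// so strict decoding returns an error.
	_, err = decodeStrict(newStyle)
	fmt.Println("23.1 query style:", err)
}
```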

bobvawter commented 1 year ago

What is your CREATE CHANGEFEED command?

thailowki commented 1 year ago

here

CREATE CHANGEFEED INTO 'webhook-https://127.0.0.1:30004/ycsb/public?insecure_tls_skip_verify=true' 
    WITH diff, updated, resolved='10s', webhook_sink_config='{"Flush":{"Messages":1000,"Frequency":"1s"}}' 
        AS SELECT *, cdc_prev FROM YCSB.USERTABLE WHERE 
        event_op() = 'insert' OR 
        event_op() = 'update' OR 
        (event_op() = 'delete' AND 
            crdb_internal.approximate_timestamp(crdb_internal_mvcc_timestamp) < (cdc_prev).crdb_internal_expiration);

ale-dd commented 1 year ago

@bobvawter Do you have an estimate of when this issue might be addressed? Thank you!

bobvawter commented 1 year ago

Docs: https://github.com/cockroachdb/cdc-sink/wiki/C2C#change-data-capture-queries