ajstorm opened 2 years ago
Full log here; it involves creating and dropping replication slots multiple times (anything calling SELECT * was me, anything using DECLARE CURSOR was DMS).
From logging PG statements, we have to support the following queries:
For initial load, it looks like we're basically only missing WITH HOLD:
BEGIN;declare "SQL_CUR0x146eb80ff1a0" cursor with hold for SELECT "a","b" FROM "public"."good_table";fetch 10000 in "SQL_CUR0x146eb80ff1a0"
close "SQL_CUR0x146eb80ff1a0";commit
BEGIN;declare "SQL_CUR0x146ebc0e4090" cursor with hold for select CAST (version() as varchar(512));fetch 10000 in "SQL_CUR0x146ebc0e4090"
close "SQL_CUR0x146ebc0e4090";commit
BEGIN;declare "SQL_CUR0x146ebc0e4090" cursor with hold for select cast(setting as integer) from pg_settings where name = 'server_version_num';fetch 10000 in "SQL_CUR0x146ebc0e4090"
close "SQL_CUR0x146ebc0e4090";commit
BEGIN;declare "SQL_CUR0x146ebc0e4090" cursor with hold for SELECT pg_encoding_to_char(encoding) FROM pg_database WHERE datname = current_database();fetch 10000 in "SQL_CUR0x146ebc0e4090"
close "SQL_CUR0x146ebc0e4090";commit
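The initial-load statements all follow one pattern: BEGIN, DECLARE ... CURSOR WITH HOLD, FETCH 10000, then CLOSE and COMMIT. In PostgreSQL, WITH HOLD means the cursor stays usable after the transaction that created it commits, whereas an ordinary cursor is closed at transaction end. The following is a toy in-memory model of just those semantics, which may help when reasoning about what CockroachDB needs to accept. The Cursor and Session classes are hypothetical and are not modelled on any real implementation.

```python
class Cursor:
    """A toy cursor: a snapshot of result rows plus a read position."""

    def __init__(self, rows, hold):
        self.rows = list(rows)  # this model materializes the result at DECLARE time
        self.pos = 0
        self.hold = hold

    def fetch(self, n):
        """Like FETCH n IN cursor: return up to n of the remaining rows."""
        batch = self.rows[self.pos:self.pos + n]
        self.pos += len(batch)
        return batch


class Session:
    """A toy session: COMMIT discards every cursor not declared WITH HOLD."""

    def __init__(self):
        self.cursors = {}

    def declare(self, name, rows, hold=False):
        self.cursors[name] = Cursor(rows, hold)

    def fetch(self, name, n):
        if name not in self.cursors:
            raise KeyError(f'cursor "{name}" does not exist')
        return self.cursors[name].fetch(n)

    def close(self, name):
        del self.cursors[name]

    def commit(self):
        self.cursors = {k: c for k, c in self.cursors.items() if c.hold}


# DMS-style read of a 25,000-row table, assuming it fetches until a batch comes back empty.
s = Session()
s.declare("SQL_CUR0x146eb80ff1a0", range(25_000), hold=True)
sizes = []
while batch := s.fetch("SQL_CUR0x146eb80ff1a0", 10_000):
    sizes.append(len(batch))
s.close("SQL_CUR0x146eb80ff1a0")
s.commit()
print(sizes)  # [10000, 10000, 5000]

# What WITH HOLD changes: only the held cursor survives the COMMIT.
s.declare("held", [1, 2, 3], hold=True)
s.declare("plain", [1, 2, 3])
s.commit()
print(sorted(s.cursors))  # ['held']
```

In the logged sequence, DMS closes each cursor before committing, so the hold behaviour itself may rarely be exercised. The syntax still has to be accepted.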
For CDC, PG uses replication slots (examples):
BEGIN;declare "SQL_CUR0x146ed00bada0" cursor with hold for SELECT pg_drop_replication_slot('kl3yh77fl5mhvcyz_00016402_6d5a24d1_0a92_4efc_80e7_9d0c0d409dbc');fetch 10000 in "SQL_CUR0x146ed00bada0"
BEGIN;declare "SQL_CUR0x146ed00bada0" cursor with hold for SELECT lsn FROM pg_create_logical_replication_slot('kl3yh77fl5mhvcyz_00016402_6d5a24d1_0a92_4efc_80e7_9d0c0d409dbc', 'test_decoding');fetch 10000 in "SQL_CUR0x146ed00bada0"
BEGIN;declare "SQL_CUR0x146ed00bada0" cursor with hold for
select cast(tn.is_dst as varchar(8))
from pg_timezone_names tn, pg_settings s
where tn.name = s.setting
and s.name = 'log_timezone'
;fetch 10000 in "SQL_CUR0x146ed00bada0"
BEGIN;declare "SQL_CUR0x146ed00bada0" cursor with hold for
select restart_lsn from pg_replication_slots
where slot_name='kl3yh77fl5mhvcyz_00016402_6d5a24d1_0a92_4efc_80e7_9d0c0d409dbc'
and database ='replicationload'
and plugin ='test_decoding'
;fetch 10000 in "SQL_CUR0x146ed00bada0"
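The CDC queries above rely on three pieces: pg_create_logical_replication_slot, which returns the slot name and an LSN; pg_drop_replication_slot; and the pg_replication_slots view. The following toy in-memory model shows how they fit together, using the slot name and an LSN from this log. SlotRegistry and its methods are hypothetical, and LSNs are plain integers here.

```python
from dataclasses import dataclass


@dataclass
class ReplicationSlot:
    slot_name: str
    plugin: str
    database: str
    restart_lsn: int


class SlotRegistry:
    """Toy, in-memory stand-in for the slot builtins and pg_replication_slots (hypothetical)."""

    def __init__(self, current_lsn):
        # Simplification: a new slot gets one LSN for everything. In PostgreSQL the
        # slot's restart_lsn can lag behind the LSN returned at creation.
        self.current_lsn = current_lsn
        self.slots = {}

    def create_logical(self, slot_name, plugin, database):
        """Like pg_create_logical_replication_slot: returns (slot_name, lsn)."""
        if slot_name in self.slots:
            raise ValueError(f'replication slot "{slot_name}" already exists')
        self.slots[slot_name] = ReplicationSlot(slot_name, plugin, database, self.current_lsn)
        return slot_name, self.current_lsn

    def drop(self, slot_name):
        """Like pg_drop_replication_slot."""
        if self.slots.pop(slot_name, None) is None:
            raise ValueError(f'replication slot "{slot_name}" does not exist')

    def restart_lsn(self, slot_name, database, plugin):
        """The pg_replication_slots lookup DMS runs; None when no row matches."""
        slot = self.slots.get(slot_name)
        if slot and slot.database == database and slot.plugin == plugin:
            return slot.restart_lsn
        return None


reg = SlotRegistry(current_lsn=0x1CE_A0000568)  # 1CE/A0000568
name = "kl3yh77fl5mhvcyz_00016402_6d5a24d1_0a92_4efc_80e7_9d0c0d409dbc"
created = reg.create_logical(name, "test_decoding", "replicationload")
print(hex(reg.restart_lsn(name, "replicationload", "test_decoding")))  # 0x1cea0000568
reg.drop(name)
```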
We would then need to implement this stream: https://www.postgresql.org/docs/current/protocol-replication.html
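Once START_REPLICATION succeeds, the connection switches to CopyBoth mode. From then on, the server sends CopyData messages that carry either XLogData ('w') or primary keepalive ('k') payloads, with big-endian integer fields. The following is a minimal encoder sketch of those two layouts, as described in the PostgreSQL protocol docs. The helper names and the sample payload are hypothetical, and this is not the prototype's code.

```python
import struct
from datetime import datetime, timedelta, timezone

# Replication-protocol clocks count microseconds since 2000-01-01 00:00 UTC.
PG_EPOCH = datetime(2000, 1, 1, tzinfo=timezone.utc)


def pg_micros(ts):
    """Microseconds between the PostgreSQL epoch and an aware datetime."""
    return (ts - PG_EPOCH) // timedelta(microseconds=1)


def copy_data(payload):
    """Wrap a payload in a CopyData ('d') message; the Int32 length includes itself."""
    return b"d" + struct.pack("!i", len(payload) + 4) + payload


def xlog_data(wal_start, wal_end, sent_at, data):
    """XLogData ('w'): start of this WAL data, current end of WAL, server clock, payload."""
    header = struct.pack("!QQq", wal_start, wal_end, pg_micros(sent_at))
    return copy_data(b"w" + header + data)


def keepalive(wal_end, sent_at, reply_requested):
    """Primary keepalive ('k'): current end of WAL, server clock, reply-requested flag."""
    return copy_data(b"k" + struct.pack("!Qq?", wal_end, pg_micros(sent_at), reply_requested))


sent = datetime(2022, 12, 8, 19, 4, 35, tzinfo=timezone.utc)
msg = xlog_data(0x1CE_A0000568, 0x1CE_A00005A0, sent, b"BEGIN 529")  # illustrative payload
print(len(msg))  # 39
```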
DMS uses the replication protocol mentioned above to listen for updates (instead of pg_logical_slot_get_changes, which is much nicer and recommended by PG):
2022-12-08 19:04:35 UTC:172.31.55.39(37356):postgres@replicationload:[672]:LOG: starting logical decoding for slot "kl3yh77fl5mhvcyz_00016402_6d5a24d1_0a92_4efc_80e7_9d0c0d409dbc"
2022-12-08 19:04:35 UTC:172.31.55.39(37356):postgres@replicationload:[672]:DETAIL: Streaming transactions committing after 1CE/A00005A0, reading WAL from 1CE/A0000568.
2022-12-08 19:04:35 UTC:172.31.55.39(37356):postgres@replicationload:[672]:STATEMENT: START_REPLICATION SLOT "kl3yh77fl5mhvcyz_00016402_6d5a24d1_0a92_4efc_80e7_9d0c0d409dbc" LOGICAL 000001CE/A0000568 ("include-timestamp" 'on')
2022-12-08 19:04:35 UTC:172.31.55.39(37356):postgres@replicationload:[672]:LOG: logical decoding found consistent point at 1CE/A0000568
2022-12-08 19:04:35 UTC:172.31.55.39(37356):postgres@replicationload:[672]:DETAIL: There are no running transactions.
2022-12-08 19:04:35 UTC:172.31.55.39(37356):postgres@replicationload:[672]:STATEMENT: START_REPLICATION SLOT "kl3yh77fl5mhvcyz_00016402_6d5a24d1_0a92_4efc_80e7_9d0c0d409dbc" LOGICAL 000001CE/A0000568 ("include-timestamp" 'on')
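LSNs appear in two textual forms in this log. The server prints 1CE/A0000568, while DMS sends the zero-padded 000001CE/A0000568 in START_REPLICATION. Both denote the same 64-bit WAL position, whose high and low 32 bits are the two hex halves. The following small sketch converts between the text and integer forms so that positions can be compared numerically. The function names are hypothetical.

```python
import re

_LSN = re.compile(r"([0-9A-Fa-f]{1,8})/([0-9A-Fa-f]{1,8})")


def parse_lsn(text):
    """Parse 'hi/lo' (two hex halves, zero-padded or not) into a 64-bit integer."""
    m = _LSN.fullmatch(text)
    if m is None:
        raise ValueError(f"invalid LSN: {text!r}")
    return (int(m.group(1), 16) << 32) | int(m.group(2), 16)


def format_lsn(lsn):
    """Format as PostgreSQL prints pg_lsn: upper-case hex halves, no zero padding."""
    return f"{lsn >> 32:X}/{lsn & 0xFFFFFFFF:X}"


requested = parse_lsn("000001CE/A0000568")  # as sent in START_REPLICATION
logged = parse_lsn("1CE/A0000568")          # as printed in the server log
print(requested == logged)                  # True
print(format_lsn(requested))                # 1CE/A0000568
print(parse_lsn("1CE/A00005A0") - logged)   # 56 (bytes of WAL)
```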
which is super ew. You can't even debug it using psql:
psql "postgres://postgres:bobby-123@otan-pg-test.cpa6lrp2ahsc.us-east-1.rds.amazonaws.com:5432/replicationload?replication=database" -c 'START_REPLICATION SLOT "otan_test" LOGICAL 1CE/B8000810;'
unexpected PQresultStatus: 8
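Status 8 in that error is libpq's PGRES_COPY_BOTH. START_REPLICATION puts the connection into CopyBoth mode, and psql does not handle that status, so it reports it as unexpected. PostgreSQL's pg_recvlogical is the stock client for consuming a logical slot. The following lookup lists the first ten libpq result statuses, in the order they are declared in libpq-fe.h, for decoding such numbers.

```python
from enum import IntEnum


class ExecStatusType(IntEnum):
    """First ten libpq result statuses, in declaration order (libpq-fe.h)."""
    PGRES_EMPTY_QUERY = 0
    PGRES_COMMAND_OK = 1
    PGRES_TUPLES_OK = 2
    PGRES_COPY_OUT = 3
    PGRES_COPY_IN = 4
    PGRES_BAD_RESPONSE = 5
    PGRES_NONFATAL_ERROR = 6
    PGRES_FATAL_ERROR = 7
    PGRES_COPY_BOTH = 8  # what START_REPLICATION returns; psql does not accept it
    PGRES_SINGLE_TUPLE = 9


print(ExecStatusType(8).name)  # PGRES_COPY_BOTH
```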
Awesome start! After wrapping my head around what you've done, I'm hoping to get the first cut of INSERT/DELETE working today.
Some discussion from an internal Slack thread regarding the replication slot stuff:
The replication now gets to START_REPLICATION on DMS ~but seems to exit for some reason and I don't know why~
Instructions:
- Create test_table in a source database. Run SET CLUSTER SETTING kv.rangefeed.enabled = true. As the decoding function doesn't work, ensure all columns are ints.
- Set PluginName on the source endpoint for CockroachDB to test_decoding. Without this you may see a weird error about querying pglogical.node, which I'm not sure how to resolve.
- … test_table.
- ~Command succeeded: "START_REPLICATION SLOT "o67mdfyzzca6l5zr_00000104_9cf6388a_6451_48f9_908d_4915f0f7bc1a" LOGICAL AAAAAAAA/AA000020 ("include-timestamp" 'on')" with status code: "" (postgres_test_decoding.c:167) is not normal. Not sure why; probably because my LSNs are screwed up. For some reason the CDC sink seems to replay every change from the start, which is not normal, so I reckon it's cursor related. Cursors have to start with A.~ Resolved.
- … after you set it up, and whenever restarting CockroachDB, for everything to work:
There are a lot more hacks here imitating that certain triggers / functions exist.
If you want a faster iteration cycle, you can use https://github.com/otan-cockroach/repltest (follow the readme) to inspect the replication stream.
It works now that I've changed the table name in the replication log to include the schema name. The branch is now up to date, and I've updated the issue with a write-up.
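test_decoding writes each change as one text line that names the table schema-qualified (for example, table public.test_table: INSERT: ...). This fits the fix of including the schema name. The following sketch shows that line shape for integer-only rows, matching the "ensure all columns are ints" restriction in the instructions. The helper is hypothetical. It skips identifier quoting, NULLs, other types, UPDATE/DELETE, and the BEGIN/COMMIT lines that real test_decoding output also contains.

```python
def format_insert_line(schema, table, columns):
    """Render an INSERT in test_decoding's text format, for integer-only rows.

    columns is a sequence of (name, type_name, value) tuples, e.g. ("a", "bigint", 1).
    Identifier quoting, NULLs and non-integer types are deliberately not handled.
    """
    cols = " ".join(f"{name}[{type_name}]:{int(value)}" for name, type_name, value in columns)
    return f"table {schema}.{table}: INSERT: {cols}"


print(format_insert_line("public", "test_table", [("a", "bigint", 1), ("b", "bigint", 2)]))
# table public.test_table: INSERT: a[bigint]:1 b[bigint]:2
```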
Added a few changes to avoid some of the hard-coding, handle multiple tables, and fix the types. Branch has been updated.
Hi guys, do we have any update on this feature?
Hi @cucxabong. We currently don't have an update as to when this feature will be completed. We do have a working prototype, but it'll take some effort to get it over the line.
Is there a particular reason why you're interested in this feature? To where would you be hoping to migrate the data?
https://github.com/cockroachdb/cockroach/issues/34766 provided support for CRDB to impersonate PG and act as a target for migrations. There is still work remaining if we ever want to support DMS with CRDB as a source. A working prototype is available in https://github.com/cockroachdb/cockroach/pull/93404.
Initial Load
To get the initial load to work, we need the following:
- log_timezone session variable. I believe this should just be UTC. #94123
- pg_timezone_names pg_catalog table. #94122
- DECLARE CURSOR ... WITH HOLD.
Replication
- replication slot protocol, including the pg_catalog tables and associated builtins to create a replication slot. In the prototype, we did so using the "normal connection protocol", with a global buffer populated by CDC via a newly added replication:// source URI; this is wrong, as the replication slot protocol has its own parser.
- test_decoding plugin, but others may work too with DMS. Note this means setting PluginName as an additional parameter on the source endpoint in DMS.
- relreplident in pg_catalog.pg_class.
Requisite plpgsql and event trigger
Jira issue: CRDB-17695
Epic CC-8911