jorritsandbrink closed this 2 months ago
@rudolfix see my replies on your comments.
Regarding `pgoutput` vs `wal2json`:

- The main reason for choosing `pgoutput` first was convenience for the user (no additional setup needed)—for what it's worth, Debezium also supports `pgoutput` but not `wal2json`.
- Publications can't be used with `wal2json`, only with `pgoutput`, meaning that additional client-side filtering is needed (selecting the right table, and the right DML operation if you want to only propagate inserts for example), offsetting some of the benefits of server-side decoding—see this SO on the topic.
- Implementing `wal2json` will probably be a little simpler, but not a lot—the message objects generated by `pypgoutput` (e.g. `Insert`, `Update`, ...) are almost as easy to work with as `wal2json`'s output.

That being said, `wal2json` might be better (necessary) when data volumes are large. I'd suggest we go with `pgoutput` first, then add `wal2json` support later if needed.
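For reference, a minimal sketch of the server-side filtering that publications give the `pgoutput` path (the DSN, publication, and table names are placeholders):

```python
# Sketch: server-side filtering via a publication, as used by pgoutput.
# The DSN, publication name, and table name are placeholders.
import psycopg2

conn = psycopg2.connect("dbname=mydb user=loader")  # placeholder DSN
conn.autocommit = True
with conn.cursor() as cur:
    # Publish only INSERTs for a single table; updates/deletes and other
    # tables are filtered out on the server before reaching the client.
    cur.execute(
        "CREATE PUBLICATION my_pub FOR TABLE my_schema.items "
        "WITH (publish = 'insert');"
    )
conn.close()
```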
Thanks for your feedback Martin, let's continue the discussion here for visibility.
Can you process individual tables in Fivetran / PeerDB / the other systems you're referring to, or do you have to process all of them simultaneously?
@jorritsandbrink it should be easy. If there is more than one table, we still have one LSN with which we track new messages, right? If so, the change is trivial: you can specify a table name which is resolved dynamically: https://dlthub.com/docs/general-usage/resource#dispatch-data-to-many-tables
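A minimal sketch of the dynamic dispatch the linked docs describe—`table_name` takes a callable that is resolved per data item, so one resource can feed many tables (the `"table"` field and sample rows are made up for illustration):

```python
import dlt

# table_name is resolved per item, dispatching rows to different tables
@dlt.resource(table_name=lambda row: row["table"])
def changes():
    # in the replication case these would be decoded WAL messages
    yield {"table": "items", "id": 1, "foo": "bar"}
    yield {"table": "orders", "id": 7, "qty": 3}

pipeline = dlt.pipeline(pipeline_name="replication", destination="duckdb")
pipeline.run(changes)
```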
drawbacks: […] `sql_table` resources

@rudolfix I've addressed all your comments, please review.

To be done: increase the `dlt` version in `requirements.txt` to include PR 1127.
@jorritsandbrink I finally enabled tests on our CI, and most of them are passing but:

- this one consistently fails:

```
    with src_pl.sql_client() as c:
        qual_name = src_pl.sql_client().make_qualified_table_name("items")
        c.execute_sql(f"UPDATE {qual_name} SET foo = 'baz' WHERE id = 2;")
        c.execute_sql(f"DELETE FROM {qual_name} WHERE id = 2;")
    extract_info = dest_pl.extract(changes)
>   assert extract_info.asdict()["job_metrics"] == []
E   AssertionError: assert [{'created': ...e': 501, ...}] == []
E   Left contains one more item: {'created': 1713123289.1342065, 'extract_idx': 1, 'file_path': '/home/rudolfix/src/pipelines/_storage/.dlt/pipelines/d...lize/a93aba1460a1d099/1713123287.6514962/new_jobs/_dlt_pipeline_state.bfcae69f3b.0.typed-jsonl', 'file_size': 501, ...}
E   Full diff:
E     [
E   - ,
E   + {'created': 1713123289.1342065,
E   +  'extract_idx': 1,
E   +  'file_path': '/home/rudolfix/src/pipelines/_storage/.dlt/pipelines/dest_pl/normalize/a93aba1460a1d099/1713123287.6514962/new_jobs/_dlt_pipeline_state.bfcae69f3b.0.typed-jsonl',...
E
E   ...Full output truncated (6 lines hidden), use '-vv' to show
```

- those require Postgres 15 while we are on 13 on CI. Maybe you could take a look? Is there a way to use the old syntax?

```
    try:
>       cur.execute(
            f"ALTER PUBLICATION {esc_pub_name} ADD TABLES IN SCHEMA {esc_schema_name};"
        )
E   psycopg2.errors.SyntaxError: syntax error at or near "TABLES"
E   LINE 1: ALTER PUBLICATION "test_pub6589482f" ADD TABLES IN SCHEMA "s...
```
@rudolfix The first issue is actually also version related. I was testing on Postgres 16 locally, but have been able to reproduce both issues on Postgres 13.

1) It seems Postgres 13 publishes "empty transactions" for updates/deletes when they are excluded from the `publish` publication parameter (e.g. when `publish = 'insert'`). Postgres 16 does not do this. As a result, we do find messages to process (the "empty transactions") when we've done an update or delete, even though we told Postgres we're not interested in them. `last_commit_lsn` in resource state needs to be updated accordingly. An item gets extracted for `_dlt_pipeline_state` because of the state update, where our test asserted nothing is extracted. Solved by making the test more specific and asserting nothing gets extracted for the `items` table: https://github.com/dlt-hub/verified-sources/pull/392/commits/34610b634f29348d6362a2f26fa946ed3b93cf37

2) Not really feasible to make this work for older Postgres versions. I could fetch all tables from the schema and add them one by one, but that wouldn't accommodate the case where a table gets added to the schema later. To keep things clean, I introduced a Postgres version requirement for schema replication instead: https://github.com/dlt-hub/verified-sources/pull/392/commits/7a070453e93bed4c7863b5ee82f681dd12e909e5
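A rough sketch of such a version gate, assuming a `psycopg2` connection (the function name, error wording, and simplified identifier quoting are illustrative, not the exact code from the commit):

```python
# Sketch: refuse schema replication on servers older than Postgres 15,
# where ALTER PUBLICATION ... ADD TABLES IN SCHEMA does not exist.
import psycopg2

def add_schema_to_publication(conn, pub_name: str, schema_name: str) -> None:
    # conn.server_version is an integer, e.g. 130004 for Postgres 13.4
    if conn.server_version < 150000:
        raise RuntimeError(
            "Schema replication requires Postgres 15 or higher."
        )
    with conn.cursor() as cur:
        # identifier quoting simplified for the sketch
        cur.execute(
            f'ALTER PUBLICATION "{pub_name}" ADD TABLES IN SCHEMA "{schema_name}";'
        )
```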
@jorritsandbrink should we spawn another postgres instance just to test replication? I do not want to make it too complicated and I'm totally fine with version 15.
@rudolfix yes, using a separate instance for replication sounds good.
Tell us what you do here
- implementing a verified source

Relevant issue
https://github.com/dlt-hub/dlt/issues/933

More PR info
Adds initial support for Postgres replication. Some things are still missing, but this is a good time to get feedback.

- Uses the `pgoutput` plugin.
- Uses `psycopg2`'s support for logical replication—this streams messages from `pgoutput` into Python in an endless loop (see the streaming sketch after this list).
- Uses `pypgoutput` to decode `pgoutput`'s binary messages—the library's functionality to consume messages and transform them into "change events" (Pydantic models) is not used because it only works on Linux.
- Converts decoded values into `dlt`-compatible Python objects, e.g. the string `"t"` becomes the boolean `True`. "Binary mode" would be faster, but less robust.
- ~~Relies on a dedicated replication slot and publication for a table. I.e. two tables means two slots and two publications. This provides granular control and does not introduce significant overhead if I'm not mistaken.~~ No longer the case, changed because of user feedback. It is now possible to replicate one table, multiple tables, or an entire schema using a single publication.
- Adds two resource types: `table_snapshot` for initial load, and `table_changes` for CDC.
  - `table_snapshot` persists the state of the table in the snapshot that gets exported when creating a replication slot into a physical table, and then uses the `sql_table` resource to do the rest.
  - `table_changes` generates and yields "data items" (`TDataItem`) and "metadata items" (`DataItemWithMeta`) from decoded replication messages. Items are first stored in-memory in a list, before they are yielded from this list.
- Adds `init_replication` to set up a replication slot and publication. This function optionally persists snapshot tables representing the state at the exact moment the replication slot got created. It then returns `sql_table` resources to enable an initial load. Users do not need to use `init_replication`—they can create a slot and publication in any other way they see fit.
- Adds `replication_resource` to create a `DltResource` that consumes a slot/publication and generates data items with metadata (`DataItemWithMeta`). It dispatches data to multiple tables if the publication publishes changes for multiple tables (a usage sketch follows at the end of this description).
- Adds an `include_columns` argument to exclude any columns not provided as input (or include all columns if not provided).
- ~~Organizes code in a subfolder under `sql_database`: `/sources/sql_database/pg_replication`.~~ Moved to its own top-level folder.
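As referenced in the list above, a minimal sketch of the `psycopg2` streaming loop, with placeholder slot/publication names and DSN; the real source decodes `msg.payload` with `pypgoutput` instead of printing it:

```python
import psycopg2
import psycopg2.extras

conn = psycopg2.connect(
    "dbname=mydb user=loader",  # placeholder DSN
    connection_factory=psycopg2.extras.LogicalReplicationConnection,
)
cur = conn.cursor()
cur.start_replication(
    slot_name="my_slot",  # placeholder slot name
    decode=False,  # pgoutput emits binary messages; decoding happens client-side
    options={"proto_version": "1", "publication_names": "my_pub"},
)

def consume(msg: psycopg2.extras.ReplicationMessage) -> None:
    # the real implementation would decode msg.payload with pypgoutput here
    print(msg.data_start, len(msg.payload))
    msg.cursor.send_feedback(flush_lsn=msg.data_start)  # acknowledge progress

cur.consume_stream(consume)  # blocks in an endless loop
```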
What's not (yet) included:

- ~~Chunking mechanism to limit batch size in `table_changes`~~ implemented
- ~~`DltSource` to handle multiple tables / an entire database~~ no longer applies—multiple tables are now handled at the resource level
- The `truncate` operation
- ~~Perhaps some more data type mapping~~ done—common types are handled and exotic types default to `text`
- ~~Deletion of snapshot table after it has been consumed~~ not implemented—couldn't find a good way to do this
- ~~Example pipeline~~ done
- ~~More tests~~ done
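To tie the pieces together, a hypothetical usage sketch of the two entry points described above; argument names and the import path are illustrative and may not match the final API exactly:

```python
import dlt
# import path is an assumption for the sketch
from pg_replication import init_replication, replication_resource

# one-time setup: create slot + publication, optionally exporting snapshot
# tables so an initial load can be done with sql_table resources
snapshots = init_replication(
    slot_name="my_slot",
    pub_name="my_pub",
    schema_name="my_schema",
    table_names=["items", "orders"],
    persist_snapshots=True,  # return sql_table resources for initial load
)

pipeline = dlt.pipeline(pipeline_name="pg_replication", destination="duckdb")
pipeline.run(snapshots)  # initial load from the exported snapshot

# ongoing CDC: consume the slot/publication; data is dispatched to multiple
# tables when the publication covers more than one table
changes = replication_resource(slot_name="my_slot", pub_name="my_pub")
pipeline.run(changes)
```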