duckdb / postgres_scanner

https://duckdb.org/docs/extensions/postgres
MIT License
227 stars 36 forks source link

Duplicated rows in Parquet files #163

Closed tomtaylor closed 5 months ago

tomtaylor commented 8 months ago

What happens?

We run a nightly process to dump some Postgres tables into Parquet files. Sometimes we see a handful of rows duplicated in the output. In last night's example, we saw 33 rows out of 5,396,400 with a duplicate copy. This might be unavoidable with PER_THREAD_OUTPUT enabled against a moving data set, but it might be worth documenting this.

Does it also mean some rows might be missing?

To Reproduce

I think this might be very difficult to reproduce, but our script looks something like this:

FORCE INSTALL postgres_scanner FROM 'http://nightly-extensions.duckdb.org'; 
INSTALL httpfs; 
LOAD httpfs;
SET s3_endpoint='storage.googleapis.com'; 
SET s3_access_key_id='id';
SET s3_secret_access_key='key';
ATTACH 'dbname=foo' AS pg (TYPE postgres);
USE pg;
BEGIN;
COPY (SELECT * FROM pg.table1) TO 's3://bucket/table1.parquet' (FORMAT 'parquet', CODEC 'ZSTD', PER_THREAD_OUTPUT true);
COPY (SELECT * FROM pg.table2) TO 's3://bucket/table2.parquet' (FORMAT 'parquet', CODEC 'ZSTD', PER_THREAD_OUTPUT true);
COMMIT;

Then:

SELECT id, filename, ROW_NUMBER() OVER (PARTITION BY id) FROM read_parquet('data_*.parquet', fil
┌──────────┬────────────────┬─────────────────────────────────────┐
│    id    │    filename    │ row_number() OVER (PARTITION BY id) │
│  int64   │    varchar     │                int64                │
├──────────┼────────────────┼─────────────────────────────────────┤
│ 60449480 │ data_4.parquet │                                   2 │
│ 60725890 │ data_4.parquet │                                   2 │
│ 61009724 │ data_4.parquet │                                   2 │
│ 60844642 │ data_0.parquet │                                   2 │
│ 53617707 │ data_4.parquet │                                   2 │
│ 60574594 │ data_4.parquet │                                   2 │
│ 56486342 │ data_4.parquet │                                   2 │
│ 60034575 │ data_4.parquet │                                   2 │
│ 60574565 │ data_0.parquet │                                   2 │
│ 60698777 │ data_3.parquet │                                   2 │
│ 61080027 │ data_4.parquet │                                   2 │
│ 60261247 │ data_4.parquet │                                   2 │
│ 61079630 │ data_0.parquet │                                   2 │
│ 60386713 │ data_4.parquet │                                   2 │
│ 60008204 │ data_2.parquet │                                   2 │
│ 60261152 │ data_4.parquet │                                   2 │
│ 60983239 │ data_4.parquet │                                   2 │
│ 61092457 │ data_3.parquet │                                   2 │
│ 60856837 │ data_1.parquet │                                   2 │
│ 59246489 │ data_0.parquet │                                   2 │
│ 60224537 │ data_3.parquet │                                   2 │
│ 60569503 │ data_4.parquet │                                   2 │
│ 60905359 │ data_0.parquet │                                   2 │
│ 60859433 │ data_4.parquet │                                   2 │
│ 60255325 │ data_4.parquet │                                   2 │
│ 60341075 │ data_0.parquet │                                   2 │
│ 60968139 │ data_0.parquet │                                   2 │
│ 60574631 │ data_0.parquet │                                   2 │
│ 60560326 │ data_2.parquet │                                   2 │
│ 60927674 │ data_3.parquet │                                   2 │
│ 61092552 │ data_4.parquet │                                   2 │
│ 60574652 │ data_0.parquet │                                   2 │
│ 61051974 │ data_1.parquet │                                   2 │
├──────────┴────────────────┴─────────────────────────────────────┤
│ 33 rows                                               3 columns │
└─────────────────────────────────────────────────────────────────┘

OS:

Linux

PostgreSQL Version:

14.7

DuckDB Version:

0.9.2

DuckDB Client:

CLI

Full Name:

Tom Taylor

Affiliation:

Breakroom

Have you tried this on the latest main branch?

Have you tried the steps to reproduce? Do they include all relevant data and configuration? Does the issue you report still appear there?

Mytherin commented 8 months ago

Thanks for the report!

In principle all Postgres threads should run in the same transaction snapshot as explained here. However, not all Postgres servers support this and this feature is disabled for e.g. AWS Aurora. If you were to repeat the run with SET pg_debug_show_queries=true; you should see statements like SET TRANSACTION SNAPSHOT '00000004-0000B303-1'. If those are missing, you might be running on an unsupported version for transaction snapshots which could explain the duplicate rows. You could run the query SELECT pg_is_in_recovery(), pg_export_snapshot(), (select count(*) from pg_stat_wal_receiver) which is the one DuckDB runs to obtain snapshot information - if that doesn't work then this might not be supported.

As a work-around you can then disable multi-threading (using e.g. SET threads=1 or SET pg_connection_limit=1).

tomtaylor commented 8 months ago

Thanks - we're running on regular Postgres (hosted by Crunchybridge) and I can see those statements executing when I turn on the debug option.

SELECT pg_is_in_recovery(), pg_export_snapshot(), (select count(*) from pg_stat_wal_receiver);
 pg_is_in_recovery | pg_export_snapshot  | count 
-------------------+---------------------+-------
 f                 | 00000058-01D28CC8-1 |     0
(1 row)

This job runs nightly and possibly overlaps with a full logical backup. Is there anything that could interact there?

tomtaylor commented 8 months ago

Interestingly, I just took last night's dump which was a single threaded operation and outputted a single Parquet file for each table and found the same problem:

SELECT id, ROW_NUMBER() OVER (PARTITION BY id) FROM read_parquet('table.parquet') QUALIFY ROW_NUMBER() OVER (PARTITION BY id) > 1;
┌──────────┬─────────────────────────────────────┐
│    id    │ row_number() OVER (PARTITION BY id) │
│  int64   │                int64                │
├──────────┼─────────────────────────────────────┤
│ 61122606 │                                   2 │
│ 61150142 │                                   2 │
│ 61009086 │                                   2 │
│ 58830824 │                                   2 │
│ 60938241 │                                   2 │
│ 60944502 │                                   2 │
│ 61145921 │                                   2 │
│ 61181739 │                                   2 │
│ 60944526 │                                   2 │
│ 61131057 │                                   2 │
│ 60944530 │                                   2 │
│ 59597315 │                                   2 │
│ 61135999 │                                   2 │
│ 61150647 │                                   2 │
│ 61150686 │                                   2 │
│ 60944567 │                                   2 │
│ 61157702 │                                   2 │
│ 61181778 │                                   2 │
│ 60922176 │                                   2 │
│ 60944457 │                                   2 │
│ 60921690 │                                   2 │
│ 61129287 │                                   2 │
│ 61150691 │                                   2 │
│ 60370107 │                                   2 │
│ 61132247 │                                   2 │
│ 60366945 │                                   2 │
│ 60938573 │                                   2 │
│ 60944543 │                                   2 │
│ 59268777 │                                   2 │
│ 60938539 │                                   2 │
│ 60938545 │                                   2 │
│ 60923318 │                                   2 │
│ 60938808 │                                   2 │
│ 61130376 │                                   2 │
│ 60938569 │                                   2 │
│ 61181722 │                                   2 │
│ 61161559 │                                   2 │
│ 60944418 │                                   2 │
├──────────┴─────────────────────────────────────┤
│ 38 rows                              2 columns │
└────────────────────────────────────────────────┘

Performing a similar query against production produces zero results.

SELECT
  id,
  row_number
FROM (
  SELECT
    id,
    row_number() OVER (PARTITION BY id) AS row_number
  FROM
    table) t
WHERE
  row_number > 1
Mytherin commented 8 months ago

That's interesting. How many rows are in the table, and what kind of operations are running over the system in parallel (only insertions, or also updates/deletes)?

For sanity - could you perhaps try writing to a DuckDB table and checking if that produces the same problem to ensure this is not perhaps triggering an issue in the Parquet writer?

tomtaylor commented 8 months ago

Sure thing - what's the best way to execute that to achieve the same effect? I know how to do

FORCE INSTALL postgres_scanner FROM 'http://nightly-extensions.duckdb.org'; 
ATTACH 'dbname=database' AS pg (TYPE postgres, READ_ONLY);
CREATE TABLE table AS SELECT * FROM pg.table;

But I'm not sure if that tests the same path?

Mytherin commented 8 months ago

Yes that tests the same path from the Postgres' reader point of view, so that would be great. Perhaps try running with a single thread again to isolate as much as possible?

tomtaylor commented 8 months ago

I just ran that end to end twice into a DuckDB database and dumped 5037494 rows, followed by 5044745 rows, with no duplicates on either run. To confirm, I downloaded last night's single threaded (by setting the PG connections to 1) Parquet file, and it contains 2 duplicate rows.

The data is moving constantly with inserts/deletes/updates. I estimate 20% of rows change on a daily basis. But it sounds like maybe the Parquet writer might be at fault here?

Mytherin commented 8 months ago

That is possible. It could also be some connection between the Postgres scanner and the Parquet writer. Could you try dumping the contents of the DuckDB file to Parquet, and seeing if there are still no duplicate rows?

tomtaylor commented 8 months ago

I had to SET preserve_insertion_order = false to prevent out of memory issues, but there are no duplicates in the exported Parquet file.

tomtaylor commented 8 months ago

@Mytherin any more thoughts on this? 🙏

Mytherin commented 8 months ago

Could you try SET preserve_insertion_order = false when doing the copy to Parquet from Postgres directly and checking if there are still duplicates? SET preserve_insertion_order = false writes data in a substantially different manner - so it's possible that there's a problem in the insertion-order preserving Parquet write and that disabling that circumvented the bug.

tomtaylor commented 7 months ago

These have been running with SET preserve_insertion_order = false for quite a while now, as we needed to do this to prevent hitting memory limits on the instance we run this job on.

tomtaylor commented 7 months ago

I don't know if it's useful, but I'd be happy to send you one of the sample Parquet files privately.

tomtaylor commented 7 months ago

Unfortunately this is still present in DuckDB 0.10.0.

Mytherin commented 7 months ago

Could you send over one of the broken Parquet files (mark@duckdblabs.com)?

Mytherin commented 7 months ago

Another thing I can think of - could you perhaps export the result of postgres_query('SELECT * FROM tbl') to Parquet instead of querying the table using the Postgres scanner? There might be something funky going on with the ctids we are using (e.g. we might not be grabbing the correct locks to prevent rows from being moved around, perhaps causing duplicate rows to appear in that manner).

noppaz commented 5 months ago

I have this issue as well. From what I can tell it is only and issue with rows that were updated during the COPY process. Debugged the queries that are executed on Postgres from Duckdb and found this:

When multiple connections are used, the first connection will start a transaction:

  1. BEGIN TRANSACTION READ ONLY (defaults to READ COMMITTED)
  2. SELECT pg_is_in_recovery(), pg_export_snapshot(), (select count(*) from pg_stat_wal_receiver)
  3. Start COPYing tables with ctid predicates

All other connections use:

  1. BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ READ ONLY
  2. SET TRANSACTION SNAPSHOT 'some-snapshot-id'
  3. Start COPYing tables with ctid predicates

A read committed transaction can see updates from other transactions, so I think this is what is causing the issue. Furthermore, this will be a problem even though we limit threads, and also if we limit amount of postgres connections with SET pg_connection_limit=1 since this will then only use the first read committed transaction. If all the connections were using repeatable read isolation, I assume this wouldn't be a problem anymore, and multiple connections would use the same transaction snapshot.

Mytherin commented 5 months ago

That's interesting - thanks for investigating! Considering the problem also occurred in single-threaded operation the isolation level could definitely be a likely culprit. I've pushed a PR that switches the main transaction to use repeatable read here - https://github.com/duckdb/postgres_scanner/pull/207. Perhaps you could give that a shot and see if it resolves the issue?

noppaz commented 5 months ago

Nice @Mytherin, its in my interest to get a potential fix out asap so happy to test it out. How would I easiest test out your PR? Or did you mean waiting for the nightly build?

Mytherin commented 5 months ago

You should be able to fetch the artifacts from the PR itself - see here https://github.com/duckdb/postgres_scanner/actions/runs/8601970047?pr=207 (scroll down to "Artifacts")

noppaz commented 5 months ago

Thanks a lot @Mytherin, I was able to load the unsigned extension and try this out.

I ran a single connection without any threading to get the most likelihood of changes to be applied during different copy statements with ctid ranges. And concurrently ran a script to update a bunch of rows in Postgres. Was able to verify that with the current extension in 0.10.1 duplicates from the different transactions are created, but with the new extension from the PR this was not the case.

Let me know if you need anything else here, I'd really like to help push this out asap.

tomtaylor commented 5 months ago

Thanks so much for spotting the underlying issue here @noppaz!

Mytherin commented 5 months ago

Thanks - I’ve merged the PR. I can push out a new version of the extension tomorrow after the nightly builds.

Mytherin commented 5 months ago

I've published the nightly - FORCE INSTALL postgres_scanner should get you the new version in v0.10.1 (potentially pending some cloudflare caches being cleared)