toluaina / pgsync

Postgres to Elasticsearch/OpenSearch sync
https://pgsync.com
MIT License
1.16k stars 180 forks source link

PG_LOGICAL_SLOT_PEEK_CHANGES query crashes #546

Open carlos-pereira-costoya opened 3 months ago

carlos-pereira-costoya commented 3 months ago

PGSync version: 3.0.0 Postgres version: PostgreSQL 15.5 (AWS Aurora db.r7g.xlarge, with writer and reader nodes) Elasticsearch/OpenSearch version: 2.1.1 (OpenSearch) Redis version: 5.0.6 (ElastiCache) Python version: 3.9

Error Message (if any):

image

Problem description:

Hi @toluaina,

We are experiencing issues in the following context:

We tried executing the query manually and encountered the same behavior:

SELECT count(*) AS count_1 
FROM PG_LOGICAL_SLOT_PEEK_CHANGES('coverage_street', NULL, NULL) 
WHERE CAST(CAST(xid AS TEXT) AS BIGINT) >= 10319409 AND CAST(CAST(xid AS TEXT) AS BIGINT) < 10725529

The query terminates because PostgreSQL cuts off communication after 15-20 minutes. During execution, it uses up to 40 GB of disk space in our case.

image

The only way I've seen it finish is by including the limit upto_nchanges as a parameter (I tried with 100).

In the postgresql there are two replication slots with 68 GB of data each one.

From what I've researched, this happens because the function PG_LOGICAL_SLOT_PEEK_CHANGES needs to convert the changes stored in files on disk into the storage of the database instance.

https://pgpedia.info/p/pg_logical_slot_peek_changes.html

Also, in the past, I've seen work being done to solve this in pgsync:

https://github.com/toluaina/pgsync/issues/267

However, I have not found the parameter PG_LOGICAL_SLOT_UPTO_NCHANGES in either the version I use (3.0.0) or the most recent version of pgsync, so I have not been able to test it. In other versions (3.0.0), there is the parameter PG_LOGICAL_SLOT_UPTO_NCHANGES, but it doesn't work either.

  1. Has this functionality been incorporated to obtain the changes from the slots in batch mode?
  2. Is this behavior common, or does pgsync not allow commits with a large number of records (> 5M)?
  3. Could having a cluster for the PostgreSQL affect the retrieval and processing of data in the temporary space?

Thank you very much!

carlos-pereira-costoya commented 3 months ago

Hi!

With a smaller volume of data, when replication slots contains about to 1GB, the same behavior also occurs. There comes a point where, when trying to retrieve the changes, the connection is cut off.

Cheers

toluaina commented 3 months ago

Hi and sorry about the delay on this. Have you tried reducing LOGICAL_SLOT_CHUNK_SIZE. This is the parameter you want. The default is 5000. Unfortunately, this was not documented. But you can see how it here

carlos-pereira-costoya commented 3 months ago

Hi @toluaina

I just tested with the LOGICAL_SLOT_CHUNK_SIZE=100 with similar results:

image image

In one of the tests, the replication slot has been consumed by pgsync (takes about 45 min). But in another tests we have the same results as in the previous posts of this discussion thread.

The connection to the database is active, in the active connections, the active query was:

SELECT count(*) AS count_1 
FROM PG_LOGICAL_SLOT_PEEK_CHANGES('coverage_street', NULL, NULL) 
WHERE CAST(CAST(xid AS TEXT) AS BIGINT) >= 11560368 AND CAST(CAST(xid AS TEXT) AS BIGINT) < 12072880

There isn't any limit to filter the input in the query. The limit in the PG_LOGICAL_SLOT_PEEK_CHANGES is the third parameter that is NULL (upto_nchanges).

Inside the code there are the following notes;

            upto_nchanges (Optional[int], optional): The maximum number of changes to read. Defaults to None.
            limit (Optional[int], optional): The maximum number of rows to return. Defaults to None.

I think that in this case, we need to set the number of the changes to read also, the upto_nchanges parameter. The LOGICAL_SLOT_CHUNK_SIZE parameter, changes the default value for the limit variable, but is the number of rows to return after retireve the changes from the replication slot.

toluaina commented 3 months ago

The idea was to filter on txmin and txmax and then paginate the resultset with limit/offset. In your case, it seems most of the rows might have the same value for xmin. I will need to re-work the logic to use upto_nchanges specifically.

carlos-pereira-costoya commented 3 months ago

Yes, I haven't checked it, but it's possible that the xmin value of the records is the same because we have a process with several steps involving transactions that perform large commits. For example, in one of the steps, files are loaded from S3 into PostgreSQL tables, but the files contain millions of records. I understand this already generates a high transaction log, although I'm not sure if all of them have the same xmin.

Ok, thanks, can you create a new release or in a separate commit?

toluaina commented 3 months ago
carlos-pereira-costoya commented 3 months ago

Hi!

Thanks for the fix, but yes, the max(lsm) is a very expensive query. I have tested with the last version of the main branch, but I got the same results, the query to peek the changes get a lots of memory and disk:

image

SELECT count(*) AS count_1
FROM PG_LOGICAL_SLOT_PEEK_CHANGES('coverage_street', NULL, NULL)
WHERE CAST(CAST(xid AS TEXT) AS BIGINT) >= 12096230 AND CAST(CAST(xid AS TEXT) AS BIGINT) < 12370229

Current replication slots:

select slot_name, pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(),restart_lsn)) as replicationSlotLag,
active from pg_replication_slots;

image

To solve this, It's not possible to create a loop with the upto_nchanges to avoid to decode all database files in the same interval of time? I suposse that would be slow but I thinks that works if the pgsync computes the max in a custom variable.

toluaina commented 3 months ago

Are you suggesting that you are still seeing this query below:

SELECT count(*) AS count_1
FROM PG_LOGICAL_SLOT_PEEK_CHANGES('coverage_street', NULL, NULL)
WHERE CAST(CAST(xid AS TEXT) AS BIGINT) >= 12096230 AND CAST(CAST(xid AS TEXT) AS BIGINT) < 12370229

That query was removed and should no longer be present if you are running off the main branch. Can you please confirm?

carlos-pereira-costoya commented 2 months ago

Sorry, I missplaced the source folder of the code. But I check again and the query with the max is again very expensive:

SELECT max(lsn) AS max_1 
FROM PG_LOGICAL_SLOT_PEEK_CHANGES('coverage_street', NULL, NULL) 
WHERE CAST(CAST(xid AS TEXT) AS BIGINT) >= 12096230 AND CAST(CAST(xid AS TEXT) AS BIGINT) < 12504771

image

toluaina commented 2 months ago
carlos-pereira-costoya commented 2 months ago

Yes, you are rigth, the new version work with the SLOT_SIZE:

SELECT xid, data 
FROM PG_LOGICAL_SLOT_PEEK_CHANGES('coverage_street', '41/A75A0A58', 50) 
WHERE CAST(CAST(xid AS TEXT) AS BIGINT) >= 12096230 AND CAST(CAST(xid AS TEXT) AS BIGINT) < 12513642
carlos-pereira-costoya commented 2 months ago

But finally I got the same error:

File "/usr/local/lib/python3.9/site-packages/pgsync/sync.py", line 1228, in pull
    self.logical_slot_changes(
  File "/usr/local/lib/python3.9/site-packages/pgsync/sync.py", line 382, in logical_slot_changes
    changes: int = self.logical_slot_peek_changes(
  File "/usr/local/lib/python3.9/site-packages/pgsync/base.py", line 594, in logical_slot_peek_changes
    return self.fetchall(statement)
  File "/usr/local/lib/python3.9/site-packages/pgsync/base.py", line 894, in fetchall
    return conn.execute(statement).fetchall()
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1418, in execute
    return meth(
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/sql/elements.py", line 515, in _execute_on_connection
    return connection._execute_clauseelement(
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1640, in _execute_clauseelement
    ret = self._execute_context(
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1846, in _execute_context
    return self._exec_single_context(
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1986, in _exec_single_context
    self._handle_dbapi_exception(
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 2353, in _handle_dbapi_exception
    raise sqlalchemy_exception.with_traceback(exc_info[2]) from e
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1967, in _exec_single_context
    self.dialect.do_execute(
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/default.py", line 924, in do_execute
    cursor.execute(statement, parameters)
sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) SSL SYSCALL error: EOF detected

[SQL: SELECT xid, data 
FROM PG_LOGICAL_SLOT_PEEK_CHANGES(%(PG_LOGICAL_SLOT_PEEK_CHANGES_1)s, %(PG_LOGICAL_SLOT_PEEK_CHANGES_2)s, %(PG_LOGICAL_SLOT_PEEK_CHANGES_3)s) 
WHERE CAST(CAST(xid AS TEXT) AS BIGINT) >= %(param_1)s AND CAST(CAST(xid AS TEXT) AS BIGINT) < %(param_2)s]
[parameters: {'PG_LOGICAL_SLOT_PEEK_CHANGES_1': 'coverage_street', 'PG_LOGICAL_SLOT_PEEK_CHANGES_2': '41/A75A0A58', 'PG_LOGICAL_SLOT_PEEK_CHANGES_3': 50, 'param_1': 12096230, 'param_2': 12513642}]
(Background on this error at: https://sqlalche.me/e/20/e3q8)

image

carlos-pereira-costoya commented 2 months ago

I just tested with LOGICAL_SLOT_CHUNK_SIZE=1 but also fails, same error.

toluaina commented 1 month ago

I feel there are multiple things going on here. Is anyone able to reliably reproduce this?

sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) SSL SYSCALL error: EOF detected