toluaina / pgsync

Postgres to Elasticsearch/OpenSearch sync
https://pgsync.com
MIT License

Disk full on AWS RDS when using large tables #129

Open smokedsalmonbagel opened 3 years ago

smokedsalmonbagel commented 3 years ago

PGSync version: 2.0.0
Postgres version: 10.11
Elasticsearch version: 7.1
Redis version:
Python version:

Problem Description: If I add any large table to my schema, Postgres gives a disk full error. I tried using a lower value for QUERY_CHUNK_SIZE and fewer columns for the child tables. I also tried to specify the relationship keys manually:

[
    {
        "database": "ebdb",
        "index": "transactions",
        "nodes": [
            {
                "table": "transactions",
                "schema": "public",
                "children": [
                    {
                        "table": "p_transactions",
                        "label": "pTransactions",
                        "columns": ["transaction_type", "admitted", "employee_id"],
                        "relationship": {
                            "variant": "object",
                            "type": "one_to_many",
                            "foreign_key": {
                                "child": ["transaction_id"],
                                "parent": ["id"]
                            }
                        }
                    }
                ]
            }
        ]
    }
]

Error Message (if any):

ERROR:pgsync.sync: Exception (psycopg2.errors.DiskFull) could not write to file "base/pgsql_tmp/pgsql_tmp24128.1025": No space left on device

Our primary index table has about 26M records - our child table has about 18M records. Most documents would have just a few nested child objects, but that initial join seems to be too big. I suspect that the "chunking" is happening after the joins - can this be confirmed? Is there a workaround for these large tables?
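
For reference, lowering QUERY_CHUNK_SIZE was just a matter of setting the environment variable before the run (a sketch only; the value here is arbitrary):

# Sketch: lower the fetch chunk size for the initial sync (value chosen arbitrarily for the test).
export QUERY_CHUNK_SIZE=1000
pgsync -c schema.json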

toluaina commented 3 years ago

Sorry for the slow response again. Can you confirm how much space was available before?

My guess is the large joins are resulting in temp tables being created.

Have you considered increasing:
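
The usual suspects for this kind of temp-file growth are work_mem and temp_file_limit (an assumption on my part). A quick way to check the current values and how much temp space is actually being written, using standard Postgres views:

-- Cumulative temp-file usage for the current database (since stats were last reset).
SELECT datname, temp_files, pg_size_pretty(temp_bytes) AS temp_written
FROM pg_stat_database
WHERE datname = current_database();

-- Current values of the memory / temp-file settings.
SELECT name, setting, unit
FROM pg_settings
WHERE name IN ('work_mem', 'temp_file_limit');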

smokedsalmonbagel commented 3 years ago

Yes - I am almost certain that is happening. If I add any one_to_many relationship to the schema with this table, I run out of disk space. There is not much control over disk space on RDS, but I did try setting those two parameters. It looks like we hit disk full when the temp table is around 7GB.

smokedsalmonbagel commented 3 years ago

I was hoping to be able to debug this a little more, as I am getting this error on a single join. The two tables are large, but not nearly as large as others I have seen who were able to use pgsync. What is the best way to dump the generated query? Is there a debugging step I could take to narrow down this issue?

toluaina commented 3 years ago

Can you re-run the application in verbose mode, i.e. pgsync -c schema.json -v

This should log the actual query being run, and then you can run that query against your backend db.
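
For example (the redirect is just to capture the full output for sharing):

pgsync -c schema.json -v 2>&1 | tee pgsync-verbose.log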

toluaina commented 3 years ago

You can also send me the output, removing any sensitive data, and I can extract the query you need to run.

toluaina commented 3 years ago

I am guessing this is the initial sync.

smokedsalmonbagel commented 3 years ago

Yes - that would be great. This is the initial sync. I reduced the number of columns for this test and got an out-of-memory error this time. In any case, here are the results:

pgsync_1         | pg_settings:
pgsync_1         | SELECT setting
pgsync_1         | FROM pg_settings
pgsync_1         | WHERE name = %(name_1)spg_settings:
pgsync_1         | SELECT setting
pgsync_1         | FROM pg_settings
pgsync_1         | WHERE name = %(name_1)spg_settings:
pgsync_1         | SELECT setting
pgsync_1         | FROM pg_settings
pgsync_1         | WHERE name = %(name_1)s - transactions
pgsync_1         |     - external_transactions
pgsync_1         | replication_slots:
pgsync_1         | SELECT *
pgsync_1         | FROM PG_REPLICATION_SLOTS
pgsync_1         | WHERE slot_name = %(slot_name_1)s
pgsync_1         |   AND slot_type = %(slot_type_1)s
pgsync_1         |   AND plugin = %(plugin_1)sSELECT TABLE_NAME,
pgsync_1         |        primary_keys
pgsync_1         | FROM _pkey_viewSELECT TABLE_NAME AS TABLE_NAME,
pgsync_1         |                      array_agg(fkeys) AS foreign_keys
pgsync_1         | FROM _fkey_view,
pgsync_1         |      unnest(foreign_keys) AS fkeys
pgsync_1         | GROUP BY TABLE_NAMESELECT table_constraints_1.table_name,
pgsync_1         |        array_agg(CAST(key_column_usage_1.column_name AS TEXT)) AS foreign_keys
pgsync_1         | FROM information_schema.table_constraints AS table_constraints_1
pgsync_1         | JOIN information_schema.key_column_usage AS key_column_usage_1 ON key_column_usage_1.constraint_name = table_constraints_1.constraint_name
pgsync_1         | AND key_column_usage_1.table_schema = table_constraints_1.table_schema
pgsync_1         | JOIN information_schema.constraint_column_usage AS constraint_column_usage_1 ON constraint_column_usage_1.constraint_name = table_constraints_1.constraint_name
pgsync_1         | AND constraint_column_usage_1.table_schema = table_constraints_1.table_schema
pgsync_1         | WHERE table_constraints_1.table_name IN ([POSTCOMPILE_table_name_1])
pgsync_1         |   AND table_constraints_1.constraint_type = %(constraint_type_1)s
pgsync_1         | GROUP BY table_constraints_1.table_nameSELECT TABLE_NAME,
pgsync_1         |        primary_keys
pgsync_1         | FROM _pkey_viewSELECT TABLE_NAME AS TABLE_NAME,
pgsync_1         |                      array_agg(fkeys) AS foreign_keys
pgsync_1         | FROM _fkey_view,
pgsync_1         |      unnest(foreign_keys) AS fkeys
pgsync_1         | GROUP BY TABLE_NAMESELECT table_constraints_1.table_name,
pgsync_1         |        array_agg(CAST(key_column_usage_1.column_name AS TEXT)) AS foreign_keys
pgsync_1         | FROM information_schema.table_constraints AS table_constraints_1
pgsync_1         | JOIN information_schema.key_column_usage AS key_column_usage_1 ON key_column_usage_1.constraint_name = table_constraints_1.constraint_name
pgsync_1         | AND key_column_usage_1.table_schema = table_constraints_1.table_schema
pgsync_1         | JOIN information_schema.constraint_column_usage AS constraint_column_usage_1 ON constraint_column_usage_1.constraint_name = table_constraints_1.constraint_name
pgsync_1         | AND constraint_column_usage_1.table_schema = table_constraints_1.table_schema
pgsync_1         | WHERE table_constraints_1.table_name IN ([POSTCOMPILE_table_name_1])
pgsync_1         |   AND table_constraints_1.constraint_type = %(constraint_type_1)s
pgsync_1         | GROUP BY table_constraints_1.table_nameSELECT TABLE_NAME,
pgsync_1         |        primary_keys
pgsync_1         | FROM _pkey_viewSELECT TABLE_NAME AS TABLE_NAME,
pgsync_1         |                      array_agg(fkeys) AS foreign_keys
pgsync_1         | FROM _fkey_view,
pgsync_1         |      unnest(foreign_keys) AS fkeys
pgsync_1         | GROUP BY TABLE_NAMESELECT table_constraints_1.table_name,
pgsync_1         |        array_agg(CAST(key_column_usage_1.column_name AS TEXT)) AS foreign_keys
pgsync_1         | FROM information_schema.table_constraints AS table_constraints_1
pgsync_1         | JOIN information_schema.key_column_usage AS key_column_usage_1 ON key_column_usage_1.constraint_name = table_constraints_1.constraint_name
pgsync_1         | AND key_column_usage_1.table_schema = table_constraints_1.table_schema
pgsync_1         | JOIN information_schema.constraint_column_usage AS constraint_column_usage_1 ON constraint_column_usage_1.constraint_name = table_constraints_1.constraint_name
pgsync_1         | AND constraint_column_usage_1.table_schema = table_constraints_1.table_schema
pgsync_1         | WHERE table_constraints_1.table_name IN ([POSTCOMPILE_table_name_1])
pgsync_1         |   AND table_constraints_1.constraint_type = %(constraint_type_1)s
pgsync_1         | GROUP BY table_constraints_1.table_nameSELECT TABLE_NAME,
pgsync_1         |        primary_keys
pgsync_1         | FROM _pkey_viewSELECT TABLE_NAME AS TABLE_NAME,
pgsync_1         |                      array_agg(fkeys) AS foreign_keys
pgsync_1         | FROM _fkey_view,
pgsync_1         |      unnest(foreign_keys) AS fkeys
pgsync_1         | GROUP BY TABLE_NAMESELECT table_constraints_1.table_name,
pgsync_1         |        array_agg(CAST(key_column_usage_1.column_name AS TEXT)) AS foreign_keys
pgsync_1         | FROM information_schema.table_constraints AS table_constraints_1
pgsync_1         | JOIN information_schema.key_column_usage AS key_column_usage_1 ON key_column_usage_1.constraint_name = table_constraints_1.constraint_name
pgsync_1         | AND key_column_usage_1.table_schema = table_constraints_1.table_schema
pgsync_1         | JOIN information_schema.constraint_column_usage AS constraint_column_usage_1 ON constraint_column_usage_1.constraint_name = table_constraints_1.constraint_name
pgsync_1         | AND constraint_column_usage_1.table_schema = table_constraints_1.table_schema
pgsync_1         | WHERE table_constraints_1.table_name IN ([POSTCOMPILE_table_name_1])
pgsync_1         |   AND table_constraints_1.constraint_type = %(constraint_type_1)s
pgsync_1         | GROUP BY table_constraints_1.table_nameSELECT TABLE_NAME,
pgsync_1         |        primary_keys
pgsync_1         | FROM _pkey_viewSELECT TABLE_NAME AS TABLE_NAME,
pgsync_1         |                      array_agg(fkeys) AS foreign_keys
pgsync_1         | FROM _fkey_view,
pgsync_1         |      unnest(foreign_keys) AS fkeys
pgsync_1         | GROUP BY TABLE_NAMESELECT table_constraints_1.table_name,
pgsync_1         |        array_agg(CAST(key_column_usage_1.column_name AS TEXT)) AS foreign_keys
pgsync_1         | FROM information_schema.table_constraints AS table_constraints_1
pgsync_1         | JOIN information_schema.key_column_usage AS key_column_usage_1 ON key_column_usage_1.constraint_name = table_constraints_1.constraint_name
pgsync_1         | AND key_column_usage_1.table_schema = table_constraints_1.table_schema
pgsync_1         | JOIN information_schema.constraint_column_usage AS constraint_column_usage_1 ON constraint_column_usage_1.constraint_name = table_constraints_1.constraint_name
pgsync_1         | AND constraint_column_usage_1.table_schema = table_constraints_1.table_schema
pgsync_1         | WHERE table_constraints_1.table_name IN ([POSTCOMPILE_table_name_1])
pgsync_1         |   AND table_constraints_1.constraint_type = %(constraint_type_1)s
pgsync_1         | GROUP BY table_constraints_1.table_nameSELECT TABLE_NAME,
pgsync_1         |        primary_keys
pgsync_1         | FROM _pkey_viewSELECT TABLE_NAME AS TABLE_NAME,
pgsync_1         |                      array_agg(fkeys) AS foreign_keys
pgsync_1         | FROM _fkey_view,
pgsync_1         |      unnest(foreign_keys) AS fkeys
pgsync_1         | GROUP BY TABLE_NAMESELECT table_constraints_1.table_name,
pgsync_1         |        array_agg(CAST(key_column_usage_1.column_name AS TEXT)) AS foreign_keys
pgsync_1         | FROM information_schema.table_constraints AS table_constraints_1
pgsync_1         | JOIN information_schema.key_column_usage AS key_column_usage_1 ON key_column_usage_1.constraint_name = table_constraints_1.constraint_name
pgsync_1         | AND key_column_usage_1.table_schema = table_constraints_1.table_schema
pgsync_1         | JOIN information_schema.constraint_column_usage AS constraint_column_usage_1 ON constraint_column_usage_1.constraint_name = table_constraints_1.constraint_name
pgsync_1         | AND constraint_column_usage_1.table_schema = table_constraints_1.table_schema
pgsync_1         | WHERE table_constraints_1.table_name IN ([POSTCOMPILE_table_name_1])
pgsync_1         |   AND table_constraints_1.constraint_type = %(constraint_type_1)s
pgsync_1         | GROUP BY table_constraints_1.table_namecreate_replication_slot:
pgsync_1         | SELECT *
pgsync_1         | FROM PG_CREATE_LOGICAL_REPLICATION_SLOT(%(PG_CREATE_LOGICAL_REPLICATION_SLOT_1)s, %(PG_CREATE_LOGICAL_REPLICATION_SLOT_2)s)/usr/local/lib/python3.7/site-packages/sqlalchemy/dialects/postgresql/base.py:3803: SAWarning: Did not recognize type 'citext' of column 'email'
pgsync_1         |   "Did not recognize type '%s' of column '%s'" % (attype, name)
pgsync_1         | /usr/local/lib/python3.7/site-packages/sqlalchemy/dialects/postgresql/base.py:4145: SAWarning: Skipped unsupported reflection of expression-based index landmark_gin
pgsync_1         |   "expression-based index %s" % idx_name
pgsync_1         | /usr/local/lib/python3.7/site-packages/sqlalchemy/dialects/postgresql/base.py:4145: SAWarning: Skipped unsupported reflection of expression-based index event_gin
pgsync_1         |   "expression-based index %s" % idx_name
pgsync_1         | /usr/local/lib/python3.7/site-packages/sqlalchemy/dialects/postgresql/base.py:4145: SAWarning: Skipped unsupported reflection of expression-based index ee_gin
pgsync_1         |   "expression-based index %s" % idx_name
pgsync_1         | /usr/local/lib/python3.7/site-packages/sqlalchemy/dialects/postgresql/base.py:4145: SAWarning: Skipped unsupported reflection of expression-based index idx_external_data
pgsync_1         |   "expression-based index %s" % idx_name
pgsync_1         | /usr/local/lib/python3.7/site-packages/sqlalchemy/dialects/postgresql/base.py:4145: SAWarning: Skipped unsupported reflection of expression-based index ell_gin
pgsync_1         |   "expression-based index %s" % idx_name
pgsync_1         | /usr/local/lib/python3.7/site-packages/sqlalchemy/dialects/postgresql/base.py:3803: SAWarning: Did not recognize type 'meta_tag.ltree' of column 'path'
pgsync_1         |   "Did not recognize type '%s' of column '%s'" % (attype, name)
pgsync_1         | /usr/local/lib/python3.7/site-packages/sqlalchemy/dialects/postgresql/base.py:3803: SAWarning: Did not recognize type 'citext' of column 'email'
pgsync_1         |   "Did not recognize type '%s' of column '%s'" % (attype, name)
pgsync_1         | /usr/local/lib/python3.7/site-packages/sqlalchemy/dialects/postgresql/base.py:4145: SAWarning: Skipped unsupported reflection of expression-based index landmark_gin
pgsync_1         |   "expression-based index %s" % idx_name
pgsync_1         | /usr/local/lib/python3.7/site-packages/sqlalchemy/dialects/postgresql/base.py:4145: SAWarning: Skipped unsupported reflection of expression-based index event_gin
pgsync_1         |   "expression-based index %s" % idx_name
pgsync_1         | /usr/local/lib/python3.7/site-packages/sqlalchemy/dialects/postgresql/base.py:4145: SAWarning: Skipped unsupported reflection of expression-based index ee_gin
pgsync_1         |   "expression-based index %s" % idx_name
pgsync_1         | /usr/local/lib/python3.7/site-packages/sqlalchemy/dialects/postgresql/base.py:4145: SAWarning: Skipped unsupported reflection of expression-based index idx_external_data
pgsync_1         |   "expression-based index %s" % idx_name
pgsync_1         | /usr/local/lib/python3.7/site-packages/sqlalchemy/dialects/postgresql/base.py:4145: SAWarning: Skipped unsupported reflection of expression-based index ell_gin
pgsync_1         |   "expression-based index %s" % idx_name
pgsync_1         | 2021-06-18 23:15:47.588:ERROR:pgsync.sync: Exception (psycopg2.errors.OutOfMemory) out of memory
pgsync_1         | DETAIL:  Failed on request of size 633781387.
pgsync_1         |
pgsync_1         | [SQL: SELECT anon_1."JSON_BUILD_ARRAY_1", anon_1."JSON_BUILD_OBJECT_1", anon_1.id
pgsync_1         | FROM (SELECT JSON_BUILD_ARRAY(anon_2._keys) AS "JSON_BUILD_ARRAY_1", JSON_BUILD_OBJECT(%(JSON_BUILD_OBJECT_2)s, transactions_1.id, %(JSON_BUILD_OBJECT_3)s, transactions_1.status, %(JSON_BUILD_OBJECT_4)s, anon_2.external_transactions) AS "JSON_BUILD_OBJECT_1", transactions_1.id AS id
pgsync_1         | FROM public.transactions AS transactions_1 LEFT OUTER JOIN (SELECT CAST(JSON_BUILD_OBJECT(%(JSON_BUILD_OBJECT_5)s, JSON_AGG(JSON_BUILD_OBJECT(%(JSON_BUILD_OBJECT_6)s, JSON_BUILD_ARRAY(external_transactions_1.id)))) AS JSONB) AS _keys, JSON_AGG(JSON_BUILD_OBJECT(%(JSON_BUILD_OBJECT_7)s, external_transactions_1.event_id, %(JSON_BUILD_OBJECT_8)s, external_transactions_1.customer_name)) AS external_transactions, external_transactions_1.transaction_id AS transaction_id
pgsync_1         | FROM public.external_transactions AS external_transactions_1 GROUP BY external_transactions_1.transaction_id) AS anon_2 ON anon_2.transaction_id = transactions_1.id
pgsync_1         | WHERE CAST(CAST(transactions_1.xmin AS TEXT) AS BIGINT) < %(param_1)s) AS anon_1]
pgsync_1         | [parameters: {'JSON_BUILD_OBJECT_2': 'id', 'JSON_BUILD_OBJECT_3': 'status', 'JSON_BUILD_OBJECT_4': 'external_transactions', 'JSON_BUILD_OBJECT_5': 'external_transactions', 'JSON_BUILD_OBJECT_6': 'id', 'JSON_BUILD_OBJECT_7': 'event_id', 'JSON_BUILD_OBJECT_8': 'customer_name', 'param_1': 16748362}]
pgsync_1         | (Background on this error at: http://sqlalche.me/e/14/e3q8)
pgsync_1         | Traceback (most recent call last):
pgsync_1         |   File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1721, in _execute_context
pgsync_1         |     result = context._setup_result_proxy()
pgsync_1         |   File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/default.py", line 1408, in _setup_result_proxy
pgsync_1         |     self.cursor, self.execution_options
pgsync_1         |   File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/cursor.py", line 1026, in __init__
pgsync_1         |     self._rowbuffer = collections.deque(dbapi_cursor.fetchmany(1))
pgsync_1         | psycopg2.errors.OutOfMemory: out of memory
pgsync_1         | DETAIL:  Failed on request of size 633781387.

Here is my schema - this time I left off the manual "foreign_key" option. (I have changed from p_transactions to external_transactions because I would get the out-of-disk error even before the query would show.)

[
    {
        "database": "db",
        "index": "transactions",
        "nodes": [
            {
                "table": "transactions",
                "schema": "public",
                "columns": ["id", "status"],
                "children": [
                    {
                        "table": "external_transactions",
                        "columns": ["event_id", "customer_name"],
                        "relationship": {
                            "variant": "object",
                            "type": "one_to_many"
                        }
                    }
                ]
            }
        ]
    }
]

Table 'external_transactions' has a foreign key 'transaction_id' which is connected to the primary key 'id' in the transactions table, but in theory these tables can also be joined through other tables, such as the users table. Could this be an issue?
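
For completeness, the explicit form of this relationship (same foreign_key shape as in the first schema above; a sketch using the column names from this thread) would be:

{
    "table": "external_transactions",
    "columns": ["event_id", "customer_name"],
    "relationship": {
        "variant": "object",
        "type": "one_to_many",
        "foreign_key": {
            "child": ["transaction_id"],
            "parent": ["id"]
        }
    }
}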

smokedsalmonbagel commented 3 years ago

@toluaina - any luck on extracting the query?

toluaina commented 3 years ago

Sorry about this.

Here is the resulting query:

SELECT 
anon_1."JSON_BUILD_ARRAY_1", anon_1."JSON_BUILD_OBJECT_1", anon_1.id
FROM (SELECT JSON_BUILD_ARRAY(anon_2._keys) AS "JSON_BUILD_ARRAY_1", JSON_BUILD_OBJECT('id', transactions_1.id, 'status', transactions_1.status, 'external_transactions', anon_2.external_transactions) AS "JSON_BUILD_OBJECT_1", transactions_1.id AS id
FROM public.transactions AS transactions_1 LEFT OUTER JOIN (SELECT CAST(JSON_BUILD_OBJECT('external_transactions', JSON_AGG(JSON_BUILD_OBJECT('id', JSON_BUILD_ARRAY(external_transactions_1.id)))) AS JSONB) AS _keys, JSON_AGG(JSON_BUILD_OBJECT('event_id', external_transactions_1.event_id, 'customer_name', external_transactions_1.customer_name)) AS external_transactions, external_transactions_1.transaction_id AS transaction_id
FROM public.external_transactions AS external_transactions_1 GROUP BY external_transactions_1.transaction_id) AS anon_2 ON anon_2.transaction_id = transactions_1.id
WHERE CAST(CAST(transactions_1.xmin AS TEXT) AS BIGINT) < 16748362) AS anon_1

toluaina commented 3 years ago

Can you also run an EXPLAIN on this query?

toluaina commented 3 years ago

I think you need to increase work_mem and restart the database server.
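
For a quick test before touching the instance-wide setting, work_mem can be raised for just the session running the query (a sketch; on RDS the persistent change goes through the DB parameter group rather than the config file):

-- Session-level override, just to check whether the join completes with more memory.
SET work_mem = '128MB';
-- then re-run the generated SELECT above in the same session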

smokedsalmonbagel commented 3 years ago

Sure -

QUERY PLAN                                                                                                                       
---------------------------------------------------------------------------------------------------------------------------------+
Hash Left Join  (cost=6845232.35..8251322.35 rows=9472027 width=80)                                                              
  Hash Cond: (transactions_1.id = external_transactions_1.transaction_id)                                                        
  ->  Seq Scan on transactions transactions_1  (cost=0.00..1135897.80 rows=9472027 width=20)                                     
        Filter: (((xmin)::text)::bigint < 16748362)                                                                              
  ->  Hash  (cost=6672638.57..6672638.57 rows=6850222 width=80)                                                                  
        ->  GroupAggregate  (cost=6112572.83..6604136.35 rows=6850222 width=80)                                                  
              Group Key: external_transactions_1.transaction_id                                                                  
              ->  Sort  (cost=6112572.83..6160777.62 rows=19281916 width=61)                                                     
                    Sort Key: external_transactions_1.transaction_id                                                             
                    ->  Seq Scan on external_transactions external_transactions_1  (cost=0.00..1604524.16 rows=19281916 width=61)

I was under the impression that Postgres just uses temp disk space when it runs out of memory. In other words, if work_mem is too low it will mean slower queries, but not failed queries. Will try increasing work_mem to 128MB - right now we are at the default (4MB).

smokedsalmonbagel commented 3 years ago

Wondering if there might be a workaround for this large join.

toluaina commented 3 years ago

Is the large join the bottleneck here? Does that query run in a psql shell? I can't think of an easy way around the large joins. I am working on a POC for pushing large datasets in the initial sync, but this is purely at the research stage for now. In my experience, increasing the resources allocated to Postgres and some tuning is the only solution.

smokedsalmonbagel commented 3 years ago

Yes, I am pretty sure it is. The result when running directly on the DB is the same. I will try to see if there is a configuration or instance size on RDS that can handle this.

Good to know large initial syncs are possibly on the roadmap. We would really like to use pgsync.

Thanks!