toluaina / pgsync

Postgres to Elasticsearch/OpenSearch sync
https://pgsync.com
MIT License
1.11k stars 174 forks source link

PGSync daemon mode crashes - no error in logs (DEBUG mode, verbose option) #414

Closed rafamarqqqs closed 1 year ago

rafamarqqqs commented 1 year ago

PGSync version: 2.3.3

Postgres version: 13.7

Elasticsearch version: 8.5.0

Redis version: 6.2.6

Python version: 3.7

Problem Description:

Hi! I'm using PGSync to sync two simple tables, the bootstrap command works fine but the daemon does not start syncing changes. PGSync daemon mode crashes without any information at all (even in DEBUG log mode and with the verbose option).

Schema:

[
  {
    "database": "broker_database",
    "index": "brokers",
    "plugins": [
      "AddKindBroker"
    ],
    "nodes": {
      "table": "broker",
      "columns": [
        "id",
        "symbol",
        "display_name"
      ],
      "children": [
        {
          "table": "country",
          "columns": [
            "name",
            "symbol"
          ],
          "primary_key": [
            "id"
          ],
          "relationship": {
            "variant": "object",
            "type": "one_to_one",
            "foreign_key": {
              "child": [
                "id"
              ],
              "parent": [
                "country_id"
              ]
            }
          }
        }
      ]
    },
    "mapping": {
      "tax_id": {
        "type": "keyword"
      },
      "kind": {
        "type": "keyword"
      }
    }
  }
]

The workflow I'm using when updating a schema is:

  1. Bootstrap teardown:
    bootstrap -v -c schemas/brokers.json -t
  2. Bootstrap:
    bootstrap -v -c schemas/brokers.json
  3. Run pgsync without daemon mode:
    pgsync -v -c schemas/brokers.json
  4. Start PGSync in daemon mode:
    pgsync -v -c schemas/brokers.json --daemon

The bootstrap log seems fine:

2023-02-14 18:48:40.337:INFO:pgsync.utils: Settings:
2023-02-14 18:48:40.337:INFO:pgsync.utils: Schema    : schemas/brokers.json
2023-02-14 18:48:40.337:INFO:pgsync.utils: -----------------------------------------------------------------
2023-02-14 18:48:40.337:INFO:pgsync.utils: Checkpoint:
2023-02-14 18:48:40.337:INFO:pgsync.utils: Path: /data
2023-02-14 18:48:40.337:INFO:pgsync.utils: Postgres:
2023-02-14 18:48:40.338:DEBUG:pgsync.plugin: Reloading plugins from package: plugins
2023-02-14 18:48:40.340:INFO:pgsync.utils: URL: postgresql+psycopg2://pgsync:****************@****************
2023-02-14 18:48:40.341:DEBUG:pgsync.plugin: Reloading plugins from package: plugins
2023-02-14 18:48:40.341:INFO:pgsync.utils: Elasticsearch:
2023-02-14 18:48:40.341:INFO:pgsync.utils: URL: http://pgsync:****************@****************
2023-02-14 18:48:40.341:INFO:pgsync.utils: Redis:
2023-02-14 18:48:40.341:DEBUG:pgsync.plugin: Reloading plugins from package: plugins
2023-02-14 18:48:40.342:DEBUG:pgsync.urls: Connecting to Redis without password.
2023-02-14 18:48:40.342:INFO:pgsync.utils: URL: redis://****************:6379/0
2023-02-14 18:48:40.342:INFO:pgsync.utils: -----------------------------------------------------------------
2023-02-14 18:48:40.342:DEBUG:pgsync.plugin: Reloading plugins from package: plugins
2023-02-14 18:48:40.347:DEBUG:pgsync.plugin: Reloading plugins from package: plugins
2023-02-14 18:48:40.356:INFO:elasticsearch: GET http://****************:9200/ [status:200 request:0.008s]
2023-02-14 18:48:40.357:INFO:elasticsearch: GET http://****************:9200/ [status:200 request:0.001s]
2023-02-14 18:48:40.358:DEBUG:pgsync.plugin: Reloading plugins from package: plugins
2023-02-14 18:48:40.358:DEBUG:pgsync.urls: Connecting to Redis without password.
2023-02-14 18:48:40.392:DEBUG:pgsync.plugin: Reloading plugins from package: plugins
2023-02-14 18:48:40.392:DEBUG:pgsync.plugin: Plugin class: plugins.securities.AddKindBrokerPlugin
2023-02-14 18:48:40.406:DEBUG:pgsync.base: Creating replication slot: _tmp_
2023-02-14 18:48:40.427:DEBUG:pgsync.base: Dropping replication slot: _tmp_
2023-02-14 18:48:40.551:INFO:elasticsearch: HEAD http://****************:9200/brokers [status:200 request:0.002s]
2023-02-14 18:48:40.553:INFO:pgsync.redisqueue: Deleting redis key: queue:broker_database_brokers
2023-02-14 18:48:40.557:DEBUG:pgsync.base: Dropping trigger on table: public.broker
2023-02-14 18:48:40.557:DEBUG:pgsync.base: Dropping trigger on table: public.country
2023-02-14 18:48:40.561:DEBUG:pgsync.base: Dropping view: public._view
2023-02-14 18:48:40.578:DEBUG:pgsync.base: Dropped view: public._view
2023-02-14 18:48:40.581:DEBUG:pgsync.base: Dropping replication slot: broker_database_brokers
 - public.broker
    - public.country
2023-02-14 18:48:51.345:INFO:pgsync.utils: Settings:
2023-02-14 18:48:51.345:INFO:pgsync.utils: Schema    : schemas/brokers.json
2023-02-14 18:48:51.345:INFO:pgsync.utils: -----------------------------------------------------------------
2023-02-14 18:48:51.345:INFO:pgsync.utils: Checkpoint:
2023-02-14 18:48:51.345:INFO:pgsync.utils: Path: /data
2023-02-14 18:48:51.345:INFO:pgsync.utils: Postgres:
2023-02-14 18:48:51.345:DEBUG:pgsync.plugin: Reloading plugins from package: plugins
2023-02-14 18:48:51.347:INFO:pgsync.utils: URL: postgresql+psycopg2://pgsync:****************@****************
2023-02-14 18:48:51.347:DEBUG:pgsync.plugin: Reloading plugins from package: plugins
2023-02-14 18:48:51.348:INFO:pgsync.utils: Elasticsearch:
2023-02-14 18:48:51.348:INFO:pgsync.utils: URL: http://pgsync:****************@****************
2023-02-14 18:48:51.348:INFO:pgsync.utils: Redis:
2023-02-14 18:48:51.348:DEBUG:pgsync.plugin: Reloading plugins from package: plugins
2023-02-14 18:48:51.349:DEBUG:pgsync.urls: Connecting to Redis without password.
2023-02-14 18:48:51.349:INFO:pgsync.utils: URL: redis://****************:6379/0
2023-02-14 18:48:51.349:INFO:pgsync.utils: -----------------------------------------------------------------
2023-02-14 18:48:51.350:DEBUG:pgsync.plugin: Reloading plugins from package: plugins
2023-02-14 18:48:51.356:DEBUG:pgsync.plugin: Reloading plugins from package: plugins
2023-02-14 18:48:51.368:INFO:elasticsearch: GET http://****************:9200/ [status:200 request:0.011s]
2023-02-14 18:48:51.370:INFO:elasticsearch: GET http://****************:9200/ [status:200 request:0.001s]
2023-02-14 18:48:51.371:DEBUG:pgsync.plugin: Reloading plugins from package: plugins
2023-02-14 18:48:51.371:DEBUG:pgsync.urls: Connecting to Redis without password.
2023-02-14 18:48:51.409:DEBUG:pgsync.plugin: Reloading plugins from package: plugins
2023-02-14 18:48:51.409:DEBUG:pgsync.plugin: Plugin class: plugins.securities.AddKindBrokerPlugin
2023-02-14 18:48:51.424:DEBUG:pgsync.base: Creating replication slot: _tmp_
2023-02-14 18:48:51.446:DEBUG:pgsync.base: Dropping replication slot: _tmp_
2023-02-14 18:48:51.554:INFO:elasticsearch: HEAD http://****************:9200/brokers [status:200 request:0.002s]
2023-02-14 18:48:51.554:WARNING:pgsync.sync: Checkpoint file not found: /data/.broker_database_brokers
2023-02-14 18:48:51.554:INFO:pgsync.redisqueue: Deleting redis key: queue:broker_database_brokers
2023-02-14 18:48:51.559:DEBUG:pgsync.base: Dropping trigger on table: public.broker
2023-02-14 18:48:51.559:DEBUG:pgsync.base: Dropping trigger on table: public.country
2023-02-14 18:48:51.560:DEBUG:pgsync.base: Dropping replication slot: broker_database_brokers
2023-02-14 18:48:52.806:DEBUG:pgsync.view: Creating view: public._view
2023-02-14 18:48:52.829:DEBUG:pgsync.view: Created view: public._view
2023-02-14 18:48:52.838:DEBUG:pgsync.base: Creating trigger on table: public.broker
2023-02-14 18:48:52.838:DEBUG:pgsync.base: Dropping trigger on table: public.broker
2023-02-14 18:48:52.842:DEBUG:pgsync.base: Dropping trigger on table: public.broker
2023-02-14 18:48:52.846:DEBUG:pgsync.base: Creating trigger on table: public.country
2023-02-14 18:48:52.846:DEBUG:pgsync.base: Dropping trigger on table: public.country
2023-02-14 18:48:52.850:DEBUG:pgsync.base: Dropping trigger on table: public.country
2023-02-14 18:48:52.857:DEBUG:pgsync.base: Creating replication slot: broker_database_brokers
2023-02-14 18:48:52.877:INFO:__main__: Bootstrap: broker_database
 - public.broker
    - public.country
2023-02-14 18:49:03.652:INFO:pgsync.utils: Settings:
2023-02-14 18:49:03.652:INFO:pgsync.utils: Schema    : schemas/brokers.json
2023-02-14 18:49:03.652:INFO:pgsync.utils: -----------------------------------------------------------------
2023-02-14 18:49:03.652:INFO:pgsync.utils: Checkpoint:
2023-02-14 18:49:03.652:INFO:pgsync.utils: Path: /data
2023-02-14 18:49:03.652:INFO:pgsync.utils: Postgres:
2023-02-14 18:49:03.652:DEBUG:pgsync.plugin: Reloading plugins from package: plugins
2023-02-14 18:49:03.654:INFO:pgsync.utils: URL: postgresql+psycopg2://pgsync:****************@****************
2023-02-14 18:49:03.655:DEBUG:pgsync.plugin: Reloading plugins from package: plugins
2023-02-14 18:49:03.655:INFO:pgsync.utils: Elasticsearch:
2023-02-14 18:49:03.655:INFO:pgsync.utils: URL: http://pgsync:****************@****************
2023-02-14 18:49:03.655:INFO:pgsync.utils: Redis:
2023-02-14 18:49:03.655:DEBUG:pgsync.plugin: Reloading plugins from package: plugins
2023-02-14 18:49:03.656:DEBUG:pgsync.urls: Connecting to Redis without password.
2023-02-14 18:49:03.656:INFO:pgsync.utils: URL: redis://****************:6379/0
2023-02-14 18:49:03.656:INFO:pgsync.utils: -----------------------------------------------------------------
2023-02-14 18:49:03.656:DEBUG:pgsync.plugin: Reloading plugins from package: plugins
2023-02-14 18:49:03.663:DEBUG:pgsync.plugin: Reloading plugins from package: plugins
2023-02-14 18:49:03.672:INFO:elasticsearch: GET http://****************:9200/ [status:200 request:0.008s]
2023-02-14 18:49:03.673:INFO:elasticsearch: GET http://****************:9200/ [status:200 request:0.001s]
2023-02-14 18:49:03.674:DEBUG:pgsync.plugin: Reloading plugins from package: plugins
2023-02-14 18:49:03.675:DEBUG:pgsync.urls: Connecting to Redis without password.
2023-02-14 18:49:03.710:DEBUG:pgsync.plugin: Reloading plugins from package: plugins
2023-02-14 18:49:03.710:DEBUG:pgsync.plugin: Plugin class: plugins.securities.AddKindBrokerPlugin
2023-02-14 18:49:03.716:DEBUG:pgsync.utils: pg_settings:
SELECT setting
FROM pg_settings
WHERE name = %(name_1)s
2023-02-14 18:49:03.725:DEBUG:pgsync.utils: pg_settings:
SELECT setting
FROM pg_settings
WHERE name = %(name_1)s
2023-02-14 18:49:03.735:DEBUG:pgsync.utils: replication_slots:
SELECT *
FROM PG_REPLICATION_SLOTS
WHERE slot_name = %(slot_name_1)s
  AND slot_type = %(slot_type_1)s
  AND plugin = %(plugin_1)s
2023-02-14 18:49:03.739:DEBUG:pgsync.base: Creating replication slot: _tmp_
2023-02-14 18:49:03.742:DEBUG:pgsync.utils: create_replication_slot:
SELECT *
FROM PG_CREATE_LOGICAL_REPLICATION_SLOT(%(PG_CREATE_LOGICAL_REPLICATION_SLOT_1)s, %(PG_CREATE_LOGICAL_REPLICATION_SLOT_2)s)
2023-02-14 18:49:03.764:DEBUG:pgsync.base: Dropping replication slot: _tmp_
2023-02-14 18:49:03.770:DEBUG:pgsync.utils: replication_slots:
SELECT *
FROM PG_REPLICATION_SLOTS
WHERE slot_name = %(slot_name_1)s
  AND slot_type = %(slot_type_1)s
  AND plugin = %(plugin_1)s
2023-02-14 18:49:03.778:DEBUG:pgsync.utils: drop_replication_slot:
SELECT *
FROM PG_DROP_REPLICATION_SLOT(%(PG_DROP_REPLICATION_SLOT_1)s)
2023-02-14 18:49:03.786:DEBUG:pgsync.utils: pg_settings:
SELECT setting
FROM pg_settings
WHERE name = %(name_1)s
2023-02-14 18:49:03.795:DEBUG:pgsync.utils: replication_slots:
SELECT *
FROM PG_REPLICATION_SLOTS
WHERE slot_name = %(slot_name_1)s
  AND slot_type = %(slot_type_1)s
  AND plugin = %(plugin_1)s
2023-02-14 18:49:03.913:INFO:elasticsearch: HEAD http://****************:9200/brokers [status:200 request:0.003s]
2023-02-14 18:49:03.916:DEBUG:pgsync.utils: txid_current:
SELECT *
FROM TXID_CURRENT()
2023-02-14 18:49:03.919:DEBUG:pgsync.sync: pull txmin: None - txmax: 3509820500
2023-02-14 18:49:03.974:DEBUG:pgsync.utils: Query:
SELECT JSON_BUILD_ARRAY(anon_1._keys) AS "JSON_BUILD_ARRAY_1",
       CAST(JSON_BUILD_OBJECT(%(JSON_BUILD_OBJECT_2)s, broker_1.id, %(JSON_BUILD_OBJECT_3)s, broker_1.symbol, %(JSON_BUILD_OBJECT_4)s, broker_1.display_name, %(JSON_BUILD_OBJECT_5)s, anon_1.country) AS JSONB) AS "JSON_BUILD_OBJECT_1",
       broker_1.id
FROM public.broker AS broker_1
LEFT OUTER JOIN LATERAL
  (SELECT CAST(JSON_BUILD_OBJECT(%(JSON_BUILD_OBJECT_6)s, CAST(JSON_BUILD_OBJECT(%(JSON_BUILD_OBJECT_7)s, JSON_BUILD_ARRAY(country_1.id)) AS JSONB)) AS JSONB) AS _keys,
          CAST(JSON_BUILD_OBJECT(%(JSON_BUILD_OBJECT_8)s, country_1.name, %(JSON_BUILD_OBJECT_9)s, country_1.symbol) AS JSONB) AS country,
          country_1.id AS id
   FROM public.country AS country_1
   WHERE country_1.id = broker_1.country_id) AS anon_1 ON anon_1.id = broker_1.country_id
WHERE CAST(CAST(broker_1.xmin AS TEXT) AS BIGINT) < %(param_1)s
pg_settings:
SELECT setting
FROM pg_settings
WHERE name = %(name_1)s
-------------------------------------------------------------------------------
pg_settings:
SELECT setting
FROM pg_settings
WHERE name = %(name_1)s
-------------------------------------------------------------------------------
replication_slots:
SELECT *
FROM PG_REPLICATION_SLOTS
----------
SELECT xid,
       DATA
FROM PG_LOGICAL_SLOT_PEEK_CHANGES(%(PG_LOGICAL_SLOT_PEEK_CHANGES_1)s, NULL, NULL)
WHERE CAST(CAST(xid AS TEXT) AS BIGINT) < %(param_1)s
LIMIT %(param_2)s
OFFSET %(param_3)s
-------------------------------------------------------------------------------
 0:00:00.724242 (0.72 sec)

But the daemon mode crashes without any error information:


2023-02-14 18:54:11.331:INFO:pgsync.utils: Settings:
2023-02-14 18:54:11.332:INFO:pgsync.utils: Schema    : schemas/brokers.json
2023-02-14 18:54:11.332:INFO:pgsync.utils: -----------------------------------------------------------------
2023-02-14 18:54:11.332:INFO:pgsync.utils: Checkpoint:
2023-02-14 18:54:11.332:INFO:pgsync.utils: Path: /data
2023-02-14 18:54:11.332:INFO:pgsync.utils: Postgres:
2023-02-14 18:54:11.332:DEBUG:pgsync.plugin: Reloading plugins from package: plugins
2023-02-14 18:54:11.335:INFO:pgsync.utils: URL: postgresql+psycopg2://pgsync:****************@****************
2023-02-14 18:54:11.335:DEBUG:pgsync.plugin: Reloading plugins from package: plugins
2023-02-14 18:54:11.335:INFO:pgsync.utils: Elasticsearch:
2023-02-14 18:54:11.335:INFO:pgsync.utils: URL: http://pgsync:****************@****************
2023-02-14 18:54:11.336:INFO:pgsync.utils: Redis:
2023-02-14 18:54:11.336:DEBUG:pgsync.plugin: Reloading plugins from package: plugins
2023-02-14 18:54:11.336:DEBUG:pgsync.urls: Connecting to Redis without password.
2023-02-14 18:54:11.336:INFO:pgsync.utils: URL: redis://****************:6379/0
2023-02-14 18:54:11.336:INFO:pgsync.utils: -----------------------------------------------------------------
2023-02-14 18:54:11.337:DEBUG:pgsync.plugin: Reloading plugins from package: plugins
2023-02-14 18:54:11.342:DEBUG:pgsync.plugin: Reloading plugins from package: plugins
2023-02-14 18:54:11.362:INFO:elasticsearch: GET http://****************:9200/ [status:200 request:0.019s]
2023-02-14 18:54:11.375:INFO:elasticsearch: GET http://****************:9200/ [status:200 request:0.013s]
2023-02-14 18:54:11.376:DEBUG:pgsync.plugin: Reloading plugins from package: plugins
2023-02-14 18:54:11.377:DEBUG:pgsync.urls: Connecting to Redis without password.
2023-02-14 18:54:11.414:DEBUG:pgsync.plugin: Reloading plugins from package: plugins
2023-02-14 18:54:11.414:DEBUG:pgsync.plugin: Plugin class: plugins.securities.AddKindBrokerPlugin
2023-02-14 18:54:11.419:DEBUG:pgsync.utils: pg_settings:
SELECT setting
FROM pg_settings
WHERE name = %(name_1)s
2023-02-14 18:54:11.426:DEBUG:pgsync.utils: pg_settings:
SELECT setting
FROM pg_settings
WHERE name = %(name_1)s
2023-02-14 18:53:55.396:DEBUG:pgsync.utils: replication_slots:
SELECT *
FROM PG_REPLICATION_SLOTS
WHERE slot_name = %(slot_name_1)s
  AND slot_type = %(slot_type_1)s
  AND plugin = %(plugin_1)s
2023-02-14 18:53:55.534:INFO:elasticsearch: HEAD http://****************:9200/brokers [status:200 request:0.004s]
2023-02-14 18:53:55.537:DEBUG:pgsync.utils: txid_current:
SELECT *
FROM TXID_CURRENT()
2023-02-14 18:53:55.541:DEBUG:pgsync.sync: pull txmin: 3509820822 - txmax: 3509820902
2023-02-14 18:53:55.624:DEBUG:pgsync.utils: Query:
SELECT JSON_BUILD_ARRAY(anon_1._keys) AS "JSON_BUILD_ARRAY_1",
       CAST(JSON_BUILD_OBJECT(%(JSON_BUILD_OBJECT_2)s, broker_1.id, %(JSON_BUILD_OBJECT_3)s, broker_1.symbol, %(JSON_BUILD_OBJECT_4)s, broker_1.display_name, %(JSON_BUILD_OBJECT_5)s, anon_1.country) AS JSONB) AS "JSON_BUILD_OBJECT_1",
       broker_1.id
FROM public.broker AS broker_1
LEFT OUTER JOIN LATERAL
  (SELECT CAST(JSON_BUILD_OBJECT(%(JSON_BUILD_OBJECT_6)s, CAST(JSON_BUILD_OBJECT(%(JSON_BUILD_OBJECT_7)s, JSON_BUILD_ARRAY(country_1.id)) AS JSONB)) AS JSONB) AS _keys,
          CAST(JSON_BUILD_OBJECT(%(JSON_BUILD_OBJECT_8)s, country_1.name, %(JSON_BUILD_OBJECT_9)s, country_1.symbol) AS JSONB) AS country,
          country_1.id AS id
   FROM public.country AS country_1
   WHERE country_1.id = broker_1.country_id) AS anon_1 ON anon_1.id = broker_1.country_id
WHERE CAST(CAST(broker_1.xmin AS TEXT) AS BIGINT) >= %(param_1)s
  AND CAST(CAST(broker_1.xmin AS TEXT) AS BIGINT) < %(param_2)s
pg_settings:
SELECT setting
FROM pg_settings
WHERE name = %(name_1)s
-------------------------------------------------------------------------------
pg_settings:
SELECT setting
FROM pg_settings
WHERE name = %(name_1)s
-------------------------------------------------------------------------------
replication_slots:
SELECT *
FROM PG_REPLICATION_SLOTS
WHERE slot_name = %(slot_name_1)s
  AND slot_type = %(slot_type_1)s
  AND plugin = %(plugin_1)s
-------------------------------------------------------------------------------
create_replication_slot:
SELECT *
FROM PG_CREATE_LOGICAL_REPLICATION_SLOT(%(PG_CREATE_LOGICAL_REPLICATION_SLOT_1)s, %(PG_CREATE_LOGICAL_REPLICATION_SLOT_2)s)
-------------------------------------------------------------------------------
replication_slots:
SELECT *
FROM PG_REPLICATION_SLOTS
WHERE slot_name = %(slot_name_1)s
  AND slot_type = %(slot_type_1)s
  AND plugin = %(plugin_1)s
-------------------------------------------------------------------------------
drop_replication_slot:
SELECT *
FROM PG_DROP_REPLICATION_SLOT(%(PG_DROP_REPLICATION_SLOT_1)s)
-------------------------------------------------------------------------------
pg_settings:
SELECT setting
FROM pg_settings
WHERE name = %(name_1)s
-------------------------------------------------------------------------------
replication_slots:
SELECT *
FROM PG_REPLICATION_SLOTS
WHERE slot_name = %(slot_name_1)s
  AND slot_type = %(slot_type_1)s
  AND plugin = %(plugin_1)s
-------------------------------------------------------------------------------
 - public.broker
    - public.country
txid_current:
SELECT *
FROM TXID_CURRENT()
-------------------------------------------------------------------------------
Query:
SELECT JSON_BUILD_ARRAY(anon_1._keys) AS "JSON_BUILD_ARRAY_1",
       CAST(JSON_BUILD_OBJECT(%(JSON_BUILD_OBJECT_2)s, broker_1.id, %(JSON_BUILD_OBJECT_3)s, broker_1.symbol, %(JSON_BUILD_OBJECT_4)s, broker_1.display_name, %(JSON_BUILD_OBJECT_5)s, anon_1.country) AS JSONB) AS "JSON_BUILD_OBJECT_1",
       broker_1.id
FROM public.broker AS broker_1
LEFT OUTER JOIN LATERAL
  (SELECT CAST(JSON_BUILD_OBJECT(%(JSON_BUILD_OBJECT_6)s, CAST(JSON_BUILD_OBJECT(%(JSON_BUILD_OBJECT_7)s, JSON_BUILD_ARRAY(country_1.id)) AS JSONB)) AS JSONB) AS _keys,
          CAST(JSON_BUILD_OBJECT(%(JSON_BUILD_OBJECT_8)s, country_1.name, %(JSON_BUILD_OBJECT_9)s, country_1.symbol) AS JSONB) AS country,
          country_1.id AS id
   FROM public.country AS country_1
   WHERE country_1.id = broker_1.country_id) AS anon_1 ON anon_1.id = broker_1.country_id
WHERE CAST(CAST(broker_1.xmin AS TEXT) AS BIGINT) >= %(param_1)s
  AND CAST(CAST(broker_1.xmin AS TEXT) AS BIGINT) < %(param_2)s
-------------------------------------------------------------------------------

2023-02-14 18:53:55.697:DEBUG:pgsync.utils: SELECT xid,
       DATA
FROM PG_LOGICAL_SLOT_PEEK_CHANGES(%(PG_LOGICAL_SLOT_PEEK_CHANGES_1)s, NULL, NULL)
WHERE CAST(CAST(xid AS TEXT) AS BIGINT) >= %(param_1)s
  AND CAST(CAST(xid AS TEXT) AS BIGINT) < %(param_2)s
LIMIT %(param_3)s
OFFSET %(param_4)s
2023-02-14 18:53:55.711:DEBUG:pgsync.utils: txid_current:
SELECT *
FROM TXID_CURRENT()

The replication slot exists, Postgres settings are set to the required settings. Is there anything I'm missing out? I'm using the same process for other tables without any errors

Error Message (if any): no error messages

toluaina commented 1 year ago