toluaina / pgsync

Postgres to Elasticsearch/OpenSearch sync
https://pgsync.com
MIT License

Redis bulk issue #2 #568

Open gitbute opened 2 months ago

gitbute commented 2 months ago

PGSync version: 3.2.0

Postgres version: 16.4.0

Elasticsearch/OpenSearch version: 7.16.3

Redis version: 7.4.0

Python version: 3.9

Problem Description:

Maybe related to #481, #552, #569

First, the schema:

[
  {
    "database": "db",
    "index": "document",
    "nodes": {
      "table": "document",
      "columns": [
        "title"
      ],
      "children": [
        {
          "table": "content",
          "columns": ["content"],
          "relationship": {
            "variant": "object",
            "type": "one_to_one",
            "foreign_key": {
              "child": ["document_id"],
              "parent": ["uuid"]
            }
          }
        }
      ]
    }
  }
]
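For context, the underlying tables presumably look roughly like this (DDL reconstructed from the mapping above and the trigger payloads in the debug log; exact column types and constraints are assumptions):

```sql
CREATE TABLE document (
    uuid  UUID PRIMARY KEY,
    title TEXT
);

CREATE TABLE content (
    id          UUID PRIMARY KEY,
    -- one_to_one: each document has at most one content row
    document_id UUID UNIQUE REFERENCES document (uuid),
    content     TEXT
);
```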

We are experiencing a strange issue when we bulk insert data into the child table of a parent-child (one-to-one) relation. Let's assume the parent table document contains 20'000 documents, already indexed in Elasticsearch via pgsync. When we insert 20'000 rows into the empty table content (committing between each insert), only about 20 rows (the "content" field) get synced to Elasticsearch.

With REDIS_READ_CHUNK_SIZE: "1" it works correctly (all 20'000 content inserts are reflected in Elasticsearch), but it takes about 10 minutes. With REDIS_READ_CHUNK_SIZE: "10" I get around 2'000 correct syncs. So it seems that most of each Redis read chunk is dropped on every sync. We tried every combination of possible settings via env vars (Elasticsearch bulk streaming, different chunk sizes on the Elasticsearch and Redis side, and so on). As far as I can see, pgsync only processes one record from each Redis read chunk.
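For completeness, these are the knobs we varied (env var names as we used them with pgsync 3.2.0; the values here are just examples of what we tried, not a working combination):

```shell
# Redis read chunk size: "1" syncs everything but takes ~10 minutes for
# 20'000 rows; "10" yields ~2'000 synced rows; the default drops almost all.
export REDIS_READ_CHUNK_SIZE=1

# Elasticsearch-side settings we also varied, with no visible effect:
export ELASTICSEARCH_STREAMING_BULK=True
export ELASTICSEARCH_CHUNK_SIZE=2000
```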

Here's the log with the default REDIS_READ_CHUNK_SIZE (you can clearly see that only a few updates reach Elasticsearch per chunk; in the end, only 20 documents have their content correctly synced):

Sync db:document Xlog: [200] => Db: [0] => Redis: [0] => Elasticsearch: [0]...
Sync db:document Xlog: [200] => Db: [95] => Redis: [0] => Elasticsearch: [0]...
Sync db:document Xlog: [200] => Db: [356] => Redis: [0] => Elasticsearch: [0]...
Sync db:document Xlog: [200] => Db: [569] => Redis: [0] => Elasticsearch: [0]...
Sync db:document Xlog: [200] => Db: [976] => Redis: [0] => Elasticsearch: [0]...
Sync db:document Xlog: [200] => Db: [1,346] => Redis: [0] => Elasticsearch: [0]...
Sync db:document Xlog: [200] => Db: [1,787] => Redis: [500] => Elasticsearch: [0]...
Sync db:document Xlog: [200] => Db: [1,929] => Redis: [500] => Elasticsearch: [0]...
Sync db:document Xlog: [200] => Db: [1,984] => Redis: [500] => Elasticsearch: [0]...
Sync db:document Xlog: [200] => Db: [2,313] => Redis: [1,000] => Elasticsearch: [0]...
Sync db:document Xlog: [200] => Db: [2,642] => Redis: [1,500] => Elasticsearch: [0]...
Sync db:document Xlog: [200] => Db: [2,883] => Redis: [500] => Elasticsearch: [1]...
Sync db:document Xlog: [200] => Db: [3,244] => Redis: [500] => Elasticsearch: [2]...
Sync db:document Xlog: [200] => Db: [3,588] => Redis: [1,000] => Elasticsearch: [2]...
Sync db:document Xlog: [200] => Db: [3,895] => Redis: [1,000] => Elasticsearch: [2]...
Sync db:document Xlog: [200] => Db: [4,196] => Redis: [1,500] => Elasticsearch: [2]...
Sync db:document Xlog: [200] => Db: [4,336] => Redis: [1,500] => Elasticsearch: [2]...
Sync db:document Xlog: [200] => Db: [4,523] => Redis: [2,000] => Elasticsearch: [2]...
Sync db:document Xlog: [200] => Db: [4,911] => Redis: [2,000] => Elasticsearch: [2]...
Sync db:document Xlog: [200] => Db: [5,286] => Redis: [1,500] => Elasticsearch: [3]...
Sync db:document Xlog: [200] => Db: [5,393] => Redis: [1,500] => Elasticsearch: [3]...
...

Here's the same run with REDIS_READ_CHUNK_SIZE set to 1 (you can see how everything eventually gets synced correctly; in the end, every one of the 20'000 documents has its content synced):

Sync db:document Xlog: [55] => Db: [0] => Redis: [0] => Elasticsearch: [0]...
Sync db:document Xlog: [55] => Db: [17] => Redis: [0] => Elasticsearch: [0]...
Sync db:document Xlog: [55] => Db: [197] => Redis: [0] => Elasticsearch: [0]...
Sync db:document Xlog: [55] => Db: [469] => Redis: [0] => Elasticsearch: [0]...
Sync db:document Xlog: [55] => Db: [723] => Redis: [496] => Elasticsearch: [2]...
Sync db:document Xlog: [55] => Db: [987] => Redis: [492] => Elasticsearch: [6]...
Sync db:document Xlog: [55] => Db: [1,278] => Redis: [991] => Elasticsearch: [7]...
Sync db:document Xlog: [55] => Db: [1,547] => Redis: [1,491] => Elasticsearch: [7]...
Sync db:document Xlog: [55] => Db: [1,745] => Redis: [1,490] => Elasticsearch: [8]...
Sync db:document Xlog: [55] => Db: [1,898] => Redis: [1,487] => Elasticsearch: [11]...
Sync db:document Xlog: [55] => Db: [1,932] => Redis: [1,486] => Elasticsearch: [12]...
Sync db:document Xlog: [55] => Db: [1,967] => Redis: [1,480] => Elasticsearch: [19]...
Sync db:document Xlog: [55] => Db: [2,105] => Redis: [1,973] => Elasticsearch: [27]...
Sync db:document Xlog: [55] => Db: [2,369] => Redis: [1,967] => Elasticsearch: [31]...
Sync db:document Xlog: [55] => Db: [2,607] => Redis: [2,461] => Elasticsearch: [37]...
Sync db:document Xlog: [55] => Db: [2,677] => Redis: [2,453] => Elasticsearch: [45]...
Sync db:document Xlog: [55] => Db: [3,094] => Redis: [2,944] => Elasticsearch: [54]...
Sync db:document Xlog: [55] => Db: [3,263] => Redis: [2,939] => Elasticsearch: [60]...
Sync db:document Xlog: [55] => Db: [3,482] => Redis: [2,932] => Elasticsearch: [66]...
Sync db:document Xlog: [55] => Db: [3,697] => Redis: [3,426] => Elasticsearch: [73]...
Sync db:document Xlog: [55] => Db: [4,000] => Redis: [3,916] => Elasticsearch: [82]...
Sync db:document Xlog: [55] => Db: [4,315] => Redis: [3,906] => Elasticsearch: [92]...
Sync db:document Xlog: [55] => Db: [4,372] => Redis: [3,901] => Elasticsearch: [97]...
Sync db:document Xlog: [55] => Db: [4,676] => Redis: [4,395] => Elasticsearch: [105]...
Sync db:document Xlog: [55] => Db: [5,040] => Redis: [4,886] => Elasticsearch: [113]...
Sync db:document Xlog: [55] => Db: [5,303] => Redis: [4,880] => Elasticsearch: [118]...
...

Edit: I tested this behaviour all the way down to two consecutive inserts into the content table (instead of 20'000). Only one of them gets processed. So it seems that all but one record of each Redis read chunk is discarded in the case of an update/insert on a child relationship. I also tested inserts/updates on the parent document table; there, everything works as expected.
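To make the suspected failure mode concrete, here is a toy model (explicitly not pgsync's actual code) of a consumer that derives its parent lookup from only a single payload per popped chunk instead of from all of them. It reproduces the observed arithmetic: a chunk of 500 child-table inserts yields one synced document, while chunk size 1 syncs everything:

```python
# Toy model of the suspected bug: if the sync step builds its filter from
# only one payload per Redis read chunk, the other N-1 inserts are dropped.
# This is an illustration only, NOT pgsync internals.

def process_chunk_buggy(chunk):
    # Keys the parent lookup off only the last payload in the chunk.
    return {chunk[-1]["new"]["document_id"]}

def process_chunk_fixed(chunk):
    # Collects every payload's foreign key, so all inserts are synced.
    return {payload["new"]["document_id"] for payload in chunk}

# Simulate one popped chunk of 500 INSERT events on the content table.
chunk = [
    {"tg_op": "INSERT", "table": "content",
     "new": {"document_id": f"doc-{i}"}}
    for i in range(500)
]

print(len(process_chunk_buggy(chunk)))  # 1   -- matches the observed behavior
print(len(process_chunk_fixed(chunk)))  # 500 -- expected behavior
```

With REDIS_READ_CHUNK_SIZE=1 every chunk contains exactly one payload, so the buggy and fixed variants behave identically, which would explain why that setting masks the problem.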

Here is a debug log where only one of the two inserts gets synced:

Sync db:document Xlog: [4] => Db: [2] => Redis: [0] => Elasticsearch: [24,061]...
DEBUG:pgsync.sync: poll_db: {'xmin': 802974, 'new': {'id': 'f1f8a412-0112-4054-ab7d-4cec287879fa', 'document_id': 'b88f256f-8dfe-45f7-ba8a-42d77b6c90b7'}, 'old': None, 'indices': ['document'], 'tg_op': 'INSERT', 'table': 'content', 'schema': 'public'}
DEBUG:pgsync.sync: poll_db: {'xmin': 802975, 'new': {'id': '5c5eb938-443e-457b-a561-59420c1e38d7', 'document_id': '77779bbf-e3df-4c5d-8a11-d6de0293a36d'}, 'old': None, 'indices': ['document'], 'tg_op': 'INSERT', 'table': 'content', 'schema': 'public'}
DEBUG:pgsync.redisqueue: pop size: 2
DEBUG:pgsync.sync: _poll_redis: [{'xmin': 802974, 'new': {'id': 'f1f8a412-0112-4054-ab7d-4cec287879fa', 'document_id': 'b88f256f-8dfe-45f7-ba8a-42d77b6c90b7'}, 'old': None, 'indices': ['document'], 'tg_op': 'INSERT', 'table': 'content', 'schema': 'public'}, {'xmin': 802975, 'new': {'id': '5c5eb938-443e-457b-a561-59420c1e38d7', 'document_id': '77779bbf-e3df-4c5d-8a11-d6de0293a36d'}, 'old': None, 'indices': ['document'], 'tg_op': 'INSERT', 'table': 'content', 'schema': 'public'}]
DEBUG:pgsync.sync: on_publish len 2
DEBUG:pgsync.sync: tg_op: INSERT table: public.content
INFO:elastic_transport.transport: POST http://local-es-http.elastic-cluster.svc.cluster.local:9200/document/_search?scroll=5m [status:200 duration:0.003s]
INFO:elastic_transport.transport: DELETE http://local-es-http.elastic-cluster.svc.cluster.local:9200/_search/scroll [status:200 duration:0.001s]
INFO:elastic_transport.transport: POST http://local-es-http.elastic-cluster.svc.cluster.local:9200/document/_search?scroll=5m [status:200 duration:0.003s]
INFO:elastic_transport.transport: DELETE http://local-es-http.elastic-cluster.svc.cluster.local:9200/_search/scroll [status:200 duration:0.001s]
INFO:elastic_transport.transport: PUT http://local-es-http.elastic-cluster.svc.cluster.local:9200/_bulk?refresh=false [status:200 duration:0.033s]
Sync db:document Xlog: [4] => Db: [4] => Redis: [0] => Elasticsearch: [24,062]...

Error Message (if any):

toluaina commented 2 months ago

From the logs included, it seems things worked as expected up until "DEBUG:pgsync.sync: tg_op: INSERT table: public.content". My guess (until I see your schema) is that your relationship, with its manual foreign_key config, might not be set up correctly.

gitbute commented 2 months ago

I tried with a completely fresh database and no manual foreign key setup (as it's already set up by the DB schema and pgsync seems to detect it correctly). Sadly, I get exactly the same behavior. Also worth mentioning: it only misbehaves on INSERTs; UPDATEs/DELETEs are handled correctly.