toluaina / pgsync

Postgres to Elasticsearch/OpenSearch sync
https://pgsync.com
MIT License

Issue with Redis pipelining? #235

Open jpughcs opened 2 years ago

jpughcs commented 2 years ago

PGSync version: 2.1.10

Postgres version: 13.4

Elasticsearch version: 7.15.0

Redis version: 6.2

Python version: 3.7

Problem Description:

Using multiple schemas in a single config. Initial load works great. Normal sync runs fine for a while until a large influx of records comes through and pgsync enters a broken state. Once in this state, the queue stops being processed and the Redis pending count grows continuously. The bulk pop pipeline does not pull any records from the queue (nor does it trim). However, executing an lrange manually does return results.

Error Message (if any):

2022-01-25 22:26:16.535:INFO:elasticsearch: POST https://es:9200/something-1/_search?scroll=5m&size=1000 [status:200 request:0.142s]
2022-01-25 22:26:16.576:DEBUG:pgsync.redisqueue: bulk_pop nsize: 0
2022-01-25 22:26:16.617:INFO:elasticsearch: DELETE https://es:9200/_search/scroll [status:200 request:0.142s]
2022-01-25 22:26:16.619:DEBUG:pgsync.redisqueue: bulk_pop nsize: 0
2022-01-25 22:26:16.677:INFO:elasticsearch: DELETE https://es:9200/_search/scroll [status:200 request:0.141s]
2022-01-25 22:26:16.679:DEBUG:pgsync.redisqueue: bulk_pop nsize: 0
2022-01-25 22:26:16.720:DEBUG:pgsync.redisqueue: bulk_pop nsize: 0
2022-01-25 22:26:16.762:INFO:elasticsearch: POST https://es:9200/something-2/_search?scroll=5m&size=1000 [status:200 request:0.143s]
2022-01-25 22:26:16.780:DEBUG:pgsync.redisqueue: bulk_pop nsize: 0
Syncing something Xlog: [91] => Db: [12,489] => Redis: [total = 1,131 pending = 11,358] => Elastic: [6,570] ...
Syncing something Xlog: [4] => Db: [12,489] => Redis: [total = 12,489 pending = 0] => Elastic: [56] ...
2022-01-25 22:26:16.820:INFO:elasticsearch: POST https://es:9200/something-1/_search?scroll=5m&size=1000 [status:200 request:0.143s]
Syncing something Xlog: [37] => Db: [12,489] => Redis: [total = 10,578 pending = 1,911] => Elastic: [136] ...
2022-01-25 22:26:16.822:DEBUG:pgsync.redisqueue: bulk_pop nsize: 0

Strange: bulk_pop nsize: 0. Could be a red herring... Redis poller thread handling the index in question could have died?

Querying queue directly on Redis server:

127.0.0.1:6379> llen queue:something2
(integer) 11358

jpughcs commented 2 years ago

Seems to work fine with a single schema.

jpughcs commented 2 years ago

Scratch that, problem remains even with single schema

Syncing db Xlog: [22] => Db: [3,142] => Redis: [total = 1,001 pending = 2,141] => Elastic: [26,680] ...
Syncing db Xlog: [22] => Db: [3,142] => Redis: [total = 1,001 pending = 2,141] => Elastic: [26,680] ...
Syncing db Xlog: [22] => Db: [3,142] => Redis: [total = 1,001 pending = 2,141] => Elastic: [26,680] ...
Syncing db Xlog: [22] => Db: [3,177] => Redis: [total = 1,001 pending = 2,177] => Elastic: [26,680] ...
Syncing db Xlog: [22] => Db: [3,250] => Redis: [total = 1,001 pending = 2,250] => Elastic: [26,680] ...
Syncing db Xlog: [22] => Db: [3,311] => Redis: [total = 1,001 pending = 2,311] => Elastic: [26,680] ...
Syncing db Xlog: [22] => Db: [3,392] => Redis: [total = 1,001 pending = 2,393] => Elastic: [26,680] ...
Syncing db Xlog: [22] => Db: [3,741] => Redis: [total = 1,001 pending = 2,741] => Elastic: [26,680] ...
Syncing db Xlog: [22] => Db: [4,573] => Redis: [total = 1,001 pending = 3,573] => Elastic: [26,680] ...
Syncing db Xlog: [22] => Db: [5,139] => Redis: [total = 1,001 pending = 4,138] => Elastic: [26,680] ...
Syncing db Xlog: [22] => Db: [5,605] => Redis: [total = 1,001 pending = 4,604] => Elastic: [26,680] ...
Syncing db Xlog: [22] => Db: [6,178] => Redis: [total = 1,001 pending = 5,178] => Elastic: [26,680] ...
Syncing db Xlog: [22] => Db: [6,674] => Redis: [total = 1,001 pending = 5,674] => Elastic: [26,680] ...
Syncing db Xlog: [22] => Db: [7,612] => Redis: [total = 1,001 pending = 6,611] => Elastic: [26,680] ...
Syncing db Xlog: [22] => Db: [7,612] => Redis: [total = 1,001 pending = 6,611] => Elastic: [26,680] ...
Syncing db Xlog: [22] => Db: [7,612] => Redis: [total = 1,001 pending = 6,611] => Elastic: [26,680] ...
Syncing db Xlog: [22] => Db: [7,612] => Redis: [total = 1,001 pending = 6,611] => Elastic: [26,680] ...
Syncing db Xlog: [22] => Db: [7,612] => Redis: [total = 1,001 pending = 6,611] => Elastic: [26,680] ...
Syncing db Xlog: [22] => Db: [7,612] => Redis: [total = 1,001 pending = 6,611] => Elastic: [26,680] ...
Syncing db Xlog: [22] => Db: [7,612] => Redis: [total = 1,001 pending = 6,611] => Elastic: [26,680] ...
Syncing db Xlog: [22] => Db: [7,612] => Redis: [total = 1,001 pending = 6,611] => Elastic: [26,680] ...
Syncing db Xlog: [22] => Db: [7,612] => Redis: [total = 1,001 pending = 6,611] => Elastic: [26,680] ...
Syncing db Xlog: [22] => Db: [7,631] => Redis: [total = 1,001 pending = 6,631] => Elastic: [26,680] ...
Syncing db Xlog: [22] => Db: [8,366] => Redis: [total = 1,001 pending = 7,366] => Elastic: [26,680] ...
Syncing db Xlog: [22] => Db: [9,037] => Redis: [total = 1,001 pending = 8,036] => Elastic: [26,680] ...
Syncing db Xlog: [22] => Db: [9,734] => Redis: [total = 1,001 pending = 8,733] => Elastic: [26,680] ...
Syncing db Xlog: [22] => Db: [10,185] => Redis: [total = 1,001 pending = 9,184] => Elastic: [26,680] ...
Syncing db Xlog: [22] => Db: [10,585] => Redis: [total = 1,001 pending = 9,584] => Elastic: [26,680] ...
Syncing db Xlog: [22] => Db: [10,962] => Redis: [total = 1,001 pending = 9,961] => Elastic: [26,680] ...
Syncing db Xlog: [22] => Db: [11,239] => Redis: [total = 1,001 pending = 10,239] => Elastic: [26,680] ...
Syncing db Xlog: [22] => Db: [11,295] => Redis: [total = 1,001 pending = 10,295] => Elastic: [26,680] ...
Syncing db Xlog: [22] => Db: [11,330] => Redis: [total = 1,001 pending = 10,329] => Elastic: [26,680] ...
Syncing db Xlog: [22] => Db: [11,330] => Redis: [total = 1,001 pending = 10,329] => Elastic: [26,680] ...
Syncing db Xlog: [22] => Db: [11,330] => Redis: [total = 1,001 pending = 10,329] => Elastic: [26,680] ...
Syncing db Xlog: [22] => Db: [11,448] => Redis: [total = 1,001 pending = 10,447] => Elastic: [26,680] ...
Syncing db Xlog: [22] => Db: [11,807] => Redis: [total = 1,001 pending = 10,807] => Elastic: [26,680] ...
Syncing db Xlog: [22] => Db: [12,171] => Redis: [total = 1,001 pending = 11,170] => Elastic: [26,680] ...
Syncing db Xlog: [22] => Db: [12,262] => Redis: [total = 1,001 pending = 11,262] => Elastic: [26,680] ...
Syncing db Xlog: [22] => Db: [12,907] => Redis: [total = 1,001 pending = 11,906] => Elastic: [26,680] ...
Syncing db Xlog: [22] => Db: [13,268] => Redis: [total = 1,001 pending = 12,267] => Elastic: [26,680] ...
Syncing db Xlog: [22] => Db: [13,633] => Redis: [total = 1,001 pending = 12,632] => Elastic: [26,680] ...
2022-01-26 19:14:02.229:INFO:elasticsearch: POST https://es:9200/db-idx/_bulk?refresh=false [status:200 request:0.957s]
Syncing db Xlog: [22] => Db: [13,927] => Redis: [total = 1,001 pending = 12,926] => Elastic: [27,680] ...
Syncing db Xlog: [22] => Db: [13,927] => Redis: [total = 1,001 pending = 12,926] => Elastic: [27,680] ...
2022-01-26 19:14:04.716:INFO:elasticsearch: POST https://es:9200/db-idx/_bulk?refresh=false [status:200 request:0.944s]
Syncing db Xlog: [22] => Db: [13,927] => Redis: [total = 1,001 pending = 12,926] => Elastic: [28,680] ...
Syncing db Xlog: [22] => Db: [13,927] => Redis: [total = 1,001 pending = 12,926] => Elastic: [28,680] ...
Syncing db Xlog: [22] => Db: [13,927] => Redis: [total = 1,001 pending = 12,926] => Elastic: [28,680] ...
2022-01-26 19:14:07.480:INFO:elasticsearch: POST https://es:9200/db-idx/_bulk?refresh=false [status:200 request:0.954s]
Syncing db Xlog: [22] => Db: [13,927] => Redis: [total = 1,001 pending = 12,926] => Elastic: [29,680] ...
Syncing db Xlog: [22] => Db: [13,927] => Redis: [total = 1,001 pending = 12,926] => Elastic: [29,680] ...
2022-01-26 19:14:09.802:INFO:elasticsearch: POST https://es:9200/db-idx/_bulk?refresh=false [status:200 request:1.008s]
Syncing db Xlog: [22] => Db: [13,927] => Redis: [total = 1,001 pending = 12,926] => Elastic: [30,680] ...
Syncing db Xlog: [22] => Db: [13,927] => Redis: [total = 1,001 pending = 12,926] => Elastic: [30,680] ...
Syncing db Xlog: [22] => Db: [13,927] => Redis: [total = 1,001 pending = 12,926] => Elastic: [30,680] ...
2022-01-26 19:14:12.506:INFO:elasticsearch: POST https://es:9200/db-idx/_bulk?refresh=false [status:200 request:0.966s]
Syncing db Xlog: [22] => Db: [13,927] => Redis: [total = 1,001 pending = 12,926] => Elastic: [31,680] ...
Syncing db Xlog: [22] => Db: [13,927] => Redis: [total = 1,001 pending = 12,926] => Elastic: [31,680] ...
2022-01-26 19:14:14.868:INFO:elasticsearch: POST https://es:9200/db-idx/_bulk?refresh=false [status:200 request:0.975s]
Syncing db Xlog: [22] => Db: [13,927] => Redis: [total = 1,001 pending = 12,926] => Elastic: [32,680] ...
Syncing db Xlog: [22] => Db: [13,927] => Redis: [total = 1,001 pending = 12,926] => Elastic: [32,680] ...
Syncing db Xlog: [22] => Db: [14,006] => Redis: [total = 1,001 pending = 13,005] => Elastic: [32,680] ...
2022-01-26 19:14:17.468:INFO:elasticsearch: POST https://es:9200/db-idx/_bulk?refresh=false [status:200 request:1.125s]
Syncing db Xlog: [22] => Db: [14,139] => Redis: [total = 1,001 pending = 13,139] => Elastic: [33,680] ...
Syncing db Xlog: [22] => Db: [14,676] => Redis: [total = 1,001 pending = 13,675] => Elastic: [33,680] ...
Syncing db Xlog: [22] => Db: [15,004] => Redis: [total = 1,001 pending = 14,004] => Elastic: [33,680] ...
Syncing db Xlog: [22] => Db: [15,413] => Redis: [total = 1,001 pending = 14,412] => Elastic: [33,680] ...
2022-01-26 19:14:21.607:INFO:elasticsearch: POST https://es:9200/db-idx/_bulk?refresh=false [status:200 request:1.373s]
Syncing db Xlog: [22] => Db: [15,697] => Redis: [total = 1,001 pending = 14,697] => Elastic: [34,680] ...
Syncing db Xlog: [22] => Db: [16,005] => Redis: [total = 1,001 pending = 15,005] => Elastic: [34,680] ...
Syncing db Xlog: [22] => Db: [16,484] => Redis: [total = 1,001 pending = 15,483] => Elastic: [34,680] ...
2022-01-26 19:14:24.462:INFO:elasticsearch: POST https://es:9200/db-idx/_bulk?refresh=false [status:200 request:1.183s]
Syncing db Xlog: [22] => Db: [16,484] => Redis: [total = 1,001 pending = 15,483] => Elastic: [35,680] ...
Syncing db Xlog: [22] => Db: [16,484] => Redis: [total = 1,001 pending = 15,483] => Elastic: [35,680] ...
2022-01-26 19:14:26.794:INFO:elasticsearch: POST https://es:9200/db-idx/_bulk?refresh=false [status:200 request:1.085s]
Syncing db Xlog: [22] => Db: [16,484] => Redis: [total = 1,001 pending = 15,483] => Elastic: [36,680] ...
Syncing db Xlog: [22] => Db: [16,484] => Redis: [total = 1,001 pending = 15,483] => Elastic: [36,680] ...
Syncing db Xlog: [22] => Db: [16,548] => Redis: [total = 1,001 pending = 15,547] => Elastic: [36,680] ...
2022-01-26 19:14:29.603:INFO:elasticsearch: POST https://es:9200/db-idx/_bulk?refresh=false [status:200 request:1.005s]
Syncing db Xlog: [22] => Db: [16,826] => Redis: [total = 1,001 pending = 15,826] => Elastic: [37,680] ...
Syncing db Xlog: [22] => Db: [17,201] => Redis: [total = 1,001 pending = 16,200] => Elastic: [37,680] ...
Syncing db Xlog: [22] => Db: [17,529] => Redis: [total = 1,001 pending = 16,528] => Elastic: [37,680] ...
2022-01-26 19:14:33.289:INFO:elasticsearch: POST https://es:9200/db-idx/_bulk?refresh=false [status:200 request:1.013s]
Syncing db Xlog: [22] => Db: [17,796] => Redis: [total = 1,001 pending = 16,796] => Elastic: [38,680] ...
Syncing db Xlog: [22] => Db: [18,153] => Redis: [total = 1,001 pending = 17,153] => Elastic: [38,680] ...
Syncing db Xlog: [22] => Db: [18,482] => Redis: [total = 1,001 pending = 17,483] => Elastic: [38,680] ...
Syncing db Xlog: [22] => Db: [18,689] => Redis: [total = 1,001 pending = 17,689] => Elastic: [38,680] ...
Syncing db Xlog: [22] => Db: [18,975] => Redis: [total = 1,001 pending = 17,974] => Elastic: [38,680] ...
2022-01-26 19:14:38.202:INFO:elasticsearch: POST https://es:9200/db-idx/_bulk?refresh=false [status:200 request:0.991s]
Syncing db Xlog: [22] => Db: [19,378] => Redis: [total = 1,001 pending = 18,379] => Elastic: [39,680] ...
Syncing db Xlog: [22] => Db: [19,970] => Redis: [total = 1,001 pending = 18,970] => Elastic: [39,680] ...
Syncing db Xlog: [22] => Db: [20,731] => Redis: [total = 1,001 pending = 19,730] => Elastic: [39,680] ...
Syncing db Xlog: [22] => Db: [21,255] => Redis: [total = 1,001 pending = 20,255] => Elastic: [39,680] ...
2022-01-26 19:14:41.999:INFO:elasticsearch: POST https://es:9200/db-idx/_bulk?refresh=false [status:200 request:0.979s]
Syncing db Xlog: [22] => Db: [21,647] => Redis: [total = 1,001 pending = 20,647] => Elastic: [40,680] ...
Syncing db Xlog: [22] => Db: [22,286] => Redis: [total = 1,001 pending = 21,286] => Elastic: [40,680] ...
Syncing db Xlog: [22] => Db: [23,198] => Redis: [total = 1,001 pending = 22,198] => Elastic: [40,680] ...
Syncing db Xlog: [22] => Db: [24,012] => Redis: [total = 1,001 pending = 23,012] => Elastic: [40,680] ...
2022-01-26 19:14:46.546:INFO:elasticsearch: POST https://es:9200/db-idx/_bulk?refresh=false [status:200 request:1.035s]
Syncing db Xlog: [22] => Db: [24,493] => Redis: [total = 1,001 pending = 23,493] => Elastic: [41,680] ...
Syncing db Xlog: [22] => Db: [25,334] => Redis: [total = 1,001 pending = 24,334] => Elastic: [41,680] ...
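For anyone comparing their own output against the log above, here is a minimal sketch (not part of pgsync) that pulls the pending count out of each status line and flags a queue that only ever grows. The regular expression assumes the exact "Redis: [total = N pending = M]" layout shown in this thread; the function names are made up for illustration.

```python
import re

# Matches the "Redis: [total = N pending = M]" part of a status line
# as it appears in the logs in this thread (layout is an assumption).
STATUS_RE = re.compile(r"Redis: \[total = ([\d,]+) pending = ([\d,]+)\]")


def pending_counts(lines):
    """Return the pending count from every status line, in order."""
    counts = []
    for line in lines:
        match = STATUS_RE.search(line)
        if match:
            # Strip the thousands separators before converting.
            counts.append(int(match.group(2).replace(",", "")))
    return counts


def is_stalled(counts):
    """True if pending never decreases and ends higher than it started."""
    if len(counts) < 2:
        return False
    never_drops = all(b >= a for a, b in zip(counts, counts[1:]))
    return never_drops and counts[-1] > counts[0]


sample = [
    "Syncing db Xlog: [22] => Db: [3,142] => Redis: [total = 1,001 pending = 2,141] => Elastic: [26,680] ...",
    "2022-01-26 19:14:02.229:INFO:elasticsearch: POST https://es:9200/db-idx/_bulk?refresh=false [status:200 request:0.957s]",
    "Syncing db Xlog: [22] => Db: [25,334] => Redis: [total = 1,001 pending = 24,334] => Elastic: [41,680] ...",
]
counts = pending_counts(sample)
print(counts, is_stalled(counts))
```

Non-status lines (such as the Elasticsearch request logs) are skipped, so the whole log file can be fed in as-is.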

toluaina commented 2 years ago

Is this issue easily reproducible from your end? Perhaps we can introduce some logging to narrow down the cause.

jpughcs commented 2 years ago

Yes, this happens always. Let me know what you'd like to patch in for logging.

toluaina commented 2 years ago

Can you add some logging into this method such as:

def bulk_pop(self, chunk_size: Optional[int] = None) -> List[dict]:
    """Remove and return multiple items from the queue."""
    chunk_size: int = chunk_size or REDIS_CHUNK_SIZE
    pipeline = self.__db.pipeline()
    print(f'pipe {chunk_size}')
    pipeline.lrange(self.key, 0, chunk_size - 1)
    print(f'range: {chunk_size - 1}')
    pipeline.ltrim(self.key, chunk_size, -1)
    print(f'ltrim: {chunk_size}')
    items: List[List[bytes], bool] = pipeline.execute()
    logger.debug(f"bulk_pop nsize: {len(items[0])}")
    print(f"bulk_pop nsize: {len(items[0])}")
    return list(map(lambda x: json.loads(x), items[0]))

and send the output?
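To make the expected behaviour of that method concrete, here is a self-contained sketch of the same LRANGE + LTRIM pattern. It is not the pgsync code: `InMemoryListStore` is a hypothetical stand-in that mimics only the few Redis list calls used here, so the example runs without a server. With redis-py, a real client's `pipeline()` exposes the same `lrange`, `ltrim` and `execute` calls.

```python
import json


def _slice(values, start, stop):
    # Redis ranges are inclusive and accept negative indexes from the end.
    n = len(values)
    if start < 0:
        start = max(n + start, 0)
    if stop < 0:
        stop = n + stop
    return values[start:stop + 1]


class _Pipeline:
    """Queues commands and runs them together on execute()."""

    def __init__(self, store):
        self.store = store
        self.commands = []

    def lrange(self, key, start, stop):
        self.commands.append(("lrange", key, start, stop))
        return self

    def ltrim(self, key, start, stop):
        self.commands.append(("ltrim", key, start, stop))
        return self

    def execute(self):
        results = []
        for name, key, start, stop in self.commands:
            kept = _slice(self.store.lists.get(key, []), start, stop)
            if name == "lrange":
                results.append(kept)
            else:
                self.store.lists[key] = kept
                results.append(True)
        self.commands = []
        return results


class InMemoryListStore:
    """Hypothetical stand-in for a Redis client (illustration only)."""

    def __init__(self):
        self.lists = {}

    def rpush(self, key, *values):
        self.lists.setdefault(key, []).extend(values)

    def llen(self, key):
        return len(self.lists.get(key, []))

    def pipeline(self):
        return _Pipeline(self)


def bulk_pop(client, key, chunk_size=1000):
    """Read up to chunk_size items from the head, then trim them off."""
    pipeline = client.pipeline()
    pipeline.lrange(key, 0, chunk_size - 1)
    pipeline.ltrim(key, chunk_size, -1)
    items, _trimmed = pipeline.execute()
    return [json.loads(item) for item in items]


store = InMemoryListStore()
store.rpush("queue:demo", *(json.dumps({"id": i}) for i in range(5)))
first = bulk_pop(store, "queue:demo", chunk_size=3)
second = bulk_pop(store, "queue:demo", chunk_size=3)
third = bulk_pop(store, "queue:demo", chunk_size=3)
print(first, second, third, store.llen("queue:demo"))
```

In this model a non-empty list always yields items, so a `bulk_pop nsize: 0` while `llen` reports thousands of entries would suggest the call is either hitting a different key or not running when expected.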

jpughcs commented 2 years ago

Absolutely.

I will get this for you ASAP, likely tomorrow morning (evening for you).

Thank you!

jpughcs commented 2 years ago

Attached is a full run.

You can see there's one tick with bulk_pop nsize: 1000 and then bulk_pop never seems to be called again.

log.txt

jpughcs commented 2 years ago

Scratch that, bulk_pop is being called again... but at almost exactly 15-minute intervals.

jpughcs commented 2 years ago

Here is my Env config

ELASTICSEARCH_TIMEOUT=60
ELASTICSEARCH_TIMEOUT=600
ELASTICSEARCH_QUEUE_SIZE=4
ELASTICSEARCH_THREAD_COUNT=4
QUERY_CHUNK_SIZE=5000
ELASTICSEARCH_CHUNK_SIZE=1000
ELASTICSEARCH_STREAMING_BULK=False
REDIS_POLL_INTERVAL=0.01

jpughcs commented 2 years ago

Just to note, I've experimented a bit with the configuration without luck. Even with REDIS_CHUNK_SIZE=10000, the bulk_pop, the processing of the records, and the insertion of the documents into ES all seem to happen nearly instantaneously. I am unable to figure out what is happening between ticks.

toluaina commented 2 years ago

Are you running this in Docker? Are we sure there is no env var defined somewhere in the config? The fact that this is being called at intervals of 15 mins leads me to suspect REDIS_POLL_INTERVAL is in use. Can you experiment with changing REDIS_POLL_INTERVAL to a low value like 10 secs and see the impact?

Perhaps even print out the value of REDIS_POLL_INTERVAL on startup.
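A quick way to do that check from inside the container could look like the sketch below. It is a standalone snippet, not pgsync's own settings code: `effective_setting` is a hypothetical helper, and the 0.01 fallback is simply the value from the env config posted earlier in this thread, used as a placeholder.

```python
import os


def effective_setting(name, default, environ=os.environ):
    """Return (value, source) for a float setting.

    source is "environment" when the variable is set, else "default".
    """
    raw = environ.get(name)
    if raw is None:
        return float(default), "default"
    return float(raw), "environment"


# 0.01 is a placeholder fallback taken from the config posted above.
value, source = effective_setting("REDIS_POLL_INTERVAL", 0.01)
print(f"REDIS_POLL_INTERVAL={value} (from {source})")
```

If the source comes back as "default" inside the container, the variable is not reaching the process at all, which would point at the Docker configuration rather than at the polling code.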

jpughcs commented 2 years ago

This is being run in Docker. The interval appears to change proportionally with REDIS_CHUNK_SIZE.

I will experiment with changing REDIS_POLL_INTERVAL.

jpughcs commented 2 years ago

Changing REDIS_POLL_INTERVAL does not seem to make a difference. I am seeing a large number of these POST and DELETE requests (seen below) that happen in between bulk_pop calls. I am not familiar with the ES library, but from a quick glance, these appear to be internal functions of the ES library you're using. I've tried both ELASTICSEARCH_STREAMING_BULK True and False and many different values for ELASTICSEARCH_CHUNK_SIZE (between 1 and 100000) without any luck. All of the said POST and DELETE requests complete in a fraction of a second and it appears there is no other activity between bulk_pop calls.

2022-02-01 19:24:26.965:INFO:elasticsearch: POST https://es:9200/idx/_search?scroll=5m&size=1000 [status:200 request:0.003s]
2022-02-01 19:24:26.967:INFO:elasticsearch: POST https://es:9200/_search/scroll [status:200 request:0.002s]
2022-02-01 19:24:26.968:INFO:elasticsearch: DELETE https://es:9200/_search/scroll [status:200 request:0.001s]
2022-02-01 19:24:26.971:INFO:elasticsearch: POST https://es:9200/idx/_search?scroll=5m&size=1000 [status:200 request:0.002s]
2022-02-01 19:24:26.973:INFO:elasticsearch: DELETE https://es:9200/_search/scroll [status:200 request:0.001s]
2022-02-01 19:24:26.976:INFO:elasticsearch: POST https://es:9200/idx/_search?scroll=5m&size=1000 [status:200 request:0.003s]
2022-02-01 19:24:26.978:INFO:elasticsearch: POST https://es:9200/_search/scroll [status:200 request:0.002s]
2022-02-01 19:24:26.979:INFO:elasticsearch: DELETE https://es:9200/_search/scroll [status:200 request:0.001s]
2022-02-01 19:24:26.982:INFO:elasticsearch: POST https://es:9200/idx/_search?scroll=5m&size=1000 [status:200 request:0.002s]
2022-02-01 19:24:26.983:INFO:elasticsearch: DELETE https://es:9200/_search/scroll [status:200 request:0.001s]
2022-02-01 19:24:26.987:INFO:elasticsearch: POST https://es:9200/idx/_search?scroll=5m&size=1000 [status:200 request:0.003s]
2022-02-01 19:24:26.990:INFO:elasticsearch: POST https://es:9200/_search/scroll [status:200 request:0.003s]
2022-02-01 19:24:26.991:INFO:elasticsearch: DELETE https://es:9200/_search/scroll [status:200 request:0.001s]
2022-02-01 19:24:26.996:INFO:elasticsearch: POST https://es:9200/idx/_search?scroll=5m&size=1000 [status:200 request:0.004s]
2022-02-01 19:24:26.998:INFO:elasticsearch: DELETE https://es:9200/_search/scroll [status:200 request:0.002s]
2022-02-01 19:24:27.001:INFO:elasticsearch: POST https://es:9200/idx/_search?scroll=5m&size=1000 [status:200 request:0.003s]
2022-02-01 19:24:27.005:INFO:elasticsearch: POST https://es:9200/_search/scroll [status:200 request:0.003s]
2022-02-01 19:24:27.007:INFO:elasticsearch: DELETE https://es:9200/_search/scroll [status:200 request:0.002s]
2022-02-01 19:24:27.013:INFO:elasticsearch: POST https://es:9200/idx/_search?scroll=5m&size=1000 [status:200 request:0.003s]

jpughcs commented 2 years ago

@toluaina https://github.com/toluaina/pgsync/blob/master/pgsync/elastichelper.py#L215

It seems that if records/events come in too fast from PG, these search scans back up and hold up the queues. They will eventually process, but it may take 10 minutes or longer for each full scan to complete before the next tick to process the queue.

Is the scan actually blocking the redis poll?

Can this process be entirely disabled if running in standalone mode? If so, I can let the process gracefully exit and just run it on a cron in the meantime.

Are there any optimizations that can be made to reduce the amount of data scanned?

Can we make the scan page size configurable?

Can we add extra concurrency at any of these blocking processes?
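One way to answer the blocking question with data would be to time each step of a tick separately. The sketch below is generic and makes no claim about how pgsync's loop is actually structured: `timed` and `run_tick` are hypothetical names, and the `scan` and `poll` callables are placeholders for whatever the real steps are.

```python
import time
from contextlib import contextmanager


@contextmanager
def timed(label, timings):
    """Record how long the wrapped block took, in seconds, under timings[label]."""
    start = time.monotonic()
    try:
        yield
    finally:
        timings.setdefault(label, []).append(time.monotonic() - start)


def run_tick(poll, scan, timings):
    """One hypothetical loop iteration: a scan step, then a queue poll."""
    with timed("scan", timings):
        scan()
    with timed("poll", timings):
        return poll()


timings = {}
# Placeholder steps: a scan that takes ~50 ms and a poll that finds nothing.
result = run_tick(poll=lambda: [], scan=lambda: time.sleep(0.05), timings=timings)
for label, durations in timings.items():
    print(f"{label}: {sum(durations):.3f}s over {len(durations)} call(s)")
```

If the scan total is in the minutes while the poll total stays in milliseconds, the scan is what sits between the ticks.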

Moulick commented 2 years ago

Sort of the same problem on my side as well. During the initial data loading/snapshot, with debug logging enabled, there are only logs for Elasticsearch POST API calls, nothing about Redis. Then, after the document count reaches the expected count (~236 million), there is just a continuous spam of 2022-04-26 05:40:24.669:DEBUG:pgsync.redisqueue: bulk_pop nsize: 0, hundreds of times per second.

Moulick commented 2 years ago

@jpughcs @toluaina could the issue be due to the Redis version? you are using 6.2 and I am trying with 5.0.5, while the documentation says 3.1.0.

Moulick commented 2 years ago

What I noticed now is that my Redis is basically blank: there are no keys at all. This is during the initial loading. I don't get what is wrong. There are no errors related to Redis in the debug log at all beyond 2022-04-26 11:23:25.939:DEBUG:pgsync.urls: Connecting to Redis without password.

Moulick commented 2 years ago

Never mind what I said above; it was a misunderstanding on my side. The logs were simply debug-level logs of the Redis polling every 0.1s. 🤦

jpughcs commented 2 years ago

@jpughcs @toluaina could the issue be due to the Redis version? you are using 6.2 and I am trying with 5.0.5, while the documentation says 3.1.0.

I've tried 3.x, 5.x, 6.x

I don't think the root of my issue is necessarily related to Redis. I've been using a workaround successfully, removing these three lines (the first one here is the culprit: the post-batch, catch-up sync), which allows me to essentially schedule a full sync (without checkpoints or even the need for pg triggers and redis, which I've also stripped out) every five minutes or so (it completes in about 40 seconds with my dataset). This hammers ES pretty hard and locks up the indices, but it works for me in the meantime as my document count is relatively low and there's no need (yet) to have near-realtime updates. I don't think this will work for you with ~236 million records :( unless you just need to batch load your data and call it a day.