Open · jinserk opened this issue 2 years ago
The db events should be equivalent to the redis events; in your case, nothing seems to be propagated to Redis. Do you use auth for Redis? Is this a managed Redis instance? Is there a firewall? Can you actually add any record to Redis using the credentials in your .env?
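To answer that last question without installing anything extra, you can round-trip an AUTH and PING over a raw socket using only the standard library. This is an illustrative sketch, not part of pgsync; the host, port, and password are placeholders you would take from your .env:

```python
# Minimal stdlib-only check that Redis accepts your password: send
# inline RESP commands (AUTH, then PING) over a raw socket.
import socket

def resp_inline(*args: str) -> bytes:
    """Encode an inline Redis command, e.g. resp_inline('AUTH', 'secret')."""
    return (" ".join(args) + "\r\n").encode()

def redis_auth_ping(host: str, port: int, password: str, timeout: float = 5.0) -> bool:
    """Return True if AUTH answers +OK and PING answers +PONG."""
    with socket.create_connection((host, port), timeout=timeout) as sock:
        sock.sendall(resp_inline("AUTH", password) + resp_inline("PING"))
        reply = sock.recv(1024).decode()
    return "+OK" in reply and "+PONG" in reply

# Example (placeholders): redis_auth_ping("redis", 6379, "your-password")
```

If this returns False (or the connection times out), the problem is networking or credentials rather than pgsync itself.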
This is my full docker-compose.yml:
```yaml
version: '3'
services:
  redis:
    image: redis:latest
    container_name: redis
    hostname: redis
    expose:
      - 6379
    command: redis-server --requirepass *****
  pgsync:
    image: private/pgsync:latest
    container_name: pgsync
    hostname: pgsync
    build:
      context: .
      dockerfile: Dockerfile.pgsync
    command: ./runserver.sh
    sysctls:
      - net.ipv4.tcp_keepalive_time=200
      - net.ipv4.tcp_keepalive_intvl=200
      - net.ipv4.tcp_keepalive_probes=5
    labels:
      org.label-schema.name: "pgsync"
      org.label-schema.description: "Postgres to elasticsearch sync"
      com.label-schema.service-type: "daemon"
    depends_on:
      - redis
    environment:
      - LOG_LEVEL=INFO
      - QUERY_CHUNK_SIZE=1000
      - POLL_TIMEOUT=1
      - ELASTICSEARCH_SCHEME=https
      - ELASTICSEARCH_HOST=*****
      - ELASTICSEARCH_PORT=9200
      - ELASTICSEARCH_USER=*****
      - ELASTICSEARCH_PASSWORD=*****
      - ELASTICSEARCH_TIMEOUT=100
      - ELASTICSEARCH_CHUNK_SIZE=100
      - ELASTICSEARCH_VERIFY_CERTS=false
      - ELASTICSEARCH_USE_SSL=true
      - ELASTICSEARCH_SSL_SHOW_WARN=false
      - ELASTICSEARCH_STREAMING_BULK=true
      - ELASTICSEARCH_MAX_RETRIES=10
      - PG_HOST=*****
      - PG_PORT=5432
      - PG_USER=*****
      - PG_PASSWORD=*****
      - REDIS_HOST=redis
      - REDIS_PORT=6379
      - REDIS_AUTH=*****
```
I matched the Redis password between the redis container's `command` and pgsync's `REDIS_AUTH` env var. And at least I get OK from AUTH in redis-cli inside the container:

```
root@redis:/data# redis-cli
127.0.0.1:6379> auth *****
OK
127.0.0.1:6379>
```
> `ELASTICSEARCH_SCHEME` and `ELASTICSEARCH_PORT`

That's odd, because the first copy of the records was successful. We have about 45M records in the Postgres db: the first full copy was 45,521,632 records, and the db now holds 45,521,869 records. My concern is that the additional ~300 records were not synced after that. If the port were mismatched, why did the first run succeed? I'll try the latest master branch and keep you posted.
Any update on running off master? My guess is that, with about 45M records in your database, you still have pending items to go through in the replication slot.
We are having the exact same issue: 35,075,199 records synced over successfully initially, but the automated sync does not work afterwards. The logs look exactly like those of @jinserk: the database record count increases but the Redis count stays at 0.
After investigating further, it appears as though the Redis queue is in fact being populated with changes, but PGSync still logs that there are 0 changes. Eventually the Redis server runs out of memory.
@jinserk I noticed you make use of Opensearch and RDS, as do we. Do you also use Elasticache for Redis?
@voyc-jean No, all the sync subsystems run on our on-prem server in a Docker Compose environment; Redis is a node in that Docker setup. Deploying the system to the AWS OpenSearch service is a long-term goal, so we may need to test with ElastiCache for Redis eventually, but not now.
Basically we failed to figure this out with pgsync, so we're now testing the Confluent platform + Kafka connectors (Debezium PostgreSQL source connector / OpenSearch sink connector).
Same issue here: running pgsync syncs the initial data successfully, but the first data update in the db produces this error:
```
2021-12-22 05:03:46.439:ERROR:pgsync.sync: Exception 'NoneType' object is not subscriptable
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/dist-packages/pgsync/sync.py", line 875, in sync
    self.es.bulk(self.index, docs)
  File "/usr/local/lib/python3.8/dist-packages/pgsync/elastichelper.py", line 124, in bulk
    for _ in helpers.parallel_bulk(
  File "/usr/local/lib/python3.8/dist-packages/elasticsearch/helpers/actions.py", line 472, in parallel_bulk
    for result in pool.imap(
  File "/usr/lib/python3.8/multiprocessing/pool.py", line 868, in next
    raise value
  File "/usr/lib/python3.8/multiprocessing/pool.py", line 125, in worker
    result = (True, func(*args, **kwds))
  File "/usr/lib/python3.8/multiprocessing/pool.py", line 144, in _helper_reraises_exception
    raise ex
  File "/usr/lib/python3.8/multiprocessing/pool.py", line 388, in _guarded_task_generation
    for i, x in enumerate(iterable):
  File "/usr/local/lib/python3.8/dist-packages/elasticsearch/helpers/actions.py", line 155, in _chunk_actions
    for action, data in actions:
  File "/usr/local/lib/python3.8/dist-packages/pgsync/sync.py", line 714, in _payloads
    filters = self._update(
  File "/usr/local/lib/python3.8/dist-packages/pgsync/sync.py", line 435, in _update
    primary_values: list = [
  File "/usr/local/lib/python3.8/dist-packages/pgsync/sync.py", line 436, in <listcomp>
    payload_data[key] for key in node.model.primary_keys
TypeError: 'NoneType' object is not subscriptable
Exception in thread Thread-16:
Traceback (most recent call last):
  File "/usr/lib/python3.8/threading.py", line 932, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.8/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/local/lib/python3.8/dist-packages/pgsync/sync.py", line 904, in poll_redis
    self.on_publish(payloads)
  File "/usr/local/lib/python3.8/dist-packages/pgsync/sync.py", line 990, in on_publish
    self.sync(self._payloads(_payloads))
  File "/usr/local/lib/python3.8/dist-packages/pgsync/sync.py", line 875, in sync
    self.es.bulk(self.index, docs)
  File "/usr/local/lib/python3.8/dist-packages/pgsync/elastichelper.py", line 124, in bulk
    for _ in helpers.parallel_bulk(
  File "/usr/local/lib/python3.8/dist-packages/elasticsearch/helpers/actions.py", line 472, in parallel_bulk
    for result in pool.imap(
  File "/usr/lib/python3.8/multiprocessing/pool.py", line 868, in next
    raise value
  File "/usr/lib/python3.8/multiprocessing/pool.py", line 125, in worker
    result = (True, func(*args, **kwds))
  File "/usr/lib/python3.8/multiprocessing/pool.py", line 144, in _helper_reraises_exception
    raise ex
  File "/usr/lib/python3.8/multiprocessing/pool.py", line 388, in _guarded_task_generation
    for i, x in enumerate(iterable):
  File "/usr/local/lib/python3.8/dist-packages/elasticsearch/helpers/actions.py", line 155, in _chunk_actions
    for action, data in actions:
  File "/usr/local/lib/python3.8/dist-packages/pgsync/sync.py", line 714, in _payloads
    filters = self._update(
  File "/usr/local/lib/python3.8/dist-packages/pgsync/sync.py", line 435, in _update
    primary_values: list = [
  File "/usr/local/lib/python3.8/dist-packages/pgsync/sync.py", line 436, in <listcomp>
    payload_data[key] for key in node.model.primary_keys
TypeError: 'NoneType' object is not subscriptable
```
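For context on that error: the list comprehension at the bottom of the trace indexes `payload_data` by each primary key, so if a payload arrives with no data the lookup raises exactly this TypeError and kills the poll thread. A minimal reproduction, plus a hypothetical defensive pattern (not pgsync's actual fix):

```python
# Reproduce the failure mode: indexing None raises the same TypeError
# seen in the traceback above.
payload_data = None
try:
    payload_data["id"]
except TypeError as exc:
    print(exc)  # 'NoneType' object is not subscriptable

# Hypothetical guard: skip payloads with missing data instead of
# crashing the consumer thread. Names here are illustrative only.
def primary_values(payload_data, primary_keys):
    if payload_data is None:
        return []
    return [payload_data[key] for key in primary_keys]
```

Whether skipping (rather than, say, logging and re-queueing) is the right behavior depends on why the payload data is None in the first place.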
After that, any data update makes the logs look like this:
```
Syncing mydata Db: [1] => Redis: [1] => Elastic: [0] ...
Syncing mydata Db: [1] => Redis: [1] => Elastic: [0] ...
Syncing mydata Db: [1] => Redis: [1] => Elastic: [0] ...
Syncing mydata Db: [1] => Redis: [1] => Elastic: [0] ...
Syncing mydata Db: [1] => Redis: [1] => Elastic: [0] ...
Syncing mydata Db: [1] => Redis: [1] => Elastic: [0] ...
Syncing mydata Db: [1] => Redis: [1] => Elastic: [0] ...
Syncing mydata Db: [2] => Redis: [1] => Elastic: [0] ...
Syncing mydata Db: [2] => Redis: [1] => Elastic: [0] ...
Syncing mydata Db: [2] => Redis: [1] => Elastic: [0] ...
Syncing mydata Db: [2] => Redis: [1] => Elastic: [0] ...
Syncing mydata Db: [2] => Redis: [1] => Elastic: [0] ...
Syncing mydata Db: [2] => Redis: [1] => Elastic: [0] ...
Syncing mydata Db: [2] => Redis: [1] => Elastic: [0] ...
Syncing mydata Db: [2] => Redis: [1] => Elastic: [0] ...
Syncing mydata Db: [2] => Redis: [1] => Elastic: [0] ...
Syncing mydata Db: [2] => Redis: [1] => Elastic: [0] ...
Syncing mydata Db: [2] => Redis: [1] => Elastic: [0] ...
Syncing mydata Db: [2] => Redis: [1] => Elastic: [0] ...
Syncing mydata Db: [2] => Redis: [1] => Elastic: [0] ...
Syncing mydata Db: [2] => Redis: [1] => Elastic: [0] ...
Syncing mydata Db: [3] => Redis: [1] => Elastic: [0] ...
```
The Db count keeps increasing, but the Redis and ES counts do not.
Same problem here. The issue does not reproduce when the schema name is public, but we need to use a different schema name. Any hints on what is causing this, @toluaina?
@camirus27 @tthanh I feel this issue is different from the original one. Can you please create a separate issue with as much detail as you can, so I can track it better?
I created a new issue https://github.com/toluaina/pgsync/issues/215 for this @tthanh @toluaina
@tthanh @jinserk Could either of you post the PGSync JSON schema that was used?
@voyc-jean Sure. Here you are:
```json
[
  {
    "database": "kyuluxmain",
    "index": "kyu-mol-211208",
    "nodes": {
      "table": "pgmols_mol",
      "label": "mols",
      "schema": "public",
      "columns": [
        "id",
        "createtime",
        "inchikey",
        "smiles",
        "mass"
      ],
      "children": [
        {
          "table": "pgmols_molnickname",
          "label": "nicknames",
          "columns": [
            "name"
          ],
          "relationship": {
            "type": "one_to_many",
            "variant": "scalar"
          }
        },
        {
          "table": "pgmols_molset",
          "label": "tags",
          "columns": [
            "name"
          ],
          "relationship": {
            "type": "one_to_many",
            "variant": "scalar",
            "through_tables": [
              "pgmols_molset_mols"
            ]
          }
        },
        {
          "table": "substances_substance",
          "label": "substance",
          "columns": [
            "id",
            "name"
          ],
          "relationship": {
            "type": "one_to_one",
            "variant": "object"
          }
        },
        {
          "table": "arithmetics_s0",
          "label": "s0",
          "columns": [
            "id",
            "absorption",
            "emission",
            "splitting",
            "strength",
            "tadfrate",
            "fluorrate",
            "pbht",
            "homo",
            "lumo",
            "chargetransfersplitting",
            "triplet"
          ],
          "relationship": {
            "type": "one_to_one",
            "variant": "object"
          }
        },
        {
          "table": "arithmetics_t1",
          "label": "t1",
          "columns": [
            "id",
            "absorption",
            "emission",
            "splitting",
            "strength",
            "tadfrate",
            "fluorrate",
            "pbht",
            "homo",
            "lumo",
            "chargetransfersplitting",
            "triplet"
          ],
          "relationship": {
            "type": "one_to_one",
            "variant": "object"
          }
        }
      ]
    }
  }
]
```
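As a quick first check on a schema file like the one above, it can help to confirm that it parses as JSON and that every node, root and children alike, names a table. This is an illustrative sketch based only on the keys visible in the schema above, not pgsync's own validation:

```python
import json

def check_schema(path):
    """Light sanity check of a pgsync-style schema file: the file parses
    as JSON and every node (root and nested children) has a 'table' key.
    Not a substitute for pgsync's real validation."""
    with open(path) as fh:
        docs = json.load(fh)
    for doc in docs:
        for required in ("database", "index", "nodes"):
            assert required in doc, f"document missing '{required}'"
        stack = [doc["nodes"]]          # root node is a single object
        while stack:
            node = stack.pop()
            assert "table" in node, f"node missing 'table': {node}"
            stack.extend(node.get("children", []))
    return True
```

Running this catches trailing commas, truncated files, and nodes that lost their `table` key during editing before pgsync ever starts.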
I'm testing pgsync with 2.1.10 and found the new logs you mentioned:
```
opensearch-redis | 1:M 12 Jan 2022 13:30:07.238 * 1 changes in 3600 seconds. Saving...
opensearch-redis | 1:M 12 Jan 2022 13:30:07.238 * Background saving started by pid 19
opensearch-redis | 19:C 12 Jan 2022 13:30:07.261 * DB saved on disk
opensearch-redis | 19:C 12 Jan 2022 13:30:07.262 * RDB: 0 MB of memory used by copy-on-write
opensearch-redis | 1:M 12 Jan 2022 13:30:07.339 * Background saving terminated with success
opensearch-redis | 1:M 12 Jan 2022 13:44:47.418 * 100 changes in 300 seconds. Saving...
opensearch-redis | 1:M 12 Jan 2022 13:44:47.418 * Background saving started by pid 20
opensearch-redis | 20:C 12 Jan 2022 13:44:47.451 * DB saved on disk
opensearch-redis | 20:C 12 Jan 2022 13:44:47.451 * RDB: 0 MB of memory used by copy-on-write
opensearch-redis | 1:M 12 Jan 2022 13:44:47.520 * Background saving terminated with success
opensearch-redis | 1:M 12 Jan 2022 13:49:48.000 * 100 changes in 300 seconds. Saving...
opensearch-redis | 1:M 12 Jan 2022 13:49:48.001 * Background saving started by pid 21
opensearch-redis | 21:C 12 Jan 2022 13:49:48.036 * DB saved on disk
opensearch-redis | 21:C 12 Jan 2022 13:49:48.037 * RDB: 0 MB of memory used by copy-on-write
opensearch-redis | 1:M 12 Jan 2022 13:49:48.102 * Background saving terminated with success
opensearch-redis | 1:M 12 Jan 2022 13:54:49.076 * 100 changes in 300 seconds. Saving...
opensearch-redis | 1:M 12 Jan 2022 13:54:49.076 * Background saving started by pid 22
opensearch-redis | 22:C 12 Jan 2022 13:54:49.096 * DB saved on disk
opensearch-redis | 22:C 12 Jan 2022 13:54:49.097 * RDB: 0 MB of memory used by copy-on-write
opensearch-redis | 1:M 12 Jan 2022 13:54:49.176 * Background saving terminated with success
opensearch-redis | 1:M 12 Jan 2022 13:59:50.082 * 100 changes in 300 seconds. Saving...
opensearch-redis | 1:M 12 Jan 2022 13:59:50.083 * Background saving started by pid 23
opensearch-redis | 23:C 12 Jan 2022 13:59:50.105 * DB saved on disk
opensearch-redis | 23:C 12 Jan 2022 13:59:50.105 * RDB: 0 MB of memory used by copy-on-write
opensearch-redis | 1:M 12 Jan 2022 13:59:50.183 * Background saving terminated with success
```
Is this working as expected? The added records are still not shown in the opensearch index.
PS: I guess the "100 changes in 300 seconds" message keeps repeating.
PPS: There are no checkpoint files in the path. Is that normal?
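Those RDB saves only prove Redis is persisting *something*; to see whether pgsync is actually enqueueing change events, you can inspect the keys and queue lengths directly. The key names are internal to pgsync and vary by version, so this sketch just lists everything; `client` is any redis-py-compatible client (e.g. `redis.Redis(host="redis", password=...)`):

```python
def queue_sizes(client):
    """Map each Redis key to its list length (for list-typed keys) or
    to its type name (for everything else). Works with any client that
    exposes redis-py's keys()/type()/llen() methods."""
    sizes = {}
    for key in client.keys("*"):
        name = key.decode() if isinstance(key, bytes) else key
        key_type = client.type(key)
        if key_type in (b"list", "list"):
            sizes[name] = client.llen(key)
        else:
            sizes[name] = key_type
    return sizes
```

A queue length that grows while Elastic stays at 0 would point at the consumer side (the pgsync daemon) rather than at event capture.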
I'm reading the pgsync code and don't fully understand it yet, but in pgsync/sync.py:
```python
with Timer():
    for document in json.load(open(config)):
        sync = Sync(document, verbose=verbose, **kwargs)
        sync.pull()
        if daemon:
            sync.receive()
```
I wonder if it should be:
```python
with Timer():
    for document in json.load(open(config)):
        sync = Sync(document, verbose=verbose, **kwargs)
        if daemon:
            sync.receive()
        else:
            sync.pull()
```
pgsync does two passes to ensure all data in a live environment is synced and no data is missed, so we need to do both `receive()` and then `pull()`, in that order.
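The idea behind the two passes can be sketched generically: start capturing live change events *before* the full backfill, then drain what was captured afterwards, so rows changed while the backfill is running are not lost. This is an illustrative sketch of the pattern, not pgsync's actual implementation; all names are made up:

```python
import queue
import threading

def two_pass_sync(backfill, subscribe, apply_event):
    """subscribe(put) feeds change events into `put`; backfill() copies
    the current state; apply_event(e) applies one captured change."""
    events: "queue.Queue" = queue.Queue()
    # Pass 1 setup: begin capturing live changes BEFORE the backfill,
    # so nothing that happens mid-copy is missed.
    listener = threading.Thread(target=subscribe, args=(events.put,))
    listener.start()
    # Pass 2: full copy of the current database state.
    backfill()
    # In this sketch, subscribe() returns once it has caught up;
    # a real daemon would keep listening forever.
    listener.join(timeout=5.0)
    # Drain everything captured while the backfill was running.
    while not events.empty():
        apply_event(events.get())
```

Captured events may duplicate rows the backfill already copied, which is fine as long as applying an event is idempotent (an upsert, as indexing into Elasticsearch by document id is).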
My guess is that pgsync is still doing the initial sync. You should see some output in the console, such as:
```
- book
  |- publisher
  |- book_language
  |- author
  |  - city
  |  - country
  |  - continent
  |- language
  |- subject
- rating
[==================================================] 8/8 100%
[--------------------------------------------------] 0/0 0%
0:00:00.688371 (0.69 sec)
Syncing book Xlog: [0] => Db: [0] => Redis: [total = 0 pending = 0] => Elastic: [8] ...
Syncing book Xlog: [0] => Db: [0] => Redis: [total = 0 pending = 0] => Elastic: [8] ...
Syncing book Xlog: [0] => Db: [0] => Redis: [total = 0 pending = 0] => Elastic: [8] ...
```
Not seeing anything at all is unusual.
I missed `PYTHONUNBUFFERED=1` in my docker image, so I will rerun after adding it. I'll keep you posted on the result. Thank you so much!
@jinserk it might be worth re-running bootstrap and then restarting pgsync:
- bootstrap -t schema.json
- bootstrap schema.json
- pgsync schema.json
PGSync version: 2.1.9
Postgres version: 11.12 (AWS RDS)
Elasticsearch version: 7.10.2
Redis version: 6.2.6
Python version: 3.9.9
Problem Description: I'm using the latest PyPI pgsync in a docker env. The first indexing was successful, but subsequent syncs don't seem to work.
I checked Kibana and counted the number of records in the index, but it has not changed.
Here are my env vars:
Is there anything I can check to tell whether the sync is working or not?