toluaina / pgsync

Postgres to Elasticsearch/OpenSearch sync
https://pgsync.com
MIT License

PGSync tries to sync schemas from Postgres that are not mapped in schema.json #427

Open ravin-accelq opened 1 year ago

ravin-accelq commented 1 year ago

PGSync version: 2.5.0

Postgres version: 10

Elasticsearch version: 8.6.2

Redis version: 6.0.16

Python version: 3.11.2

Problem Description: I've configured schema.json to sync only accelq_schema in Postgres. When there is data to sync, it works fine. But when there is no new data to sync, PGSync picks up changes from other schemas in Postgres and tries to sync them.

Error Message (if any):

2023-03-14 10:40:05.340:ERROR:pgsync.search_client: Exception Node for template_tenant_schema.project not found
Traceback (most recent call last):
  File "C:\Users\vivek\AppData\Local\Programs\Python\Python311\Lib\site-packages\pgsync\search_client.py", line 133, in bulk
    self._bulk(
  File "C:\Users\vivek\AppData\Local\Programs\Python\Python311\Lib\site-packages\pgsync\search_client.py", line 188, in _bulk
    for _ in self.parallel_bulk(
  File "C:\Users\vivek\AppData\Local\Programs\Python\Python311\Lib\site-packages\elasticsearch\helpers\actions.py", line 472, in parallel_bulk
    for result in pool.imap(
  File "C:\Users\vivek\AppData\Local\Programs\Python\Python311\Lib\multiprocessing\pool.py", line 873, in next
    raise value
  File "C:\Users\vivek\AppData\Local\Programs\Python\Python311\Lib\multiprocessing\pool.py", line 125, in worker
    result = (True, func(*args, **kwds))
                    ^^^^^^^^^^^^^^^^^^^
  File "C:\Users\vivek\AppData\Local\Programs\Python\Python311\Lib\multiprocessing\pool.py", line 144, in _helper_reraises_exception
    raise ex
  File "C:\Users\vivek\AppData\Local\Programs\Python\Python311\Lib\multiprocessing\pool.py", line 391, in _guarded_task_generation
    for i, x in enumerate(iterable):
  File "C:\Users\vivek\AppData\Local\Programs\Python\Python311\Lib\site-packages\elasticsearch\helpers\actions.py", line 155, in _chunk_actions
    for action, data in actions:
  File "C:\Users\vivek\AppData\Local\Programs\Python\Python311\Lib\site-packages\pgsync\sync.py", line 836, in _payloads
    node: Node = self.tree.get_node(payload.table, payload.schema)
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\vivek\AppData\Local\Programs\Python\Python311\Lib\site-packages\pgsync\node.py", line 332, in get_node
    raise RuntimeError(f"Node for {schema}.{table} not found")
RuntimeError: Node for template_tenant_schema.project not found
 0:00:02.588406 (2.59 sec)
Traceback (most recent call last):
  File "C:\accelq\AQReporting\pgsync\bin\pgsync", line 7, in <module>
    sync.main()
  File "C:\Users\vivek\AppData\Local\Programs\Python\Python311\Lib\site-packages\click\core.py", line 1130, in __call__
    return self.main(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\vivek\AppData\Local\Programs\Python\Python311\Lib\site-packages\click\core.py", line 1055, in main
    rv = self.invoke(ctx)
         ^^^^^^^^^^^^^^^^
  File "C:\Users\vivek\AppData\Local\Programs\Python\Python311\Lib\site-packages\click\core.py", line 1404, in invoke
    return ctx.invoke(self.callback, **ctx.params)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\vivek\AppData\Local\Programs\Python\Python311\Lib\site-packages\click\core.py", line 760, in invoke
    return __callback(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\vivek\AppData\Local\Programs\Python\Python311\Lib\site-packages\pgsync\sync.py", line 1450, in main
    sync.pull()
  File "C:\Users\vivek\AppData\Local\Programs\Python\Python311\Lib\site-packages\pgsync\sync.py", line 1225, in pull
    self.logical_slot_changes(txmin=txmin, txmax=txmax, upto_nchanges=None)
  File "C:\Users\vivek\AppData\Local\Programs\Python\Python311\Lib\site-packages\pgsync\sync.py", line 433, in logical_slot_changes
    self.search_client.bulk(
  File "C:\Users\vivek\AppData\Local\Programs\Python\Python311\Lib\site-packages\pgsync\search_client.py", line 133, in bulk
    self._bulk(
  File "C:\Users\vivek\AppData\Local\Programs\Python\Python311\Lib\site-packages\pgsync\search_client.py", line 188, in _bulk
    for _ in self.parallel_bulk(
  File "C:\Users\vivek\AppData\Local\Programs\Python\Python311\Lib\site-packages\elasticsearch\helpers\actions.py", line 472, in parallel_bulk
    for result in pool.imap(
  File "C:\Users\vivek\AppData\Local\Programs\Python\Python311\Lib\multiprocessing\pool.py", line 873, in next
    raise value
  File "C:\Users\vivek\AppData\Local\Programs\Python\Python311\Lib\multiprocessing\pool.py", line 125, in worker
    result = (True, func(*args, **kwds))
                    ^^^^^^^^^^^^^^^^^^^
  File "C:\Users\vivek\AppData\Local\Programs\Python\Python311\Lib\multiprocessing\pool.py", line 144, in _helper_reraises_exception
    raise ex
  File "C:\Users\vivek\AppData\Local\Programs\Python\Python311\Lib\multiprocessing\pool.py", line 391, in _guarded_task_generation
    for i, x in enumerate(iterable):
  File "C:\Users\vivek\AppData\Local\Programs\Python\Python311\Lib\site-packages\elasticsearch\helpers\actions.py", line 155, in _chunk_actions
    for action, data in actions:
  File "C:\Users\vivek\AppData\Local\Programs\Python\Python311\Lib\site-packages\pgsync\sync.py", line 836, in _payloads
    node: Node = self.tree.get_node(payload.table, payload.schema)
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\vivek\AppData\Local\Programs\Python\Python311\Lib\site-packages\pgsync\node.py", line 332, in get_node
    raise RuntimeError(f"Node for {schema}.{table} not found")
RuntimeError: Node for template_tenant_schema.project not found

vivekburman commented 1 year ago

It looks like it only checks whether the table name matches one of the tables in the schema.json file.

So if pg_1_schema has a table named user and pg_2_schema also has a table named user, it will try to sync that one as well.

Ideally, it should also check that the Postgres schema is present in the schema.json file.
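
To illustrate the check suggested above, here is a minimal sketch that matches change events on the (schema, table) pair rather than on the table name alone. It is not PGSync's code: `Payload`, `filter_payloads`, and the `mapped` set are hypothetical names used only for this example.

```python
from dataclasses import dataclass
from typing import Iterable, Iterator, Set, Tuple


@dataclass(frozen=True)
class Payload:
    """Hypothetical stand-in for a change event; not PGSync's own class."""

    schema: str
    table: str


def filter_payloads(
    payloads: Iterable[Payload],
    mapped: Set[Tuple[str, str]],
) -> Iterator[Payload]:
    """Yield only payloads whose (schema, table) pair is mapped."""
    for payload in payloads:
        # Compare both parts, so a same-named table in another
        # Postgres schema is skipped instead of raising later.
        if (payload.schema, payload.table) in mapped:
            yield payload


# Assumed mapping: only demo_schema tables are in schema.json.
mapped = {("demo_schema", "project"), ("demo_schema", "scenario")}
events = [
    Payload("demo_schema", "project"),
    Payload("template_tenant_schema", "project"),
]
kept = list(filter_payloads(events, mapped))
```

With this filter, the `template_tenant_schema.project` event from the traceback would be dropped before any node lookup is attempted.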

toluaina commented 1 year ago

I believe this is what it's doing. Can you share your schema.json to get a better idea of what's going on?

ravin-accelq commented 1 year ago

[
  {
    "database": "accelq_db",
    "index": "demo_scenario",
    "plugins": ["CleanScenarioField"],
    "nodes": {
      "table": "scenario",
      "schema": "demo_schema",
      "primary_key": ["pid"],
      "columns": [],
      "children": [
        {
          "table": "project",
          "schema": "demo_schema",
          "primary_key": ["pid"],
          "columns": [],
          "transform": {
            "rename": {
              "project_name": "code",
              "project_display_name": "display_name"
            }
          },
          "relationship": {
            "variant": "object",
            "type": "one_to_one",
            "foreign_key": { "child": ["pid"], "parent": ["pj_pid"] }
          }
        },
        {
          "table": "users",
          "label": "user_created",
          "schema": "demo_schema",
          "primary_key": ["pid"],
          "columns": [],
          "relationship": {
            "variant": "object",
            "type": "one_to_one",
            "foreign_key": { "child": ["pid"], "parent": ["created_user"] }
          }
        },
        {
          "table": "users",
          "label": "user_last_modified",
          "schema": "demo_schema",
          "primary_key": ["pid"],
          "columns": [],
          "relationship": {
            "variant": "object",
            "type": "one_to_one",
            "foreign_key": { "child": ["pid"], "parent": ["last_modified_user"] }
          }
        },
        {
          "table": "conf_entity_status",
          "schema": "demo_schema",
          "label": "status",
          "primary_key": ["pid"],
          "columns": [],
          "transform": {
            "rename": { "status_name": "name" }
          },
          "relationship": {
            "variant": "object",
            "type": "one_to_one",
            "foreign_key": { "child": ["pid"], "parent": ["status_pid"] }
          }
        },
        {
          "table": "lu_cross_project_sharing_type",
          "schema": "demo_schema",
          "primary_key": ["pid"],
          "label": "cross_project_sharing_type",
          "columns": [],
          "relationship": {
            "variant": "object",
            "type": "one_to_one",
            "foreign_key": { "child": ["pid"], "parent": ["cross_project_sharing_type_pid"] }
          }
        },
        {
          "table": "lu_test_type",
          "schema": "demo_schema",
          "label": "test_type",
          "primary_key": ["pid"],
          "columns": [],
          "relationship": {
            "variant": "object",
            "type": "one_to_one",
            "foreign_key": { "child": ["pid"], "parent": ["test_type_pid"] }
          }
        },
        {
          "table": "metadata_value",
          "schema": "demo_schema",
          "primary_key": ["pid"],
          "columns": [],
          "children": [
            {
              "table": "project",
              "schema": "demo_schema",
              "primary_key": ["pid"],
              "columns": ["pid"],
              "relationship": {
                "variant": "object",
                "type": "one_to_one",
                "foreign_key": { "child": ["pid"], "parent": ["pj_pid"] }
              }
            }
          ],
          "relationship": {
            "variant": "object",
            "type": "one_to_one",
            "foreign_key": { "child": ["pid"], "parent": ["mv_pid"] }
          }
        },
        {
          "table": "conf_metadata",
          "schema": "demo_schema",
          "primary_key": ["pid"],
          "columns": [],
          "children": [
            {
              "table": "conf_entity_metadata",
              "schema": "demo_schema",
              "primary_key": ["pid"],
              "columns": [],
              "children": [
                {
                  "table": "lu_top_entity_type",
                  "schema": "demo_schema",
                  "primary_key": ["pid"],
                  "columns": [],
                  "relationship": {
                    "variant": "object",
                    "type": "one_to_one",
                    "foreign_key": { "child": ["pid"], "parent": ["lu_top_entity_type_pid"] }
                  }
                }
              ],
              "relationship": {
                "variant": "object",
                "type": "one_to_many",
                "foreign_key": { "child": ["conf_metadata_pid"], "parent": ["pid"] }
              }
            }
          ],
          "relationship": {
            "variant": "object",
            "type": "one_to_many",
            "foreign_key": { "child": ["pj_pid"], "parent": ["pj_pid"] }
          }
        },
        {
          "table": "scn_reln_scenario_ext_int_work_item",
          "label": "traceability",
          "schema": "demo_schema",
          "primary_key": ["pid"],
          "columns": [],
          "children": [
            {
              "table": "ext_int_work_item",
              "label": "work_item",
              "schema": "demo_schema",
              "primary_key": ["pid"],
              "columns": [],
              "relationship": {
                "variant": "object",
                "type": "one_to_one",
                "foreign_key": { "child": ["pid"], "parent": ["ext_int_work_item_pid"] }
              }
            }
          ],
          "relationship": {
            "variant": "object",
            "type": "one_to_many",
            "foreign_key": { "child": ["scn_pid"], "parent": ["pid"] }
          }
        }
      ]
    }
  }
]
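
To see which (schema, table) pairs a schema.json like the one above actually maps, a small recursive walk over `nodes` and `children` is enough. This is only a sketch for debugging, not part of PGSync: `mapped_pairs` is a hypothetical helper, the fallback to `public` for nodes without a `schema` key is an assumption, and `sample` is a shortened stand-in for the full file.

```python
from typing import Any, Dict, List, Set, Tuple

# Assumption: nodes without a "schema" key fall back to "public".
DEFAULT_SCHEMA = "public"


def mapped_pairs(document: List[Dict[str, Any]]) -> Set[Tuple[str, str]]:
    """Collect every (schema, table) pair declared in a schema.json document."""
    pairs: Set[Tuple[str, str]] = set()

    def walk(node: Dict[str, Any]) -> None:
        pairs.add((node.get("schema", DEFAULT_SCHEMA), node["table"]))
        # Recurse into nested children, if any.
        for child in node.get("children", []):
            walk(child)

    for entry in document:
        walk(entry["nodes"])
    return pairs


# Shortened stand-in for the schema.json posted above.
sample = [
    {
        "database": "accelq_db",
        "index": "demo_scenario",
        "nodes": {
            "table": "scenario",
            "schema": "demo_schema",
            "children": [
                {"table": "project", "schema": "demo_schema"},
                {"table": "users", "schema": "demo_schema"},
            ],
        },
    }
]
pairs = mapped_pairs(sample)
```

For the real file, the document could be loaded with `json.load` and passed to the same function; a pair such as `("template_tenant_schema", "project")` would not appear in the result, since every node here declares `demo_schema`.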