toluaina / pgsync

Postgres to Elasticsearch/OpenSearch sync
https://pgsync.com
MIT License

Invalid schema in parent-child relation #547

Open · opened by carlos-pereira-costoya 3 months ago

carlos-pereira-costoya commented 3 months ago

PGSync version: 3.0.0
Postgres version: PostgreSQL 15.5 (AWS Aurora db.r7g.xlarge, with writer and reader nodes)
Elasticsearch/OpenSearch version: 2.1.1 (OpenSearch)
Redis version: 5.0.6 (ElastiCache)
Python version: 3.9

Problem description:

Hi @toluaina,

We are experiencing the following issue.

With the configuration below, the error occurs only intermittently. pgsync tries to reference a table in a schema where that table does not exist: it qualifies the table with the schema name of the parent table. That schema is not referenced for this table anywhere in the definition provided to pgsync.

Once this error occurs, synchronization stops progressing and the process cannot continue. From the tests I have run, the trigger appears to be a LEFT JOIN that exists in this case but not in the others, which would explain why it works in almost every case except this one.

However, the underlying problem seems to be that pgsync uses the parent table's schema name instead of the one defined in the configuration.
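The schema-mismatch hypothesis can be reproduced in isolation with SQLAlchemy's in-memory `MetaData`, without a database. This is a minimal, hypothetical sketch: it assumes `street_type` actually lives in a schema named `public` (an assumption; the issue does not say where it lives) while `street` lives in `coverage`, the schema named in the error. The helpers `referred_table_name` and `fails_to_resolve` are illustrative names, not part of pgsync. The lookup uses the same `fk.constraint.referred_table` attribute chain that `pgsync/base.py` hits in the traceback.

```python
from sqlalchemy import Column, ForeignKey, Integer, MetaData, Table
from sqlalchemy.exc import NoReferencedTableError


def referred_table_name(fk_target: str) -> str:
    """Build a tiny in-memory MetaData and resolve a FK target string.

    Hypothetical layout: street_type lives in "public" (assumption),
    street lives in "coverage" (the schema named in the error).
    """
    metadata = MetaData()
    Table(
        "street_type",
        metadata,
        Column("street_type_id", Integer, primary_key=True),
        schema="public",
    )
    street = Table(
        "street",
        metadata,
        Column("id", Integer, primary_key=True),
        Column("street_type_id", Integer, ForeignKey(fk_target)),
        schema="coverage",
    )
    fk = next(iter(street.foreign_keys))
    # Same attribute chain as pgsync/base.py in the traceback. SQLAlchemy
    # resolves the target lazily here by looking up "<schema>.<table>"
    # in metadata.tables.
    return str(fk.constraint.referred_table)


def fails_to_resolve(fk_target: str) -> bool:
    """Return True if SQLAlchemy cannot find the referenced table."""
    try:
        referred_table_name(fk_target)
    except NoReferencedTableError:
        return True
    return False
```

With the target qualified by the parent's schema, `referred_table_name("coverage.street_type.street_type_id")` raises a `NoReferencedTableError` of the same form as the one in the traceback, while `"public.street_type.street_type_id"` resolves to `public.street_type`. In other words, the crash only needs the foreign key target to be qualified with the wrong schema at the moment it is resolved.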

schema1.json

Thank you very much!

Error Message (if any):

    sync.main()
  File "/usr/local/lib/python3.9/site-packages/click/core.py", line 1157, in __call__
    return self.main(*args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/click/core.py", line 1078, in main
    rv = self.invoke(ctx)
  File "/usr/local/lib/python3.9/site-packages/click/core.py", line 1434, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/usr/local/lib/python3.9/site-packages/click/core.py", line 783, in invoke
    return __callback(*args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/pgsync/sync.py", line 1488, in main
    sync.pull()
  File "/usr/local/lib/python3.9/site-packages/pgsync/sync.py", line 1233, in pull
    self.logical_slot_changes(txmin=txmin, txmax=txmax, upto_nchanges=None)
  File "/usr/local/lib/python3.9/site-packages/pgsync/sync.py", line 436, in logical_slot_changes
    self.search_client.bulk(
  File "/usr/local/lib/python3.9/site-packages/pgsync/search_client.py", line 138, in bulk
    self._bulk(
  File "/usr/local/lib/python3.9/site-packages/pgsync/search_client.py", line 194, in _bulk
    for ok, _ in self.parallel_bulk(
  File "/usr/local/lib/python3.9/site-packages/opensearchpy/helpers/actions.py", line 487, in parallel_bulk
    for result in pool.imap(
  File "/usr/local/lib/python3.9/multiprocessing/pool.py", line 870, in next
    raise value
  File "/usr/local/lib/python3.9/multiprocessing/pool.py", line 125, in worker
    result = (True, func(*args, **kwds))
  File "/usr/local/lib/python3.9/multiprocessing/pool.py", line 144, in _helper_reraises_exception
    raise ex
  File "/usr/local/lib/python3.9/multiprocessing/pool.py", line 388, in _guarded_task_generation
    for i, x in enumerate(iterable):
  File "/usr/local/lib/python3.9/site-packages/opensearchpy/helpers/actions.py", line 168, in _chunk_actions
    for action, data in actions:
  File "/usr/local/lib/python3.9/site-packages/pgsync/sync.py", line 850, in _payloads
    filters = self._insert_op(
  File "/usr/local/lib/python3.9/site-packages/pgsync/sync.py", line 585, in _insert_op
    _filters = self._through_node_resolver(
  File "/usr/local/lib/python3.9/site-packages/pgsync/sync.py", line 524, in _through_node_resolver
    foreign_key_constraint = payload.foreign_key_constraint(node.model)
  File "/usr/local/lib/python3.9/site-packages/pgsync/base.py", line 110, in foreign_key_constraint
    referred_table: str = str(foreign_key.constraint.referred_table)
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/sql/schema.py", line 4751, in referred_table
    return self.elements[0].column.table
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/util/langhelpers.py", line 1146, in __get__
    obj.__dict__[self.__name__] = result = self.fget(obj)
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/sql/schema.py", line 3160, in column
    return self._resolve_column()
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/sql/schema.py", line 3183, in _resolve_column
    raise exc.NoReferencedTableError(
sqlalchemy.exc.NoReferencedTableError: Foreign key associated with column 'street.street_type_id' could not find table 'coverage.street_type' with which to generate a foreign key to target column 'street_type_id'
bartoszpijet commented 2 months ago

I've tried to fix it, but I can't figure it out. As a workaround, I created an empty table in the schema it was looking in (in my case, public), so it stopped crashing and now just 'ignores' that data.
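The workaround above can be sketched as generating DDL for an empty stand-in table in the schema named in the error. This is a minimal sketch under stated assumptions, not a fix: `placeholder_ddl` is a hypothetical helper, the single INTEGER key column is an assumption (a real stand-in should match the type of the column the foreign key targets), and the stand-in only stops the crash while the related data is effectively ignored.

```python
from sqlalchemy import Column, Integer, MetaData, Table
from sqlalchemy.dialects import postgresql
from sqlalchemy.schema import CreateTable


def placeholder_ddl(schema: str, table: str, column: str) -> str:
    """Return PostgreSQL DDL for an empty stand-in table.

    The stand-in only carries the column the foreign key targets, so that
    "<schema>.<table>" exists when the foreign key is resolved.
    """
    metadata = MetaData()
    stub = Table(
        table,
        metadata,
        # autoincrement=False keeps the type as plain INTEGER instead of SERIAL
        Column(column, Integer, primary_key=True, autoincrement=False),
        schema=schema,
    )
    # Compile against the PostgreSQL dialect without connecting to a database
    return str(CreateTable(stub).compile(dialect=postgresql.dialect())).strip()
```

Pass the schema and table named in your own `NoReferencedTableError` message: for the original report that would be `placeholder_ddl("coverage", "street_type", "street_type_id")`. Generating the DDL rather than executing it keeps the example database-free; run the resulting statement yourself once you have confirmed the names.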