toluaina / pgsync

Postgres to Elasticsearch/OpenSearch sync
https://pgsync.com
MIT License

Fix inserts in through tables #361

Closed: mpaolino closed this 1 year ago

mpaolino commented 1 year ago

This fixed the issue in my case, and could probably also fix #142, #264 and #321.

My simplified DB:

my_customer_groups:
     pk id
     fk my_customer_id   (to id in my_customers)
     fk my_group_id      (to id in my_groups)

my_customers:
     pk id
     name

my_groups:
     pk id
     group_name
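To make the data model concrete, here is a minimal sketch that rebuilds the simplified DB in an in-memory SQLite database (standard library, swapped in for Postgres only so the example runs anywhere), inserts rows directly into the through table, and joins through it to build a customer-with-groups document. The sample rows and the customer_document helper are hypothetical, and the document shape only roughly mirrors what the schema describes; it is not pgsync's actual output.

```python
import json
import sqlite3

# Hypothetical reproduction of the simplified DB, using SQLite instead of Postgres.
conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE my_customers (id INTEGER PRIMARY KEY, name TEXT);
    CREATE TABLE my_groups (id INTEGER PRIMARY KEY, group_name TEXT);
    CREATE TABLE my_customer_groups (
        id INTEGER PRIMARY KEY,
        my_customer_id INTEGER REFERENCES my_customers (id),
        my_group_id INTEGER REFERENCES my_groups (id)
    );
    INSERT INTO my_customers VALUES (1, 'Alice');
    INSERT INTO my_groups VALUES (10, 'admins'), (20, 'staff');
    """
)

# The kind of write that triggers the bug: rows inserted directly into the through table.
conn.execute("INSERT INTO my_customer_groups (my_customer_id, my_group_id) VALUES (1, 10)")
conn.execute("INSERT INTO my_customer_groups (my_customer_id, my_group_id) VALUES (1, 20)")


def customer_document(conn, customer_id):
    """Build a nested document: one customer plus the groups linked via the through table."""
    cid, name = conn.execute(
        "SELECT id, name FROM my_customers WHERE id = ?", (customer_id,)
    ).fetchone()
    groups = [
        {"id": gid, "group_name": gname}
        for gid, gname in conn.execute(
            """
            SELECT g.id, g.group_name
            FROM my_customer_groups AS cg
            JOIN my_groups AS g ON g.id = cg.my_group_id
            WHERE cg.my_customer_id = ?
            ORDER BY g.id
            """,
            (customer_id,),
        )
    ]
    return {"id": cid, "name": name, "my_groups": groups}


print(json.dumps(customer_document(conn, 1)))
```

Each new row in my_customer_groups changes the groups list of exactly one customer document, which is why an insert into the through table alone should be enough to trigger a re-sync of the parent.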

My schema.json:

{
    "database": "my_db",
    "index": "testindex",
    "nodes": {
        "table": "my_customers",
        "label": "customers",
        "columns": [
            "id",
            "name"
        ],
        "children": [
            {
                "table": "my_groups",
                "columns": [
                    "id",
                    "group_name"
                ],
                "relationship": {
                    "variant": "object",
                    "type": "one_to_many",
                    "through_tables": [
                        "my_customer_groups"
                    ]
                }
            }
        ]
    }
}
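Because the schema is a tree, a through table only appears as a name inside a child's relationship block. A minimal sketch, assuming nothing about pgsync internals: the hypothetical collect_tables helper walks the nodes tree and gathers every table it mentions, through tables included. Note that a strict JSON parser such as Python's json module rejects trailing commas, so none are used here.

```python
import json

# The schema above, parsed with the standard json module (no trailing commas allowed).
SCHEMA = json.loads(
    """
    {
      "database": "my_db",
      "index": "testindex",
      "nodes": {
        "table": "my_customers",
        "label": "customers",
        "columns": ["id", "name"],
        "children": [
          {
            "table": "my_groups",
            "columns": ["id", "group_name"],
            "relationship": {
              "variant": "object",
              "type": "one_to_many",
              "through_tables": ["my_customer_groups"]
            }
          }
        ]
      }
    }
    """
)


def collect_tables(node):
    """Hypothetical helper: every table a node tree touches, through tables included."""
    tables = {node["table"]}
    tables.update(node.get("relationship", {}).get("through_tables", []))
    for child in node.get("children", []):
        tables |= collect_tables(child)
    return tables


print(sorted(collect_tables(SCHEMA["nodes"])))
```

If the set of tracked tables is built this way, a change event on my_customer_groups will always be recognised as belonging to the tree, which is the situation the rest of this PR is about.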

When the code reaches this point (the _insert_op method in sync.py) after an insert into the through table, I get:

foreign_keys = {'my_customer_groups': ['my_customer_id', 'my_group_id'], 'my_customers': ['id'], 'my_groups': ['id']}

At that point it fails with this error:

2022-10-17 22:15:57.859:ERROR:pgsync.elastichelper: Exception list index out of range
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/pgsync/elastichelper.py", line 140, in bulk
    raise_on_error=raise_on_error,
  File "/usr/local/lib/python3.7/site-packages/pgsync/elastichelper.py", line 195, in _bulk
    ignore_status=ignore_status,
  File "/usr/local/lib/python3.7/site-packages/elasticsearch/helpers/actions.py", line 484, in parallel_bulk
    actions, chunk_size, max_chunk_bytes, client.transport.serializer
  File "/usr/local/lib/python3.7/multiprocessing/pool.py", line 748, in next
    raise value
  File "/usr/local/lib/python3.7/multiprocessing/pool.py", line 121, in worker
    result = (True, func(*args, **kwds))
  File "/usr/local/lib/python3.7/multiprocessing/pool.py", line 140, in _helper_reraises_exception
    raise ex
  File "/usr/local/lib/python3.7/multiprocessing/pool.py", line 292, in _guarded_task_generation
    for i, x in enumerate(iterable):
  File "/usr/local/lib/python3.7/site-packages/elasticsearch/helpers/actions.py", line 155, in _chunk_actions
    for action, data in actions:
  File "/usr/local/lib/python3.7/site-packages/pgsync/sync.py", line 809, in _payloads
    payloads,
  File "/usr/local/lib/python3.7/site-packages/pgsync/sync.py", line 482, in _insert_op
    if key == foreign_keys[node.parent.name][i]:
IndexError: list index out of range
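The last frame fails on foreign_keys[node.parent.name][i]. Only that expression is visible in the traceback, so the loop below is a hypothetical reconstruction, not pgsync's code: it pairs keys by position and, given the dictionary printed above, runs off the end of the one-element parent list as soon as it reaches the through table's second key. The membership-based safe_match is an illustrative alternative; it avoids the crash but also finds no match, because the through table's columns (my_customer_id, my_group_id) are not named like the parent's primary key (id).

```python
# Values copied from the debug output above.
foreign_keys = {
    "my_customer_groups": ["my_customer_id", "my_group_id"],
    "my_customers": ["id"],
    "my_groups": ["id"],
}


def positional_match(child_keys, parent_keys):
    """Hypothetical reconstruction: compare keys by position, like
    `key == foreign_keys[node.parent.name][i]` in the traceback."""
    matches = []
    for i, key in enumerate(child_keys):
        if key == parent_keys[i]:  # IndexError once i >= len(parent_keys)
            matches.append(key)
    return matches


def safe_match(child_keys, parent_keys):
    """Illustrative alternative: test membership, so list lengths may differ."""
    return [key for key in child_keys if key in parent_keys]


try:
    positional_match(foreign_keys["my_customer_groups"], foreign_keys["my_customers"])
except IndexError as exc:
    print("IndexError:", exc)

print(safe_match(foreign_keys["my_customer_groups"], foreign_keys["my_customers"]))
```

Any positional comparison assumes both lists have the same length and ordering, which a through table with two foreign keys pointing at two different parents cannot satisfy.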

I've tried all kinds of configurations (specifying foreign_keys, transformations, nested types), but nothing works. I ended up debugging and working this out, but I think there is an underlying issue.

This bug happens on both Postgres 14 and AWS RDS when I use a through table in a child node and insert rows directly into it.

The tests seem to run cleanly, but this workaround is far from ideal.

Please also check this line and the ones below it; it looks like dead code to me. It seems you are adding all through tables to self.tree.tables, so I think that block should never be reached. Also note that I consider this just a workaround: if I use filter[node.parent.table] instead of filter[node.parent.name], I get an exception on insert:

2022-10-24 19:54:55.830:ERROR:pgsync.elastichelper: Exception Select statement '<sqlalchemy.sql.selectable.Select object at 0x7f6684e27ad0>' returned no FROM clauses due to auto-correlation; specify correlate(<tables>) to control correlation manually.
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/pgsync/elastichelper.py", line 140, in bulk
    raise_on_error=raise_on_error,
  File "/usr/local/lib/python3.7/site-packages/pgsync/elastichelper.py", line 195, in _bulk
    ignore_status=ignore_status,
  File "/usr/local/lib/python3.7/site-packages/elasticsearch/helpers/actions.py", line 484, in parallel_bulk
    actions, chunk_size, max_chunk_bytes, client.transport.serializer
  File "/usr/local/lib/python3.7/multiprocessing/pool.py", line 748, in next
    raise value
  File "/usr/local/lib/python3.7/multiprocessing/pool.py", line 121, in worker
    result = (True, func(*args, **kwds))
  File "/usr/local/lib/python3.7/multiprocessing/pool.py", line 140, in _helper_reraises_exception
    raise ex
  File "/usr/local/lib/python3.7/multiprocessing/pool.py", line 292, in _guarded_task_generation
    for i, x in enumerate(iterable):
  File "/usr/local/lib/python3.7/site-packages/elasticsearch/helpers/actions.py", line 155, in _chunk_actions
    for action, data in actions:
  File "/usr/local/lib/python3.7/site-packages/pgsync/sync.py", line 855, in _payloads
    yield from self.sync(filters=filters, extra=extra)
  File "/usr/local/lib/python3.7/site-packages/pgsync/sync.py", line 884, in sync
    count: int = self.fetchcount(node._subquery)
  File "/usr/local/lib/python3.7/site-packages/pgsync/base.py", line 807, in fetchcount
    ).order_by(None)
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1380, in execute
    return meth(self, multiparams, params, _EMPTY_EXECUTION_OPTS)
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/sql/elements.py", line 334, in _execute_on_connection
    self, multiparams, params, execution_options
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1570, in _execute_clauseelement
    linting=self.dialect.compiler_linting | compiler.WARN_LINTING,
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/sql/elements.py", line 537, in _compile_w_cache
    **kw
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/sql/elements.py", line 566, in _compiler
    return dialect.statement_compiler(dialect, self, **kw)
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/sql/compiler.py", line 790, in __init__
    Compiled.__init__(self, dialect, statement, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/sql/compiler.py", line 463, in __init__
    self.string = self.process(self.statement, **compile_kwargs)
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/sql/compiler.py", line 498, in process
    return obj._compiler_dispatch(self, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/sql/visitors.py", line 82, in _compiler_dispatch
    return meth(self, **kw)
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/sql/compiler.py", line 3449, in visit_select
    kwargs,
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/sql/compiler.py", line 3592, in _compose_select_body
    for f in froms
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/sql/compiler.py", line 3592, in <listcomp>
    for f in froms
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/sql/visitors.py", line 82, in _compiler_dispatch
    return meth(self, **kw)
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/sql/compiler.py", line 3832, in visit_join
    + join.onclause._compiler_dispatch(
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/sql/visitors.py", line 82, in _compiler_dispatch
    return meth(self, **kw)
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/sql/compiler.py", line 2959, in visit_lateral
    return "LATERAL %s" % self.visit_alias(lateral_, **kw)
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/sql/compiler.py", line 2915, in visit_alias
    self, asfrom=True, lateral=lateral, **kwargs
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/sql/visitors.py", line 82, in _compiler_dispatch
    return meth(self, **kw)
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/sql/compiler.py", line 2955, in visit_subquery
    return self.visit_alias(subquery, **kw)
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/sql/compiler.py", line 2894, in visit_alias
    **kwargs
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/sql/visitors.py", line 82, in _compiler_dispatch
    return meth(self, **kw)
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/sql/compiler.py", line 3449, in visit_select
    kwargs,
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/sql/compiler.py", line 3592, in _compose_select_body
    for f in froms
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/sql/compiler.py", line 3592, in <listcomp>
    for f in froms
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/sql/visitors.py", line 82, in _compiler_dispatch
    return meth(self, **kw)
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/sql/compiler.py", line 3832, in visit_join
    + join.onclause._compiler_dispatch(
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/sql/visitors.py", line 82, in _compiler_dispatch
    return meth(self, **kw)
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/sql/compiler.py", line 2959, in visit_lateral
    return "LATERAL %s" % self.visit_alias(lateral_, **kw)
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/sql/compiler.py", line 2915, in visit_alias
    self, asfrom=True, lateral=lateral, **kwargs
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/sql/visitors.py", line 82, in _compiler_dispatch
    return meth(self, **kw)
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/sql/compiler.py", line 2955, in visit_subquery
    return self.visit_alias(subquery, **kw)
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/sql/compiler.py", line 2894, in visit_alias
    **kwargs
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/sql/visitors.py", line 82, in _compiler_dispatch
    return meth(self, **kw)
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/sql/compiler.py", line 3353, in visit_select
    select_stmt, compile_state, entry, asfrom, lateral, compound_index
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/sql/compiler.py", line 3529, in _setup_select_stack
    implicit_correlate_froms=asfrom_froms,
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/sql/selectable.py", line 4589, in _get_display_froms
    "manually." % self.statement
sqlalchemy.exc.InvalidRequestError: Select statement '<sqlalchemy.sql.selectable.Select object at 0x7f6684e27ad0>' returned no FROM clauses due to auto-correlation; specify correlate(<tables>) to control correlation manually.
Exception in poll_redis() for thread Thread-48: Select statement '<sqlalchemy.sql.selectable.Select object at 0x7f6684e27ad0>' returned no FROM clauses due to auto-correlation; specify correlate(<tables>) to control correlation manually.
Exiting...

I tried to reproduce this bug with the book example with no luck.

What I can see while debugging the _insert_op method is that the generated foreign_keys dictionary is not what the code expects.

I hope this can serve to narrow down the issue and fix it! Cheers.

nsupegemini commented 1 year ago

Could we please merge this PR?

mpaolino commented 1 year ago

@toluaina have you had any time to look into this? This workaround has been working in production for more than a month with no issues.

stefano-tilt commented 1 year ago

Hey @mpaolino @toluaina, thanks for providing a fix. I have a similar scenario, but your branch is not working for me; I still get IndexError: list index out of range.

I am currently testing this code here.

Let me know if this approach is correct or suggest a better one.

Thanks

Shankar-khati commented 1 year ago

https://github.com/toluaina/pgsync/pull/413 should fix this issue

toluaina commented 1 year ago

Apologies for the delay. The latest release 2.5 should fully address this.

nsupegemini commented 1 year ago

Is anyone getting the error "RuntimeError: Required materialized view columns not present on _view. Please re-run bootstrap." after upgrading to version 2.5?

mpaolino commented 1 year ago

Not me, for now it works fine. I'll keep you updated if issues arise.


vinhnxv commented 1 year ago


Hi @nsupegemini, while waiting for a proper fix from the maintainers, I dropped the materialized view (_view) in my database. You can try:

    drop materialized view _view;

Then run:

    bootstrap -c schema.json

I hope this is useful for you!
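The workaround above can be scripted. This is a hypothetical dry-run helper that only builds and prints the two commands rather than executing them. It assumes psql is on PATH and that _view lives on the default search_path, and it adds IF EXISTS (not part of the original suggestion) so the drop does not fail when the view is already gone. The database name my_db is taken from the schema earlier in the thread; shlex.quote is used instead of shlex.join to stay compatible with the Python 3.7 seen in the tracebacks.

```python
import shlex


def workaround_commands(dbname, config="schema.json", view="_view"):
    """Hypothetical helper: argv lists for 'drop the view, then re-run bootstrap'.

    `view` is interpolated into SQL unquoted, so only pass trusted identifiers.
    """
    drop_sql = f"DROP MATERIALIZED VIEW IF EXISTS {view};"
    return [
        ["psql", "-d", dbname, "-c", drop_sql],
        ["bootstrap", "-c", config],
    ]


if __name__ == "__main__":
    # Dry run: print shell-quoted commands instead of running them.
    for argv in workaround_commands("my_db"):
        print(" ".join(shlex.quote(arg) for arg in argv))
```

Keeping it as a dry run makes it easy to review the destructive DROP before pasting it into a shell or wiring it into subprocess.run.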

nsupegemini commented 1 year ago

Thank you @vinhnxv for the solution, it worked. Sorry for the late reply.

Is this view created by PGSync?