toluaina / pgsync

Postgres to Elasticsearch/OpenSearch sync
https://pgsync.com
MIT License

'Primary keys not subset' Issue with child tables and materialized views #416

Open erry-takumi opened 1 year ago

erry-takumi commented 1 year ago

PGSync version: 2.5.0 and also main

Postgres version: 13.8

Elasticsearch version: 6.8.18

Redis version: 6.2.4/alpine

Python version: 3.8.13

Problem Description:

(I'm on a Mac, but Postgres, Redis, and ES are running on a Linux Docker machine. Python is running locally on the Mac.)

TL;DR: I have a table, employee, whose primary key is employee_id and which has a foreign key user_id referencing "user".id. I have made a materialized view that joins both tables. When I change anything (e.g. the full_name) in the "user" table, I get an error: 2023-02-22 17:12:01.881:ERROR:pgsync.sync: Primary keys ['employee_id'] not subset of payload data dict_keys(['id']) for table public.employee_view

Long details follow:

The tables:


-- "user" is created first so that employee_fk can reference it.
CREATE TABLE public."user" (
    id uuid NOT NULL,
    created timestamptz NOT NULL DEFAULT now(),
    modified timestamptz NULL,
    full_name varchar NULL,
    CONSTRAINT user_pk PRIMARY KEY (id)
);
CREATE TABLE public.employee (
    employee_id uuid NOT NULL,
    user_id uuid NOT NULL,
    created timestamptz NOT NULL DEFAULT now(),
    modified timestamptz NULL,
    CONSTRAINT employee_pk PRIMARY KEY (employee_id),
    CONSTRAINT employee_fk FOREIGN KEY (user_id) REFERENCES public."user"(id)
);

The materialized view:

CREATE MATERIALIZED VIEW public.employee_view
TABLESPACE pg_default
AS SELECT e.employee_id,
    e.user_id,
    e.created,
    e.modified,
    u.id,
    u.full_name
   FROM employee e
     JOIN "user" u ON e.user_id = u.id
WITH DATA;
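
Since the view exposes both e.employee_id and u.id, a change to a "user" row could in principle be mapped back to the view's primary key by looking up the view rows whose id matches. Below is a minimal sketch of that lookup, with in-memory rows standing in for the view. The helper name employee_ids_for_user and all row values are made up for illustration; this is not PGSync code and may not reflect how PGSync resolves base tables.

```python
from typing import Dict, List

# In-memory stand-in for rows of public.employee_view; all values are made up.
view_rows: List[Dict[str, str]] = [
    {"employee_id": "e-1", "user_id": "u-1", "id": "u-1", "full_name": "Ada"},
    {"employee_id": "e-2", "user_id": "u-2", "id": "u-2", "full_name": "Grace"},
]


def employee_ids_for_user(rows: List[Dict[str, str]], user_id: str) -> List[str]:
    """Return the view primary keys of all rows joined to the given user id."""
    return [row["employee_id"] for row in rows if row["id"] == user_id]


# A change to user "u-1" should touch the view row keyed by "e-1".
affected = employee_ids_for_user(view_rows, "u-1")
print(affected)
```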

NOT WORKING SCHEMA (with materialized view):

[
    {
        "database": "pgsynctest",
        "index": "test1",
        "setting": {
            "number_of_shards": 3,
            "number_of_replicas": 2
        },
        "nodes": {
            "table": "employee_view",
            "base_tables": ["employee", "user"],
            "primary_key": [
                "employee_id"
            ],
            "columns": [
                "employee_id",
                "user_id",
                "created",
                "modified",
                "full_name"
            ]
        }
    }
]
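
To illustrate what the error message seems to be complaining about: the schema above declares employee_id as the primary key, but the payload produced by a change on "user" only carries id. The sketch below reproduces that kind of subset check in isolation. The function name missing_primary_keys and the placeholder UUID are assumptions for illustration, not PGSync's actual implementation.

```python
from typing import Iterable, List, Mapping


def missing_primary_keys(
    primary_keys: Iterable[str], payload_data: Mapping[str, object]
) -> List[str]:
    """Return the primary key columns that are absent from the payload."""
    return [key for key in primary_keys if key not in payload_data]


# The schema declares employee_id as the primary key of the view ...
view_primary_keys = ["employee_id"]
# ... but an UPDATE on "user" only carries that table's own key column.
# The UUID below is a made-up placeholder.
user_payload = {"id": "00000000-0000-0000-0000-000000000001"}

missing = missing_primary_keys(view_primary_keys, user_payload)
if missing:
    print(
        f"Primary keys {view_primary_keys} not subset of "
        f"payload data {user_payload.keys()}"
    )
```

A payload coming from the employee table would contain employee_id and pass the same check, which would fit with only changes to "user" triggering the error.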

SIMILAR SCHEMA WITH PLAIN TABLES THAT WORKS FINE:

[
    {
        "database": "pgsynctest",
        "index": "test1",
        "setting": {
            "number_of_shards": 3,
            "number_of_replicas": 2
        },
        "nodes": {
            "table": "employee",
            "primary_key": [
                "employee_id"
            ],
            "columns": [
                "employee_id",
                "user_id",
                "created",
                "modified"
            ],
            "children": [
                {
                    "table": "user",
                    "columns": [
                        "id",
                        "full_name"
                    ],
                    "relationship": {
                        "variant": "object",
                        "type": "one_to_one",
                        "foreign_key": {
                            "child": [
                                "id"
                            ],
                            "parent": [
                                "user_id"
                            ]
                        }
                    }
                }
            ]
        }
    }
]

Upon attaching a debugger, I found the following:

Also, let me know if I'm configuring this incorrectly!

Error Message (if any):

2023-02-22 17:25:21.192:ERROR:pgsync.sync: Primary keys ['employee_id'] not subset of payload data dict_keys(['id']) for table public.employee_view
NoneType: None
2023-02-22 17:25:21.196:ERROR:pgsync.search_client: Exception No active exception to reraise
Traceback (most recent call last):
  File "/Users/erry/pgsync/pgsync/search_client.py", line 133, in bulk
    self._bulk(
  File "/Users/erry/pgsync/pgsync/search_client.py", line 188, in _bulk
    for _ in self.parallel_bulk(
  File "/Users/erry/pgsync/venv/lib/python3.8/site-packages/elasticsearch-7.13.4-py3.8.egg/elasticsearch/helpers/actions.py", line 472, in parallel_bulk
    for result in pool.imap(
  File "/Users/erry/.pyenv/versions/3.8.13/lib/python3.8/multiprocessing/pool.py", line 868, in next
    raise value
  File "/Users/erry/.pyenv/versions/3.8.13/lib/python3.8/multiprocessing/pool.py", line 125, in worker
    result = (True, func(*args, **kwds))
  File "/Users/erry/.pyenv/versions/3.8.13/lib/python3.8/multiprocessing/pool.py", line 144, in _helper_reraises_exception
    raise ex
  File "/Users/erry/.pyenv/versions/3.8.13/lib/python3.8/multiprocessing/pool.py", line 388, in _guarded_task_generation
    for i, x in enumerate(iterable):
  File "/Users/erry/pgsync/venv/lib/python3.8/site-packages/elasticsearch-7.13.4-py3.8.egg/elasticsearch/helpers/actions.py", line 155, in _chunk_actions
    for action, data in actions:
  File "/Users/erry/pgsync/pgsync/sync.py", line 833, in _payloads
    raise
RuntimeError: No active exception to reraise
Exception in poll_redis() for thread Thread-28: No active exception to reraise
Exiting...
/Users/erry/.pyenv/versions/3.8.13/lib/python3.8/multiprocessing/resource_tracker.py:216: UserWarning: resource_tracker: There appear to be 2 leaked semaphore objects to clean up at shutdown
  warnings.warn('resource_tracker: There appear to be %d '
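
A note on the second error: "RuntimeError: No active exception to reraise" is what Python raises when a bare raise statement runs while no exception is being handled, which matches the bare raise shown at sync.py line 833 in the traceback. So this RuntimeError looks like a side effect of the primary key check failing rather than a separate problem. A minimal reproduction, independent of PGSync (the function name is hypothetical):

```python
def payloads_sketch() -> None:
    # A bare `raise` is only valid while an exception is being handled.
    # No exception is active here, so Python raises RuntimeError instead.
    raise


try:
    payloads_sketch()
except RuntimeError as exc:
    message = str(exc)
    print(message)
```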

toluaina commented 1 year ago

Sorry for the delay. At first glance, this looks like a bug. It should be looking at both base tables, but it appears it's only looking at user. Bear with me and I'll investigate this in more detail.

masao-otomo commented 4 months ago

How did this issue turn out? I'm experiencing a similar issue in my environment.