toluaina / pgsync

Postgres to Elasticsearch/OpenSearch sync
https://pgsync.com
MIT License

Operator Does Not Exist: UUID = INTEGER when child and parent tables have different id types #543

Closed: dealertouch closed this issue 3 months ago

dealertouch commented 5 months ago

PGSync version: 3.1.0

Postgres version: PostgreSQL 16.3

Elasticsearch/OpenSearch version:

{
  "name": "opensearch-node1",
  "cluster_name": "opensearch-cluster",
  "cluster_uuid": "zXLzc_O5SQGhuP0lQrhHNg",
  "version": {
    "distribution": "opensearch",
    "number": "2.14.0",
    "build_type": "tar",
    "build_hash": "aaa555453f4713d652b52436874e11ba258d8f03",
    "build_date": "2024-05-09T18:51:35.817208743Z",
    "build_snapshot": false,
    "lucene_version": "9.10.0",
    "minimum_wire_compatibility_version": "7.10.0",
    "minimum_index_compatibility_version": "7.0.0"
  },
  "tagline": "The OpenSearch Project: https://opensearch.org/"
}

Redis version: 7.2.5

Python version: Python 3.10.2

Problem Description: PGSync crashes when adding a new row to the phone_numbers table. Create the following tables in Postgres:

CREATE TYPE phone_number_type AS ENUM ('home', 'work', 'cell', 'fax');
CREATE TABLE customers
(
    id                      uuid PRIMARY KEY,
    date_of_birth           date,
    email                   varchar(255),
    first_name              varchar(255),
    last_name               varchar(255)
);
CREATE TABLE phone_numbers
(
    id SERIAL PRIMARY KEY,
    phone_type  phone_number_type,
    phone_number VARCHAR(20),
    extension VARCHAR(10)
);
CREATE TABLE customer_phone_numbers
(
    customer_id uuid REFERENCES customers (id) ON DELETE CASCADE,
    phone_number_id INTEGER REFERENCES phone_numbers (id) ON DELETE CASCADE,
    PRIMARY KEY (customer_id, phone_number_id)
);
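
A minimal set of rows to go with these tables might look like this (the sample values are made up for illustration, and the serial id is assumed to start at 1):

-- Hypothetical sample data for illustration only
INSERT INTO customers (id, date_of_birth, email, first_name, last_name)
VALUES ('3f2c1a9e-8b4d-4c6e-9f1a-2b3c4d5e6f70', '1990-01-01', 'jane.doe@example.com', 'Jane', 'Doe');

INSERT INTO phone_numbers (phone_type, phone_number)
VALUES ('cell', '555-0100');

-- assumes the SERIAL above produced id = 1
INSERT INTO customer_phone_numbers (customer_id, phone_number_id)
VALUES ('3f2c1a9e-8b4d-4c6e-9f1a-2b3c4d5e6f70', 1);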

When I add a new row to the phone_numbers table, pgsync crashes with this exception:

Exception in poll_redis() for thread Thread-17: (psycopg2.errors.UndefinedFunction) operator does not exist: uuid = integer
LINE 6: WHERE customers_1.id = 1

The error goes away if I run the bootstrap command again before running pgsync, but that means bootstrap has to be re-run every time a new row is added to phone_numbers.
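
The type mismatch in the hint is easy to reproduce outside of pgsync; these are illustrative queries only, not the exact SQL pgsync generates:

-- Comparing the uuid primary key to an integer literal raises the same error
SELECT count(*) FROM customers WHERE id = 1;
-- ERROR:  operator does not exist: uuid = integer

-- The comparison is only valid against a uuid value (or with an explicit cast)
SELECT count(*) FROM customers
WHERE id = '3f2c1a9e-8b4d-4c6e-9f1a-2b3c4d5e6f70'::uuid;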

This is my schema.json file:

[
    {
        "database": "test",
        "index": "customer",
        "nodes": {
            "table": "customers",
            "columns": [
                "id",
                "first_name",
                "last_name",
                "date_of_birth",
                "email"
            ],
            "children": [
                {
                    "table": "phone_numbers",
                    "label": "phone_number",
                    "columns": [
                        "id",
                        "phone_number"
                    ],
                    "relationship": {
                        "variant": "scalar",
                        "type": "one_to_many",
                        "through_tables": [
                            "customer_phone_numbers"
                        ]
                    }
                }
            ],
            "transform": {
                "concat": [
                    {
                        "columns": [
                            "first_name",
                            "last_name"
                        ],
                        "destination": "full_name",
                        "delimiter": " "
                    }
                ],
                "mapping": {
                    "date_of_birth": {
                        "type": "date",
                        "format": "strict_date_optional_time || epoch_second"
                    },
                    "full_name": {
                        "type": "text",
                        "fields": {
                            "keyword": {
                                "type": "keyword",
                                "ignore_above": 256
                            },
                            "search_as_you_type": {
                                "type": "search_as_you_type",
                                "max_shingle_size": 3
                            }
                        }
                    },
                    "email": {
                        "type": "text",
                        "fields": {
                            "keyword": {
                                "type": "keyword",
                                "ignore_above": 256
                            },
                            "search_as_you_type": {
                                "type": "search_as_you_type",
                                "max_shingle_size": 3
                            }
                        }
                    }
                }
            }
        }
    }
]

Error Message (if any):

 - public.customers
    - public.phone_number
  [--------------------------------------------------]  0/0    0%
  [--------------------------------------------------]  0/0    0%
 0:00:00.358809 (0.36 sec)
Sync test:customer Xlog: [0] => Db: [0] => Redis: [0] => OpenSearch: [0]...
2024-06-11 22:03:58.520:ERROR:pgsync.search_client: Exception (psycopg2.errors.UndefinedFunction) operator does not exist: uuid = integer
LINE 6: WHERE customers_1.id = 1
                             ^
HINT:  No operator matches the given name and argument types. You might need to add explicit type casts.

[SQL: SELECT count(*) AS count_1
FROM public.customers AS customers_1 LEFT OUTER JOIN LATERAL (SELECT JSON_AGG(CAST(anon_2._keys AS JSONB) || CAST(JSON_BUILD_OBJECT(%(JSON_BUILD_OBJECT_1)s, JSON_BUILD_ARRAY(JSON_BUILD_OBJECT(%(JSON_BUILD_OBJECT_2)s, JSON_BUILD_ARRAY(customer_phone_numbers_1.customer_id)), JSON_BUILD_OBJECT(%(JSON_BUILD_OBJECT_3)s, JSON_BUILD_ARRAY(customer_phone_numbers_1.phone_number_id)))) AS JSONB)) AS _keys, JSON_AGG(anon_2.anon) AS phone_number, customer_phone_numbers_1.customer_id AS customer_id
FROM public.customer_phone_numbers AS customer_phone_numbers_1 LEFT OUTER JOIN LATERAL (SELECT CAST(JSON_BUILD_OBJECT(%(JSON_BUILD_OBJECT_4)s, JSON_BUILD_ARRAY(JSON_BUILD_OBJECT(%(JSON_BUILD_OBJECT_5)s, JSON_BUILD_ARRAY(phone_numbers_1.id)))) AS JSONB) AS _keys, phone_numbers_1.id AS anon, phone_numbers_1.id AS id
FROM public.phone_numbers AS phone_numbers_1
WHERE phone_numbers_1.id = customer_phone_numbers_1.phone_number_id) AS anon_2 ON anon_2.id = customer_phone_numbers_1.phone_number_id GROUP BY customer_phone_numbers_1.customer_id) AS anon_1 ON anon_1.customer_id = customers_1.id
WHERE customers_1.id = %(id_1)s]
[parameters: {'JSON_BUILD_OBJECT_1': 'customer_phone_numbers', 'JSON_BUILD_OBJECT_2': 'customer_id', 'JSON_BUILD_OBJECT_3': 'phone_number_id', 'JSON_BUILD_OBJECT_4': 'phone_numbers', 'JSON_BUILD_OBJECT_5': 'id', 'id_1': 1}]
(Background on this error at: https://sqlalche.me/e/20/f405)
Traceback (most recent call last):
  File "/opt/homebrew/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1969, in _exec_single_context
    self.dialect.do_execute(
  File "/opt/homebrew/lib/python3.9/site-packages/sqlalchemy/engine/default.py", line 922, in do_execute
    cursor.execute(statement, parameters)
psycopg2.errors.UndefinedFunction: operator does not exist: uuid = integer
LINE 6: WHERE customers_1.id = 1
                             ^
HINT:  No operator matches the given name and argument types. You might need to add explicit type casts.

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/opt/homebrew/lib/python3.9/site-packages/pgsync/search_client.py", line 138, in bulk
    self._bulk(
  File "/opt/homebrew/lib/python3.9/site-packages/pgsync/search_client.py", line 194, in _bulk
    for ok, _ in self.parallel_bulk(
  File "/opt/homebrew/lib/python3.9/site-packages/opensearchpy/helpers/actions.py", line 487, in parallel_bulk
    for result in pool.imap(
  File "/opt/homebrew/Cellar/python@3.9/3.9.15/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/pool.py", line 870, in next
    raise value
  File "/opt/homebrew/Cellar/python@3.9/3.9.15/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/pool.py", line 125, in worker
    result = (True, func(*args, **kwds))
  File "/opt/homebrew/Cellar/python@3.9/3.9.15/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/pool.py", line 144, in _helper_reraises_exception
    raise ex
  File "/opt/homebrew/Cellar/python@3.9/3.9.15/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/pool.py", line 388, in _guarded_task_generation
    for i, x in enumerate(iterable):
  File "/opt/homebrew/lib/python3.9/site-packages/opensearchpy/helpers/actions.py", line 168, in _chunk_actions
    for action, data in actions:
  File "/opt/homebrew/lib/python3.9/site-packages/pgsync/sync.py", line 924, in _payloads
    yield from self.sync(
  File "/opt/homebrew/lib/python3.9/site-packages/pgsync/sync.py", line 966, in sync
    count: int = self.fetchcount(node._subquery)
  File "/opt/homebrew/lib/python3.9/site-packages/pgsync/base.py", line 895, in fetchcount
    return conn.execute(
  File "/opt/homebrew/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1416, in execute
    return meth(
  File "/opt/homebrew/lib/python3.9/site-packages/sqlalchemy/sql/elements.py", line 517, in _execute_on_connection
    return connection._execute_clauseelement(
  File "/opt/homebrew/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1639, in _execute_clauseelement
    ret = self._execute_context(
  File "/opt/homebrew/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1848, in _execute_context
    return self._exec_single_context(
  File "/opt/homebrew/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1988, in _exec_single_context
    self._handle_dbapi_exception(
  File "/opt/homebrew/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 2344, in _handle_dbapi_exception
    raise sqlalchemy_exception.with_traceback(exc_info[2]) from e
  File "/opt/homebrew/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1969, in _exec_single_context
    self.dialect.do_execute(
  File "/opt/homebrew/lib/python3.9/site-packages/sqlalchemy/engine/default.py", line 922, in do_execute
    cursor.execute(statement, parameters)
sqlalchemy.exc.ProgrammingError: (psycopg2.errors.UndefinedFunction) operator does not exist: uuid = integer
LINE 6: WHERE customers_1.id = 1
                             ^
HINT:  No operator matches the given name and argument types. You might need to add explicit type casts.

[SQL: SELECT count(*) AS count_1
FROM public.customers AS customers_1 LEFT OUTER JOIN LATERAL (SELECT JSON_AGG(CAST(anon_2._keys AS JSONB) || CAST(JSON_BUILD_OBJECT(%(JSON_BUILD_OBJECT_1)s, JSON_BUILD_ARRAY(JSON_BUILD_OBJECT(%(JSON_BUILD_OBJECT_2)s, JSON_BUILD_ARRAY(customer_phone_numbers_1.customer_id)), JSON_BUILD_OBJECT(%(JSON_BUILD_OBJECT_3)s, JSON_BUILD_ARRAY(customer_phone_numbers_1.phone_number_id)))) AS JSONB)) AS _keys, JSON_AGG(anon_2.anon) AS phone_number, customer_phone_numbers_1.customer_id AS customer_id
FROM public.customer_phone_numbers AS customer_phone_numbers_1 LEFT OUTER JOIN LATERAL (SELECT CAST(JSON_BUILD_OBJECT(%(JSON_BUILD_OBJECT_4)s, JSON_BUILD_ARRAY(JSON_BUILD_OBJECT(%(JSON_BUILD_OBJECT_5)s, JSON_BUILD_ARRAY(phone_numbers_1.id)))) AS JSONB) AS _keys, phone_numbers_1.id AS anon, phone_numbers_1.id AS id
FROM public.phone_numbers AS phone_numbers_1
WHERE phone_numbers_1.id = customer_phone_numbers_1.phone_number_id) AS anon_2 ON anon_2.id = customer_phone_numbers_1.phone_number_id GROUP BY customer_phone_numbers_1.customer_id) AS anon_1 ON anon_1.customer_id = customers_1.id
WHERE customers_1.id = %(id_1)s]
[parameters: {'JSON_BUILD_OBJECT_1': 'customer_phone_numbers', 'JSON_BUILD_OBJECT_2': 'customer_id', 'JSON_BUILD_OBJECT_3': 'phone_number_id', 'JSON_BUILD_OBJECT_4': 'phone_numbers', 'JSON_BUILD_OBJECT_5': 'id', 'id_1': 1}]
(Background on this error at: https://sqlalche.me/e/20/f405)
Exception in poll_redis() for thread Thread-17: (psycopg2.errors.UndefinedFunction) operator does not exist: uuid = integer
LINE 6: WHERE customers_1.id = 1
                             ^
HINT:  No operator matches the given name and argument types. You might need to add explicit type casts.

Exiting...
/opt/homebrew/Cellar/python@3.9/3.9.15/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/resource_tracker.py:216: UserWarning: resource_tracker: There appear to be 2 leaked semaphore objects to clean up at shutdown
  warnings.warn('resource_tracker: There appear to be %d '
toluaina commented 5 months ago

Thanks for the report and the detailed steps to reproduce. This makes things a whole lot easier. I've pushed a fix to the main branch.

dealertouch commented 5 months ago

Thanks for fixing the issue. How can I build from the main branch?

dealertouch commented 5 months ago

@toluaina Tried the main branch and the fix is working. Is there a timeline for when this fix will be released?