toluaina / pgsync

Postgres to Elasticsearch/OpenSearch sync
https://pgsync.com
MIT License

es query incorrectly using the pg `primary_key` on update operations [elasticsearch.exceptions.RequestError: RequestError] #402

Open nazihkalo opened 1 year ago

nazihkalo commented 1 year ago

PGSync version: 2.4.0

Postgres version: debezium/postgres:15

Elasticsearch version: 7.17.7

Redis version: 7.0

Python version: 3.7

Problem Description:

On update operations, the Elasticsearch search query filters on the Postgres `primary_key` even though the primary key is not referenced in the Postgres UPDATE statement.

Here's a rough skeleton of the schema:

{
    "database": "postgres",
    "index": "events",
    "nodes": {
        "table": "Events",
        "schema": "prd_profile",
        "columns": [
            "ID",
            "State",
            "Type",
            "Title"
        ],
        "transform": {
            "mapping": {
                "eventid": {"type": "keyword"},
                "State": {"type": "integer"},
                "Type": {"type": "integer"},
                "Title": {
                    "type": "search_as_you_type",
                    "fields": {
                        "keyword": {
                            "type": "keyword",
                            "ignore_above": 256
                        }
                    }
                }
            }
        },
        "children": [
            {
                "table": "Organizations",
                "schema": "prd_profile",
                "columns": [
                    "ID",
                    "ProfileID",
                    "TwitterID"
                ],
                "transform": {
                    "rename": {
                        "ID": "org_id",
                        "ProfileID": "org_profileid",
                        "TwitterID": "org_twitterid"
                    },
                    "mapping": {
                        "orgid": {"type": "keyword"},
                        "org_profileid": {"type": "keyword"},
                        "org_twitterid": {"type": "keyword"}
                    }
                },
                "relationship": {
                    "variant": "object",
                    "type": "one_to_one",
                    "foreign_key": {
                        "parent": ["OrgID"],
                        "child": ["ID"]
                    }
                },
                "children": [
                    {
                        "table": "OrgProfiles",
                        "schema": "prd_profile",
                        "columns": [
                            "DisplayName",
                            "ProfileID"
                        ],
                        "transform": {
                            "mapping": {
                                "Sector": {"type": "keyword"},
                                "ProfileID": {"type": "keyword"},
                                "ProfilePicture": {"type": "text"}
                            }
                        },
                        "relationship": {
                            "variant": "object",
                            "type": "one_to_one",
                            "foreign_key": {
                                "parent": ["ProfileID"],
                                "child": ["ProfileID"]
                            }
                        }
                    }
                ]
            }
        ]
    }
}
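The relationship that matters for this update is the one between `Organizations` and `OrgProfiles`: per the schema, the two are joined on `Organizations.ProfileID = OrgProfiles.ProfileID`, and that column is not the `Organizations` primary key. A minimal sketch, assuming the schema has been loaded into a Python dict (trimmed here to the relationship fields), that walks the `nodes` tree and reports the join columns for a given table. `find_relationship` is a hypothetical helper for illustration, not part of pgsync's API.

```python
from typing import Any, Dict, List, Optional, Tuple

# Trimmed copy of the schema above: only the fields needed to follow relationships.
SCHEMA: Dict[str, Any] = {
    "table": "Events",
    "children": [
        {
            "table": "Organizations",
            "relationship": {"foreign_key": {"parent": ["OrgID"], "child": ["ID"]}},
            "children": [
                {
                    "table": "OrgProfiles",
                    "relationship": {
                        "foreign_key": {"parent": ["ProfileID"], "child": ["ProfileID"]}
                    },
                }
            ],
        }
    ],
}


def find_relationship(
    node: Dict[str, Any], table: str, parent: Optional[str] = None
) -> Optional[Tuple[str, List[str], List[str]]]:
    """Return (parent_table, parent_columns, child_columns) for `table`, or None.

    Hypothetical helper: walks the nested "children" lists depth-first.
    """
    if node["table"] == table:
        if parent is None:
            return None  # the root node has no parent relationship
        fk = node["relationship"]["foreign_key"]
        return parent, fk["parent"], fk["child"]
    for child in node.get("children", []):
        found = find_relationship(child, table, node["table"])
        if found is not None:
            return found
    return None  # table not present in this subtree


print(find_relationship(SCHEMA, "OrgProfiles"))
# ('Organizations', ['ProfileID'], ['ProfileID'])
```

Read this way, an UPDATE on `OrgProfiles` identifies the affected `Organizations` rows through their `ProfileID` column; the parent's primary key only becomes relevant once those rows have been looked up.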

When running this update query:

update prd_profile."OrgProfiles" set "DisplayName" = 'test' where "ProfileID" = '5ebbb9fa-9d33-4741-a22d-2a7dc4b95c0c';

the generated Elasticsearch DSL query tries to match the `ProfileID` value (which is not the PK) against `_meta.Organizations.DBID` (the PK):

2022-12-16 04:16:22.607:DEBUG:elasticsearch: > {"query":{"bool":{"filter":[{"bool":{"should":[{"terms":{"_meta.Organizations.DBID":["5ebbb9fa-9d33-4741-a22d-2a7dc4b95c0c"]}},{"terms":{"_meta.Organizations.DBID.keyword":["5ebbb9fa-9d33-4741-a22d-2a7dc4b95c0c"]}}]}}]}},"_source":{"excludes":["*"]},"sort":"_doc"}
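The logged search body has a simple shape: a `should` over two `terms` clauses, one on `_meta.<table>.<column>` and one on its `.keyword` subfield, with `_source` suppressed. A minimal sketch that rebuilds it as a plain dict; `meta_terms_query` is a hypothetical name, and the sketch mirrors only the logged output, not pgsync's internal code. Pairing the `Organizations` primary key with the `OrgProfiles.ProfileID` value reproduces the problematic query.

```python
import json
from typing import Any, Dict, List


def meta_terms_query(table: str, column: str, values: List[Any]) -> Dict[str, Any]:
    """Rebuild the search body seen in the debug log (hypothetical helper).

    Matches documents whose _meta.<table>.<column> (or its .keyword subfield)
    contains any of `values`; _source is excluded because only hit ids are needed.
    """
    field = f"_meta.{table}.{column}"
    return {
        "query": {
            "bool": {
                "filter": [
                    {
                        "bool": {
                            "should": [
                                {"terms": {field: values}},
                                {"terms": {f"{field}.keyword": values}},
                            ]
                        }
                    }
                ]
            }
        },
        "_source": {"excludes": ["*"]},
        "sort": "_doc",
    }


# The value comes from OrgProfiles.ProfileID, but the field is the Organizations PK.
body = meta_terms_query("Organizations", "DBID", ["5ebbb9fa-9d33-4741-a22d-2a7dc4b95c0c"])
print(json.dumps(body, separators=(",", ":")))
```

Serialized compactly, this prints the same JSON as the body in the debug log line, which makes the mismatch explicit: the field name and the value come from two different tables.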

Error Message (if any):


2022-12-16 04:16:22.676:ERROR:pgsync.elastichelper: Exception RequestError(400, 'search_phase_execution_exception', 'failed to create query: For input string: "5ebbb9fa-9d33-4741-a22d-2a7dc4b95c0c"')
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/pgsync/elastichelper.py", line 141, in bulk
    raise_on_error=raise_on_error,
  File "/usr/local/lib/python3.7/site-packages/pgsync/elastichelper.py", line 196, in _bulk
    ignore_status=ignore_status,
  File "/usr/local/lib/python3.7/site-packages/elasticsearch/helpers/actions.py", line 484, in parallel_bulk
    actions, chunk_size, max_chunk_bytes, client.transport.serializer
  File "/usr/local/lib/python3.7/multiprocessing/pool.py", line 748, in next
    raise value
  File "/usr/local/lib/python3.7/multiprocessing/pool.py", line 121, in worker
    result = (True, func(*args, **kwds))
  File "/usr/local/lib/python3.7/multiprocessing/pool.py", line 140, in _helper_reraises_exception
    raise ex
  File "/usr/local/lib/python3.7/multiprocessing/pool.py", line 292, in _guarded_task_generation
    for i, x in enumerate(iterable):
  File "/usr/local/lib/python3.7/site-packages/elasticsearch/helpers/actions.py", line 155, in _chunk_actions
    for action, data in actions:
  File "/usr/local/lib/python3.7/site-packages/pgsync/sync.py", line 805, in _payloads
    payloads,
  File "/usr/local/lib/python3.7/site-packages/pgsync/sync.py", line 609, in _update_op
    fields,
  File "/usr/local/lib/python3.7/site-packages/pgsync/elastichelper.py", line 232, in _search
    for hit in search.scan():
  File "/usr/local/lib/python3.7/site-packages/elasticsearch_dsl/search.py", line 731, in scan
    for hit in scan(es, query=self.to_dict(), index=self._index, **self._params):
  File "/usr/local/lib/python3.7/site-packages/elasticsearch/helpers/actions.py", line 555, in scan
    body=query, scroll=scroll, size=size, request_timeout=request_timeout, **kwargs
  File "/usr/local/lib/python3.7/site-packages/elasticsearch/client/utils.py", line 168, in _wrapped
    return func(*args, params=params, headers=headers, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/elasticsearch/client/__init__.py", line 1675, in search
    body=body,
  File "/usr/local/lib/python3.7/site-packages/elasticsearch/transport.py", line 415, in perform_request
    raise e
  File "/usr/local/lib/python3.7/site-packages/elasticsearch/transport.py", line 388, in perform_request
    timeout=timeout,
  File "/usr/local/lib/python3.7/site-packages/elasticsearch/connection/http_urllib3.py", line 277, in perform_request
    self._raise_error(response.status, raw_data)
  File "/usr/local/lib/python3.7/site-packages/elasticsearch/connection/base.py", line 331, in _raise_error
    status_code, error_message, additional_info
elasticsearch.exceptions.RequestError: RequestError(400, 'search_phase_execution_exception', 'failed to create query: For input string: "5ebbb9fa-9d33-4741-a22d-2a7dc4b95c0c"')
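The 400 is consistent with a type mismatch: `For input string: "..."` is the message of Java's `NumberFormatException`, which suggests `_meta.Organizations.DBID` is mapped as a numeric field, so Elasticsearch cannot even parse the UUID as a term. One way to avoid sending a foreign-key value to a primary-key field is to resolve the parent's primary keys first and search with those. A minimal sketch of that idea, using an in-memory SQLite database as a stand-in for Postgres; the table layout (an integer `DBID` primary key on `Organizations`) and the sample rows are assumptions inferred from the log, and `parent_pks_for_child_update` is hypothetical. This is not how pgsync, or the fix mentioned below, necessarily handles it.

```python
import sqlite3
from typing import List

# Stand-in for Postgres with made-up sample rows; assumes DBID is the Organizations PK.
conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE "Organizations" ("DBID" INTEGER PRIMARY KEY, "ProfileID" TEXT);
    CREATE TABLE "OrgProfiles" ("ProfileID" TEXT PRIMARY KEY, "DisplayName" TEXT);
    INSERT INTO "Organizations" VALUES (42, '5ebbb9fa-9d33-4741-a22d-2a7dc4b95c0c');
    INSERT INTO "OrgProfiles" VALUES ('5ebbb9fa-9d33-4741-a22d-2a7dc4b95c0c', 'old');
    """
)


def parent_pks_for_child_update(
    conn: sqlite3.Connection,
    parent_table: str,
    parent_pk: str,
    parent_fk_column: str,
    child_fk_value: str,
) -> List[int]:
    """Map a changed child row's foreign-key value to the parent's primary keys.

    Hypothetical helper: identifiers come from the schema and are quoted into
    the SQL; the value is passed as a bound parameter.
    """
    sql = f'SELECT "{parent_pk}" FROM "{parent_table}" WHERE "{parent_fk_column}" = ?'
    return [row[0] for row in conn.execute(sql, (child_fk_value,))]


# The UPDATE in the report touches the OrgProfiles row with this ProfileID.
pks = parent_pks_for_child_update(
    conn, "Organizations", "DBID", "ProfileID", "5ebbb9fa-9d33-4741-a22d-2a7dc4b95c0c"
)
print(pks)  # [42]
```

The resulting integers, rather than the UUID, are what a `terms` filter on `_meta.Organizations.DBID` could match. Interpolating identifiers into the SQL is only reasonable here because they come from the sync schema, not from user input.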
toluaina commented 1 year ago

I think this was reported in #352 and fixed in commit 3fe7ffeb. Can you please try the main branch and confirm whether it resolves the issue?