toluaina / pgsync

Postgres to Elasticsearch/OpenSearch sync
https://pgsync.com
MIT License

Using Streaming Bulk option, stops the script on error, can we continue the script despite error? #217

Open keyur1sst opened 2 years ago

keyur1sst commented 2 years ago

PGSync version: 2.1.9

Postgres version: 12.7

Elasticsearch version: OpenSearch 1.0, hosted on AWS

Redis version: 5.0.7

Python version: 3.8.10

Problem Description: I am running PGSync in daemon mode (pgsync --config /optional/path/to/schema.json --daemon) with ELASTICSEARCH_STREAMING_BULK=true. The script crashes after a 429 error. Can the script keep running instead of failing, as it does without streaming mode? I have 300 million records to sync to OpenSearch, so running without streaming is not an option. Can you make the script continue when it hits 429 Too Many Requests or any other error?

Error Message (if any):

pgsync --config setting.json --daemon
2021-12-24 18:09:57.731:WARNING:pgsync.utils: ModuleNotFoundError: No module named 'plugins'
2021-12-24 18:09:57.736:WARNING:pgsync.utils: ModuleNotFoundError: No module named 'plugins'
2021-12-24 18:09:58.034:WARNING:pgsync.utils: ModuleNotFoundError: No module named 'plugins'
 - contacts
  [--------------------------------------------------]  196001/311609941    0%  1d 20:37:35
2021-12-24 18:13:29.642:WARNING:elasticsearch: POST https://search-whocalledbaharin-hteym33qwdkixddps47vmj6uta.me-south-1.es.amazonaws.com:443/whocalled/_bulk?refresh=false [status:429 request:0.055s]
2021-12-24 18:13:29.643:WARNING:elasticsearch: Undecodable raw error response from server: Extra data: line 1 column 5 - line 1 column 39 (char 4 - 38)
2021-12-24 18:13:29.643:ERROR:pgsync.sync: Exception TransportError(429, '429 Too Many Requests /whocalled/_bulk')
Traceback (most recent call last):
  File "/home/sst/.local/lib/python3.8/site-packages/pgsync/sync.py", line 875, in sync
    self.es.bulk(self.index, docs)
  File "/home/sst/.local/lib/python3.8/site-packages/pgsync/elastichelper.py", line 107, in bulk
    for _ in helpers.streaming_bulk(
  File "/home/sst/.local/lib/python3.8/site-packages/elasticsearch/helpers/actions.py", line 329, in streaming_bulk
    for data, (ok, info) in zip(
  File "/home/sst/.local/lib/python3.8/site-packages/elasticsearch/helpers/actions.py", line 256, in _process_bulk_chunk
    for item in gen:
  File "/home/sst/.local/lib/python3.8/site-packages/elasticsearch/helpers/actions.py", line 195, in _process_bulk_chunk_error
    raise error
  File "/home/sst/.local/lib/python3.8/site-packages/elasticsearch/helpers/actions.py", line 240, in _process_bulk_chunk
    resp = client.bulk("\n".join(bulk_actions) + "\n", *args, **kwargs)
  File "/home/sst/.local/lib/python3.8/site-packages/elasticsearch/client/utils.py", line 168, in _wrapped
    return func(*args, params=params, headers=headers, **kwargs)
  File "/home/sst/.local/lib/python3.8/site-packages/elasticsearch/client/__init__.py", line 463, in bulk
    return self.transport.perform_request(
  File "/home/sst/.local/lib/python3.8/site-packages/elasticsearch/transport.py", line 415, in perform_request
    raise e
  File "/home/sst/.local/lib/python3.8/site-packages/elasticsearch/transport.py", line 381, in perform_request
    status, headers_response, data = connection.perform_request(
  File "/home/sst/.local/lib/python3.8/site-packages/elasticsearch/connection/http_urllib3.py", line 277, in perform_request
    self._raise_error(response.status, raw_data)
  File "/home/sst/.local/lib/python3.8/site-packages/elasticsearch/connection/base.py", line 330, in _raise_error
    raise HTTP_EXCEPTIONS.get(status_code, TransportError)(
elasticsearch.exceptions.TransportError: TransportError(429, '429 Too Many Requests /whocalled/_bulk')
 0:03:31.913506 (211.91 sec)
Traceback (most recent call last):
  File "/home/sst/.local/bin/pgsync", line 7, in <module>
    sync.main()
  File "/home/sst/.local/lib/python3.8/site-packages/click/core.py", line 1128, in __call__
    return self.main(*args, **kwargs)
  File "/home/sst/.local/lib/python3.8/site-packages/click/core.py", line 1053, in main
    rv = self.invoke(ctx)
  File "/home/sst/.local/lib/python3.8/site-packages/click/core.py", line 1395, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/home/sst/.local/lib/python3.8/site-packages/click/core.py", line 754, in invoke
    return __callback(*args, **kwargs)
  File "/home/sst/.local/lib/python3.8/site-packages/pgsync/sync.py", line 1139, in main
    sync.pull()
  File "/home/sst/.local/lib/python3.8/site-packages/pgsync/sync.py", line 1006, in pull
    self.sync(self._sync(txmin=txmin, txmax=txmax))
  File "/home/sst/.local/lib/python3.8/site-packages/pgsync/sync.py", line 875, in sync
    self.es.bulk(self.index, docs)
  File "/home/sst/.local/lib/python3.8/site-packages/pgsync/elastichelper.py", line 107, in bulk
    for _ in helpers.streaming_bulk(
  File "/home/sst/.local/lib/python3.8/site-packages/elasticsearch/helpers/actions.py", line 329, in streaming_bulk
    for data, (ok, info) in zip(
  File "/home/sst/.local/lib/python3.8/site-packages/elasticsearch/helpers/actions.py", line 256, in _process_bulk_chunk
    for item in gen:
  File "/home/sst/.local/lib/python3.8/site-packages/elasticsearch/helpers/actions.py", line 195, in _process_bulk_chunk_error
    raise error
  File "/home/sst/.local/lib/python3.8/site-packages/elasticsearch/helpers/actions.py", line 240, in _process_bulk_chunk
    resp = client.bulk("\n".join(bulk_actions) + "\n", *args, **kwargs)
  File "/home/sst/.local/lib/python3.8/site-packages/elasticsearch/client/utils.py", line 168, in _wrapped
    return func(*args, params=params, headers=headers, **kwargs)
  File "/home/sst/.local/lib/python3.8/site-packages/elasticsearch/client/__init__.py", line 463, in bulk
    return self.transport.perform_request(
  File "/home/sst/.local/lib/python3.8/site-packages/elasticsearch/transport.py", line 415, in perform_request
    raise e
  File "/home/sst/.local/lib/python3.8/site-packages/elasticsearch/transport.py", line 381, in perform_request
    status, headers_response, data = connection.perform_request(
  File "/home/sst/.local/lib/python3.8/site-packages/elasticsearch/connection/http_urllib3.py", line 277, in perform_request
    self._raise_error(response.status, raw_data)
  File "/home/sst/.local/lib/python3.8/site-packages/elasticsearch/connection/base.py", line 330, in _raise_error
    raise HTTP_EXCEPTIONS.get(status_code, TransportError)(
elasticsearch.exceptions.TransportError: TransportError(429, '429 Too Many Requests /whocalled/_bulk')

toluaina commented 2 years ago

If you are getting 429 errors, then you can adjust the params.

The defaults are listed here
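
For context, the `helpers.streaming_bulk` call in the traceback accepts `max_retries`, `initial_backoff`, and `max_backoff` arguments that control how a rejected (429) chunk is retried. The snippet below is only a rough, self-contained sketch of that retry-with-exponential-backoff idea, not PGSync's or the client's actual code. `TooManyRequests` and `send_with_backoff` are hypothetical names made up for illustration, and the default values are assumptions.

```python
import time


class TooManyRequests(Exception):
    """Hypothetical stand-in for a 429 response from the bulk endpoint."""


def send_with_backoff(send, chunk, max_retries=5, initial_backoff=2.0,
                      max_backoff=600.0, sleep=time.sleep):
    """Call send(chunk), retrying on TooManyRequests with exponential backoff.

    The parameter names mirror the streaming_bulk arguments; the defaults
    here are illustrative assumptions only.
    """
    for attempt in range(max_retries + 1):
        try:
            return send(chunk)
        except TooManyRequests:
            if attempt == max_retries:
                # Retries exhausted: surface the error to the caller.
                raise
            # Wait initial_backoff * 2**attempt seconds, capped at max_backoff,
            # so the cluster gets progressively more time to drain its queue.
            sleep(min(max_backoff, initial_backoff * 2 ** attempt))
```

With `max_retries=0` the first 429 is raised immediately, which matches the crash reported above. A larger retry count, a longer backoff, or a smaller chunk size gives a throttled cluster room to recover.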

voyc-jean commented 2 years ago

@keyur1sst what Opensearch instance type are you using? Perhaps see my answer here.