elastic / connectors

Official Elastic connectors for third-party data sources
https://www.elastic.co/guide/en/elasticsearch/reference/master/es-connectors.html

Postgres connector: temporary file size exceeds temp_file_limit #2680

Closed. Dig-Doug closed this issue 2 months ago.

Dig-Doug commented 4 months ago

Bug Description

The postgres connector fails to sync after processing a few documents in my database.

To Reproduce

The sync fails with the following traceback:

Traceback (most recent call last):
  File "/app/lib/python3.10/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 482, in _prepare_and_execute
    self._rows = await prepared_stmt.fetch(*parameters)
  File "/app/lib/python3.10/site-packages/asyncpg/prepared_stmt.py", line 176, in fetch
    data = await self.__bind_execute(args, 0, timeout)
  File "/app/lib/python3.10/site-packages/asyncpg/prepared_stmt.py", line 241, in __bind_execute
    data, status, _ = await self.__do_execute(
  File "/app/lib/python3.10/site-packages/asyncpg/prepared_stmt.py", line 230, in __do_execute
    return await executor(protocol)
  File "asyncpg/protocol/protocol.pyx", line 201, in bind_execute
asyncpg.exceptions.ConfigurationLimitExceededError: temporary file size exceeds temp_file_limit (1021877kB)

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/app/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1964, in _exec_single_context
    self.dialect.do_execute(
  File "/app/lib/python3.10/site-packages/sqlalchemy/engine/default.py", line 747, in do_execute
    cursor.execute(statement, parameters)
  File "/app/lib/python3.10/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 515, in execute
    self._adapt_connection.await_(
  File "/app/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 102, in await_only
    return current.driver.switch(awaitable)  # type: ignore[no-any-return]
  File "/app/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 160, in greenlet_spawn
    value = await result
  File "/app/lib/python3.10/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 494, in _prepare_and_execute
    self._handle_exception(error)
  File "/app/lib/python3.10/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 444, in _handle_exception
    self._adapt_connection._handle_exception(error)
  File "/app/lib/python3.10/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 717, in _handle_exception
    raise translated_error from error
sqlalchemy.dialects.postgresql.asyncpg.AsyncAdapt_asyncpg_dbapi.Error: <class 'asyncpg.exceptions.ConfigurationLimitExceededError'>: temporary file size exceeds temp_file_limit (1021877kB)

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/app/connectors/es/sink.py", line 407, in run
    await self.get_docs(generator)
  File "/app/connectors/es/sink.py", line 458, in get_docs
    async for count, doc in aenumerate(generator):
  File "/app/connectors/utils.py", line 818, in aenumerate
    async for elem in asequence:
  File "/app/connectors/logger.py", line 247, in __anext__
    return await self.gen.__anext__()
  File "/app/connectors/es/sink.py", line 441, in _decorate_with_metrics_span
    async for doc in generator:
  File "/app/connectors/sync_job_runner.py", line 411, in prepare_docs
    async for doc, lazy_download, operation in self.generator():
  File "/app/connectors/sync_job_runner.py", line 447, in generator
    async for doc, lazy_download in self.data_provider.get_docs(
  File "/app/connectors/sources/postgresql.py", line 677, in get_docs
    async for row in self.fetch_documents_from_table(
  File "/app/connectors/sources/postgresql.py", line 525, in fetch_documents_from_table
    async for doc in docs_generator:
  File "/app/connectors/sources/postgresql.py", line 600, in _yield_all_docs_from_tables
    async for row in self.yield_rows_for_query(
  File "/app/connectors/sources/postgresql.py", line 644, in yield_rows_for_query
    async for row in streamer:
  File "/app/connectors/sources/postgresql.py", line 309, in data_streamer
    async for data in fetch(
  File "/app/connectors/sources/generic_database.py", line 110, in fetch
    async for result in _execute():
  File "/app/connectors/utils.py", line 556, in wrapped
    raise e
  File "/app/connectors/utils.py", line 551, in wrapped
    async for item in func(*args, **kwargs):
  File "/app/connectors/sources/generic_database.py", line 90, in _execute
    cursor = await cursor_func()
  File "/app/connectors/sources/postgresql.py", line 201, in get_cursor
    cursor = await connection.execute(text(query))
  File "/app/lib/python3.10/site-packages/sqlalchemy/ext/asyncio/engine.py", line 594, in execute
    result = await greenlet_spawn(
  File "/app/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 165, in greenlet_spawn
    result = context.throw(*sys.exc_info())
  File "/app/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1414, in execute
    return meth(
  File "/app/lib/python3.10/site-packages/sqlalchemy/sql/elements.py", line 489, in _execute_on_connection
    return connection._execute_clauseelement(
  File "/app/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1638, in _execute_clauseelement
    ret = self._execute_context(
  File "/app/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1842, in _execute_context
    return self._exec_single_context(
  File "/app/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1983, in _exec_single_context
    self._handle_dbapi_exception(
  File "/app/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 2325, in _handle_dbapi_exception
    raise sqlalchemy_exception.with_traceback(exc_info[2]) from e
  File "/app/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1964, in _exec_single_context
    self.dialect.do_execute(
  File "/app/lib/python3.10/site-packages/sqlalchemy/engine/default.py", line 747, in do_execute
    cursor.execute(statement, parameters)
  File "/app/lib/python3.10/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 515, in execute
    self._adapt_connection.await_(
  File "/app/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 102, in await_only
    return current.driver.switch(awaitable)  # type: ignore[no-any-return]
  File "/app/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 160, in greenlet_spawn
    value = await result
  File "/app/lib/python3.10/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 494, in _prepare_and_execute
    self._handle_exception(error)
  File "/app/lib/python3.10/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 444, in _handle_exception
    self._adapt_connection._handle_exception(error)
  File "/app/lib/python3.10/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 717, in _handle_exception
    raise translated_error from error
sqlalchemy.exc.DBAPIError: (sqlalchemy.dialects.postgresql.asyncpg.Error) <class 'asyncpg.exceptions.ConfigurationLimitExceededError'>: temporary file size exceeds temp_file_limit (1021877kB)
[SQL: SELECT * FROM "public"."document_snippets" ORDER BY "id","source_document_id" LIMIT 1000 OFFSET 2000]

Expected behavior

The sync should not fail. If an individual row cannot be fetched or uploaded, the connector should drop that row and keep going.
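
For illustration, that kind of per-row tolerance could look roughly like the sketch below. The names are hypothetical and this is not the connector's actual code; note also that in this particular failure the SELECT itself fails server-side, so a per-row guard alone would not have been enough here.

import logging

logger = logging.getLogger("connector")

def serialize(row):
    # Hypothetical per-row conversion; stands in for whatever turns a
    # database row into an Elasticsearch document.
    return dict(row)

async def yield_documents(fetch_batch, query, batch_size=1000):
    # fetch_batch is a hypothetical coroutine returning one page of rows.
    offset = 0
    while True:
        rows = await fetch_batch(query, limit=batch_size, offset=offset)
        if not rows:
            return
        for row in rows:
            try:
                yield serialize(row)
            except Exception as exception:
                # Drop the problematic row and keep going rather than
                # failing the entire sync.
                logger.warning(f"Skipping row: {exception}")
        offset += batch_size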

Environment

Running the connector in Docker.

Dig-Doug commented 4 months ago

I believe this is actually a symptom of the connector ordering the primary key columns incorrectly.

This table has a compound primary key:

CREATE TABLE document_snippets(
  source_document_id text,
  id text,
  PRIMARY KEY (source_document_id, id)
);

But in the query from the error, the components are reversed (ORDER BY "id","source_document_id"). PostgreSQL cannot satisfy that ordering with the composite primary-key index, so it has to sort the entire table, and that sort is what spills to temporary files and exceeds temp_file_limit.

Dig-Doug commented 4 months ago

The primary key columns are sorted here, which changes their declared order and makes the query inefficient.
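
Roughly, the difference looks like this (a minimal sketch with hypothetical names, not the actual connectors/sources/postgresql.py code): sorting the key columns alphabetically turns (source_document_id, id) into (id, source_document_id), so the generated ORDER BY no longer matches the composite primary-key index.

# Hypothetical sketch of the problem; names and structure are illustrative.
def build_paginated_query(table, primary_key_columns, limit, offset):
    # Current behaviour (as I understand it): the key columns are sorted,
    # so ("source_document_id", "id") becomes ("id", "source_document_id").
    ordered = sorted(primary_key_columns)
    order_by = ", ".join(f'"{column}"' for column in ordered)
    return f'SELECT * FROM "{table}" ORDER BY {order_by} LIMIT {limit} OFFSET {offset}'

def build_paginated_query_fixed(table, primary_key_columns, limit, offset):
    # Keeping the columns in their declared order lets PostgreSQL walk the
    # composite primary-key index instead of sorting the whole table.
    order_by = ", ".join(f'"{column}"' for column in primary_key_columns)
    return f'SELECT * FROM "{table}" ORDER BY {order_by} LIMIT {limit} OFFSET {offset}'

print(build_paginated_query("document_snippets", ["source_document_id", "id"], 1000, 2000))
# SELECT * FROM "document_snippets" ORDER BY "id", "source_document_id" LIMIT 1000 OFFSET 2000
print(build_paginated_query_fixed("document_snippets", ["source_document_id", "id"], 1000, 2000))
# SELECT * FROM "document_snippets" ORDER BY "source_document_id", "id" LIMIT 1000 OFFSET 2000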

artem-shelkovnikov commented 4 months ago

Hi @Dig-Doug, have you tested whether removing the sorting makes it work for you?

Dig-Doug commented 4 months ago

Yes, removing the sort allows the connector to start syncing.

For reference, here is a comparison of the planner costs: with the keys misordered, PostgreSQL has to sort the whole table, while the declared primary-key order is served by an index scan:

EXPLAIN SELECT * FROM document_snippets ORDER BY id, source_document_id LIMIT 1000 OFFSET 2000;

Limit  (cost=2440052.09..2440168.76 rows=1000 width=1405)
  ->  Gather Merge  (cost=2439818.74..2679752.32 rows=2056430 width=1405)
        Workers Planned: 2
        ->  Sort  (cost=2438818.72..2441389.25 rows=1028215 width=1405)
              Sort Key: id, source_document_id
              ->  Parallel Seq Scan on document_snippets  (cost=0.00..448904.15 rows=1028215 width=1405)

EXPLAIN SELECT * FROM document_snippets ORDER BY source_document_id,id LIMIT 1000 OFFSET 2000;

Limit  (cost=5294.09..7940.85 rows=1000 width=1405)
  ->  Index Scan using document_snippets_pkey on document_snippets  (cost=0.56..6531468.94 rows=2467717 width=1405)

artem-shelkovnikov commented 4 months ago

If you'd like, you can contribute the change you've made back to the repository. If not, we'll address it ourselves, though probably a bit later.

Would you like to contribute?