ibis-project / ibis

the portable Python dataframe library
https://ibis-project.org
Apache License 2.0

bug(trino): cannot create table with large size data in trino #10178

Open jitingxu1 opened 6 hours ago

jitingxu1 commented 6 hours ago

What happened?

Code to reproduce the error:

import ibis
import pyarrow.parquet as pq

t = pq.read_table("~/repos/ibis/ci/ibis-testing-data/parquet/diamonds.parquet")

con = ibis.trino.connect(database=xxx, schema=xxx)

con.create_table("t", t)

~~It throws an exception because of MEMORY_LIMIT_EXCEEDED~~

 trino.exceptions.TrinoExternalError: TrinoExternalError(type=EXTERNAL, name=MEMORY_LIMIT_EXCEEDED, message="Memory limit [134217728] for memory connector exceeded", query_id=20240918_201010_23660_d9iga

~~It is related to `_in_memory_table_exists`; I saw we recently changed that implementation in #10067~~

Smaller data runs OK:

t = t.slice(0, 5)
con.create_table("t", t)
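As a possible user-level workaround (a sketch, not the fix), one could split the pyarrow table into slices and load them separately, so that no single statement carries the whole dataset. `slice_bounds` below is a hypothetical helper, not part of ibis; the commented usage assumes the `con` and `t` objects from the repro above and that the backend's `insert` method is available.

```python
# Sketch of a user-level workaround: compute (offset, length) windows and
# load the table slice by slice instead of in one shot.
# `slice_bounds` is a hypothetical helper, not existing ibis API.

def slice_bounds(n_rows: int, chunk_size: int):
    """Yield (offset, length) pairs that cover n_rows in chunk_size steps."""
    for offset in range(0, n_rows, chunk_size):
        yield offset, min(chunk_size, n_rows - offset)

# Usage against the repro objects (needs a live Trino connection):
# con.create_table("t", t.slice(0, 0))  # empty table with t's schema
# for offset, length in slice_bounds(t.num_rows, 5_000):
#     con.insert("t", t.slice(offset, length))
```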

I guess this could be the reason for the CI failures in #9908 and #9744.

What version of ibis are you using?

9.5.0

What backend(s) are you using, if any?

Trino

Relevant log output

Cell In[5], line 1
----> 1 con.create_table("t200", t)

File ~/repos/ibis/ibis/backends/trino/__init__.py:495, in Backend.create_table(self, name, obj, schema, database, temp, overwrite, comment, properties)
    492 else:
    493     table = ibis.memtable(obj, schema=schema)
--> 495 self._run_pre_execute_hooks(table)
    497 # cast here because trino doesn't allow specifying a schema in
    498 # CTAS, e.g., `CREATE TABLE (schema) AS SELECT`
    499 select = sg.select(
    500     *(
    501         self.compiler.cast(sg.column(name, quoted=quoted), typ).as_(
   (...)
    505     )
    506 ).from_(self.compiler.to_sqlglot(table).subquery())

File ~/repos/ibis/ibis/backends/__init__.py:1147, in BaseBackend._run_pre_execute_hooks(self, expr)
   1145 """Backend-specific hooks to run before an expression is executed."""
   1146 self._register_udfs(expr)
-> 1147 self._register_in_memory_tables(expr)

File ~/repos/ibis/ibis/backends/__init__.py:1122, in BaseBackend._register_in_memory_tables(self, expr)
   1120 for memtable in expr.op().find(ops.InMemoryTable):
   1121     if not self._in_memory_table_exists(memtable.name):
-> 1122         self._register_in_memory_table(memtable)
   1123         weakref.finalize(
   1124             memtable, self._finalize_in_memory_table, memtable.name
   1125         )

File ~/repos/ibis/ibis/backends/trino/__init__.py:598, in Backend._register_in_memory_table(self, op)
    596 data = op.data.to_frame().itertuples(index=False)
    597 insert_stmt = self._build_insert_template(name, schema=schema)
--> 598 with self.begin() as cur:
    599     cur.execute(create_stmt)
    600     for row in data:

File ~/anaconda3/envs/ibis-dev-arm64/lib/python3.12/contextlib.py:158, in _GeneratorContextManager.__exit__(self, typ, value, traceback)
    156     value = typ()
    157 try:
--> 158     self.gen.throw(value)
    159 except StopIteration as exc:
    160     # Suppress StopIteration *unless* it's the same exception that
    161     # was passed to throw().  This prevents a StopIteration
    162     # raised inside the "with" statement from being suppressed.
    163     return exc is not value

File ~/repos/ibis/ibis/backends/trino/__init__.py:92, in Backend.begin(self)
     90 finally:
     91     if cur._query:
---> 92         cur.close()

File ~/anaconda3/envs/ibis-dev-arm64/lib/python3.12/site-packages/trino/dbapi.py:697, in Cursor.close(self)
    696 def close(self):
--> 697     self.cancel()

File ~/anaconda3/envs/ibis-dev-arm64/lib/python3.12/site-packages/trino/dbapi.py:694, in Cursor.cancel(self)
    692 if self._query is None:
    693     return
--> 694 self._query.cancel()

File ~/anaconda3/envs/ibis-dev-arm64/lib/python3.12/site-packages/trino/client.py:858, in TrinoQuery.cancel(self)
    856     response = self._request.delete(self._next_uri)
    857 except requests.exceptions.RequestException as e:
--> 858     raise trino.exceptions.TrinoConnectionError("failed to cancel query: {}".format(e))
    859 if response.status_code == requests.codes.no_content:
    860     self._cancelled = True

TrinoConnectionError: failed to cancel query: ('Connection aborted.', ConnectionResetError(54, 'Connection reset by peer'))


jitingxu1 commented 2 hours ago

I did some checks: the Trino and Impala backends insert data into the database row by row; see the Trino implementation at https://github.com/ibis-project/ibis/blob/main/ibis/backends/trino/__init__.py#L592

It becomes slower and slower during insertion, so it takes almost forever to insert the diamonds data, which has about 40k rows.

I tested inserting the data in chunks, ~~and it works better~~

data = list(op.data.to_frame().itertuples(index=False))

insert_stmt = self._build_insert_template(name, schema=schema)
with self.begin() as cur:
    cur.execute(create_stmt)
    chunk_size = 100  # number of rows per executemany() call
    for i in range(0, len(data), chunk_size):
        chunk = data[i : i + chunk_size]
        cur.executemany(insert_stmt, chunk)

I'm not sure whether we want to insert the data as a whole; it may run out of memory if the data is too large.
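If materializing the whole frame as a list is the concern, the chunking could be done lazily over the row iterator instead. A minimal sketch, assuming the surrounding backend code from above; `chunked` is a hypothetical helper, not existing ibis code:

```python
from itertools import islice

def chunked(rows, size):
    """Yield lists of at most `size` items from any iterable, without
    materializing the whole input up front."""
    it = iter(rows)
    while chunk := list(islice(it, size)):
        yield chunk

# The backend loop could then batch rows straight off itertuples(),
# never holding more than one chunk in memory at a time:
# data = op.data.to_frame().itertuples(index=False)
# with self.begin() as cur:
#     cur.execute(create_stmt)
#     for chunk in chunked(data, 100):
#         cur.executemany(insert_stmt, chunk)
```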

-----update-----

Iteration 100 took 2.7026 seconds
Iteration 101 took 2.7528 seconds
Iteration 102 took 2.8886 seconds
Iteration 103 took 2.7334 seconds
Iteration 104 took 2.5021 seconds
Iteration 105 took 2.6534 seconds
Iteration 106 took 3.8154 seconds
Iteration 107 took 3.4111 seconds
Iteration 108 took 4.0579 seconds
Iteration 109 took 49.1745 seconds
Iteration 110 took 109.2089 seconds
Iteration 111 took 74.9319 seconds