tlocke / pg8000

A Pure-Python PostgreSQL Driver
BSD 3-Clause "New" or "Revised" License

Struct.error: 'h' format requires -32768 <= number <= 32767 #120

Closed thuyetbao closed 2 years ago

thuyetbao commented 2 years ago

Context:

I pushed ~3 million rows to Cloud Postgres using pandas' to_sql method.

Function call:

SOURCE_DF.to_sql(
    schema="public",
    name="temp_test",
    con=CONNECTION,
    if_exists="append",
    method="multi",
    index=False,
    chunksize=20000,
)

I have tested this, and the error captured is:

'h' format requires -32768 <= number <= 32767

More detail:

Traceback (most recent call last):
  File "/workspace/main.py", line 406, in batch_upsert
    <DATAFRAME>.to_sql(
  File "/layers/google.python.pip/pip/lib/python3.9/site-packages/pandas/core/generic.py", line 2951, in to_sql
    return sql.to_sql(
  File "/layers/google.python.pip/pip/lib/python3.9/site-packages/pandas/io/sql.py", line 697, in to_sql
    return pandas_sql.to_sql(
  File "/layers/google.python.pip/pip/lib/python3.9/site-packages/pandas/io/sql.py", line 1739, in to_sql
    total_inserted = sql_engine.insert_records(
  File "/layers/google.python.pip/pip/lib/python3.9/site-packages/pandas/io/sql.py", line 1322, in insert_records
    return table.insert(chunksize=chunksize, method=method)
  File "/layers/google.python.pip/pip/lib/python3.9/site-packages/pandas/io/sql.py", line 950, in insert
    num_inserted = exec_insert(conn, keys, chunk_iter)
  File "/layers/google.python.pip/pip/lib/python3.9/site-packages/pandas/io/sql.py", line 873, in _execute_insert_multi
    result = conn.execute(stmt)
  File "/layers/google.python.pip/pip/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1200, in execute
    return meth(self, multiparams, params, _EMPTY_EXECUTION_OPTS)
  File "/layers/google.python.pip/pip/lib/python3.9/site-packages/sqlalchemy/sql/elements.py", line 313, in _execute_on_connection
    return connection._execute_clauseelement(
  File "/layers/google.python.pip/pip/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1389, in _execute_clauseelement
    ret = self._execute_context(
  File "/layers/google.python.pip/pip/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1748, in _execute_context
    self._handle_dbapi_exception(
  File "/layers/google.python.pip/pip/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1933, in _handle_dbapi_exception
    util.raise_(exc_info[1], with_traceback=exc_info[2])
  File "/layers/google.python.pip/pip/lib/python3.9/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
    raise exception
  File "/layers/google.python.pip/pip/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1705, in _execute_context
    self.dialect.do_execute(
  File "/layers/google.python.pip/pip/lib/python3.9/site-packages/sqlalchemy/engine/default.py", line 691, in do_execute
    cursor.execute(statement, parameters)
  File "/layers/google.python.pip/pip/lib/python3.9/site-packages/pg8000/legacy.py", line 254, in execute
    self._context = self._c.execute_unnamed(
  File "/layers/google.python.pip/pip/lib/python3.9/site-packages/pg8000/core.py", line 666, in execute_unnamed
    self.send_PARSE(NULL_BYTE, statement, oids)
  File "/layers/google.python.pip/pip/lib/python3.9/site-packages/pg8000/core.py", line 640, in send_PARSE
    val.extend(h_pack(len(oids)))
struct.error: 'h' format requires -32768 <= number <= 32767
tlocke commented 2 years ago

Hi @thuyetbao, try using the latest version of pg8000. There was a change in version 1.22.0 https://github.com/tlocke/pg8000#version-1-22-0-2021-10-13 that removed the limitation that you've come across. Let me know if it's still a problem after trying that though.

tlocke commented 2 years ago

Actually, sorry @thuyetbao, I don't think that's the right answer. I'll have another think 🤔

thuyetbao commented 2 years ago

Hi @tlocke, I think I have found and fixed the problem.

E.g.:

INSERT INTO <table> (<columns>, ...) VALUES (%s, %s, %s, ...)

But there is a limit on the number of %s placeholders. So when pg8000 is given, say, 8 columns with a chunksize of 20,000, the statement contains 160,000 placeholders, which is over 32767, and the error is raised.

So when building the statement, the number of %s placeholders needs to be kept within that limit.
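For concreteness, a quick sketch of that arithmetic (the figures are the ones from my run above):

columns, chunksize = 8, 20_000
num_placeholders = columns * chunksize  # 160,000 %s placeholders in one INSERT
print(num_placeholders > 32767)         # True -> overflows the int16 ('h') pack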

Let's say:

import math


def limit_struct_pack(numb_columns: int) -> int:
    """Limit the chunk size so the total number of %s placeholders
    stays within pg8000's struct pack limit.

    The formula:
    optimal_chunksize = (struct pack limit) / (number of columns)

    then stepped down to the nearest multiple of 100.

    Currently, pg8000's maximum is 32767 %s placeholders.

    Usage:
    >>> limit_struct_pack(4)
    8100
    >>> limit_struct_pack(7)
    4600
    """
    PG8000_STRUCT_PACK_LIMIT = 32767
    # Floor, not ceil: rounding up could push the total over the limit.
    hard_limit = math.floor(PG8000_STRUCT_PACK_LIMIT / numb_columns)
    # Step down to the nearest multiple of 100.
    return hard_limit - (hard_limit % 100)
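For example, wiring it into the original call (a sketch reusing the SOURCE_DF and CONNECTION objects from my first comment):

# Cap the chunksize based on the number of DataFrame columns.
safe_chunksize = limit_struct_pack(len(SOURCE_DF.columns))

SOURCE_DF.to_sql(
    schema="public",
    name="temp_test",
    con=CONNECTION,
    if_exists="append",
    method="multi",
    index=False,
    chunksize=safe_chunksize,
)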

I have used that function to limit the chunk size so that the total number of %s placeholders stays under the limit.

The equation is:

max chunksize = (struct pack limit) / (total columns) = 32767 / total columns

But when using this, the trade-off is time. In my case:

For 2 million records with 8 columns, chunksize = 4,000 takes around 10 minutes.

In the pandas documentation, there is a method argument:

method : {None, ‘multi’, callable}, optional
Controls the SQL insertion clause used:

None : Uses standard SQL INSERT clause (one per row).

‘multi’: Pass multiple values in a single INSERT clause.

callable with signature (pd_table, conn, keys, data_iter).

Details and a sample callable implementation can be found in the section [insert method](https://pandas.pydata.org/docs/user_guide/io.html#io-sql-method).

Ref: https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.to_sql.html

So you can implement this as a workaround:

# Alternative to_sql() *method* for DBs that support COPY FROM
import csv
from io import StringIO

def psql_insert_copy(table, conn, keys, data_iter):
    """
    Execute SQL statement inserting data

    Parameters
    ----------
    table : pandas.io.sql.SQLTable
    conn : sqlalchemy.engine.Engine or sqlalchemy.engine.Connection
    keys : list of str
        Column names
    data_iter : Iterable that iterates the values to be inserted
    """
    # gets a DBAPI connection that can provide a cursor
    dbapi_conn = conn.connection
    with dbapi_conn.cursor() as cur:
        s_buf = StringIO()
        writer = csv.writer(s_buf)
        writer.writerows(data_iter)
        s_buf.seek(0)

        columns = ', '.join(['"{}"'.format(k) for k in keys])
        if table.schema:
            table_name = '{}.{}'.format(table.schema, table.name)
        else:
            table_name = table.name

        sql = 'COPY {} ({}) FROM STDIN WITH CSV'.format(
            table_name, columns)
        cur.copy_expert(sql=sql, file=s_buf)

Because this uses another approach (COPY rather than a multi-row INSERT), it outperforms the previous method. Snapshot timing in my case is 1 minute.
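One caveat: copy_expert is a psycopg2 cursor method, and pg8000 does not have it. A rough, untested sketch of the same idea for pg8000, whose DB-API execute() accepts a stream keyword for COPY ... FROM STDIN statements:

# Untested sketch: pg8000 variant of the callable above; pg8000 has no
# copy_expert(), but its cursor's execute() takes a stream keyword.
import csv
from io import StringIO

def pg8000_insert_copy(table, conn, keys, data_iter):
    dbapi_conn = conn.connection  # underlying pg8000 DB-API connection
    cur = dbapi_conn.cursor()
    try:
        s_buf = StringIO()
        csv.writer(s_buf).writerows(data_iter)
        s_buf.seek(0)

        columns = ', '.join('"{}"'.format(k) for k in keys)
        if table.schema:
            table_name = '{}.{}'.format(table.schema, table.name)
        else:
            table_name = table.name

        sql = 'COPY {} ({}) FROM STDIN WITH CSV'.format(table_name, columns)
        cur.execute(sql, stream=s_buf)
    finally:
        cur.close()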


tlocke commented 1 year ago

Glad you solved it. There's another bit of information that may be of help, and that's that SQLAlchemy uses Cursor.setinputsizes(*sizes) in pg8000, which sets the PostgreSQL types of the parameters of the SQL statement. If you don't call setinputsizes then PostgreSQL will infer the types of the provided values. And the key thing is that it's the length of the list of types that causes the overflow, so if you don't call setinputsizes then you won't hit the limit.
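As a minimal sketch of that mechanism (mimicking the 16-bit pack shown in the traceback; the varchar OID below is just illustrative):

import struct

# The same signed 16-bit ('h') pack used in the send_PARSE frame of
# the traceback; network byte order assumed.
h_pack = struct.Struct("!h").pack

h_pack(len([]))                # b'\x00\x00': no setinputsizes, empty type list, no overflow
h_pack(len([1043] * 160_000))  # struct.error: 'h' format requires -32768 <= number <= 32767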