googleapis / python-bigquery

Add DBAPI executemany INSERT batching support #2048

Open · jlynchMicron opened this issue 3 weeks ago

jlynchMicron commented 3 weeks ago

Is your feature request related to a problem? Please describe.
It appears that the BigQuery DBAPI does not support multi-row INSERT batching, which would allow more performant Python-based DML transactions. Currently, executemany INSERT statements are executed one at a time, leading to massive slowdowns in batch INSERT DML operations.

Example multi-row INSERT from the BigQuery documentation: https://cloud.google.com/bigquery/docs/reference/standard-sql/dml-syntax#insert_examples

INSERT dataset.Inventory (product, quantity)
VALUES('top load washer', 10),
      ('front load washer', 20),
      ('dryer', 30),
      ('refrigerator', 10),
      ('microwave', 20),
      ('dishwasher', 30),
      ('oven', 5)
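
For context, here is a minimal sketch of the DB-API call where the slowdown shows up (a hypothetical example; the client setup, table, and values just mirror the docs snippet above). With the current cursor implementation, each parameter set results in its own query job:

from google.cloud import bigquery
from google.cloud.bigquery import dbapi

# Illustrative only: assumes a dataset.Inventory table exists.
conn = dbapi.connect(bigquery.Client())
cursor = conn.cursor()

cursor.executemany(
    "INSERT dataset.Inventory (product, quantity) VALUES (%(product)s, %(quantity)s)",
    [
        {"product": "top load washer", "quantity": 10},
        {"product": "front load washer", "quantity": 20},
        {"product": "dryer", "quantity": 30},
    ],
)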

Describe the solution you'd like
Add multi-row INSERT batching support. MySQL DBAPI example: https://github.com/PyMySQL/PyMySQL/blob/main/pymysql/cursors.py#L194

Describe alternatives you've considered
I will probably make a crude patch to my sqlalchemy-bigquery DBAPI cursor to enable this support, since my ORM-based application needs the performance boost.

Additional context
sqlalchemy-bigquery related ticket: https://github.com/googleapis/python-bigquery-sqlalchemy/issues/497
sqlalchemy related discussion: https://github.com/sqlalchemy/sqlalchemy/discussions/12038

jlynchMicron commented 3 weeks ago

I just made a crude version of what this could look like from within the sqlalchemy-bigquery dialect "do_executemany" function, and it seems to be working. However, I have not yet implemented any paging to handle query character length limits:

#Hotfix for this issue: https://github.com/googleapis/python-bigquery/issues/2048

import copy
import re
from typing import List

from google.cloud.bigquery import dbapi as bq_dbapi
from sqlalchemy_bigquery.base import BigQueryDialect

def do_executemany(self, cursor, statement: str, parameters: List[dict], context=None):
    #NOTE: Borrowed from MySQL DBAPI: https://github.com/PyMySQL/PyMySQL/blob/main/pymysql/cursors.py#L157
    if not parameters:
        return

    #: Regular expression for :meth:`Cursor.executemany`.
    #: executemany only supports simple bulk insert.
    #: You can use it to load large dataset.
    RE_INSERT_VALUES = re.compile(
        r"\s*((?:INSERT|REPLACE)\b.+\bVALUES?\s*)"
        + r"(\(\s*(?:%s|%\(.+\)s)\s*(?:,\s*(?:%s|%\(.+\)s)\s*)*\))"
        + r"(\s*(?:ON DUPLICATE.*)?);?\s*\Z",
        re.IGNORECASE | re.DOTALL,
    )

    m = RE_INSERT_VALUES.match(statement)

    if m:
        # Split the statement into the "INSERT ... VALUES" prefix, the single
        # VALUES tuple of pyformat placeholders, and any trailing clause.
        # The "% ()" call collapses escaped %% sequences in the prefix.
        q_prefix = m.group(1) % ()
        q_values = m.group(2).rstrip()
        q_postfix = m.group(3) or ""
        assert q_values[0] == "(" and q_values[-1] == ")"

        new_sql_stmt = q_prefix
        new_sql_params = {}
        set_idx = 0
        for param_set in parameters:
            #formatted_operation, parameter_types = bq_dbapi.cursor._format_operation(statement, param_set)
            #query_parameters = bq_dbapi._helpers.to_query_parameters(param_set, parameter_types)

            set_params = {}
            set_values = copy.copy(q_values)
            for param_k, param_v in param_set.items():
                # Rename each pyformat parameter (e.g. %(product:STRING)s) so it
                # is unique per row: %(product__0:STRING)s, %(product__1:STRING)s, ...
                new_param_k = f'{param_k}__{set_idx}'
                set_params[new_param_k] = param_v
                set_values = set_values.replace(f'%({param_k}:', f'%({new_param_k}:')

            new_sql_stmt = f'{new_sql_stmt}{set_values},'
            new_sql_params.update(set_params)
            set_idx += 1

        new_sql_stmt = new_sql_stmt[:-1] # remove trailing comma
        new_sql_stmt = f'{new_sql_stmt}{q_postfix}'

        rowcount = cursor.execute(new_sql_stmt, new_sql_params)

    else:
        #Current implementation of this function only supports serial inserting.
        rowcount = cursor.executemany(statement, parameters)

BigQueryDialect.do_executemany = do_executemany
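
In case it is useful to others, a rough usage sketch under the patch above (the engine URL, table name, and column types are placeholders, not from the project): once BigQueryDialect.do_executemany is replaced, a multi-row Core insert like the one below should be folded into a single parameterized INSERT instead of one query job per row.

from sqlalchemy import Column, Integer, MetaData, String, Table, create_engine

# Placeholder engine URL and table definition, for illustration only.
engine = create_engine("bigquery://my-project/my_dataset")
metadata = MetaData()
inventory = Table(
    "Inventory",
    metadata,
    Column("product", String),
    Column("quantity", Integer),
)

with engine.begin() as conn:
    # Passing a list of parameter dicts takes SQLAlchemy's executemany path,
    # which now hits the patched do_executemany above.
    conn.execute(
        inventory.insert(),
        [
            {"product": "top load washer", "quantity": 10},
            {"product": "dryer", "quantity": 30},
        ],
    )
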
jlynchMicron commented 3 weeks ago

Looks like I immediately hit issues with not batching this function into chunks that stay under 10,000 query parameters, due to this BigQuery quota: "A GoogleSQL query can have up to 10,000 parameters". Looks like I may need to implement that after all 😅
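
A minimal sketch of the chunking that could address that quota (the constant and helper name are illustrative, not part of the library); do_executemany would then build and execute one batched INSERT per chunk instead of one for the whole parameter list:

from typing import Iterator, List

# Assumed limit, taken from the quota quoted above.
MAX_QUERY_PARAMETERS = 10_000

def chunk_parameter_sets(parameters: List[dict]) -> Iterator[List[dict]]:
    """Yield slices of parameter sets whose total parameter count stays under the quota."""
    if not parameters:
        return
    params_per_row = max(len(parameters[0]), 1)
    rows_per_chunk = max(MAX_QUERY_PARAMETERS // params_per_row, 1)
    for start in range(0, len(parameters), rows_per_chunk):
        yield parameters[start:start + rows_per_chunk]

# Inside do_executemany, roughly:
# for chunk in chunk_parameter_sets(parameters):
#     ...build new_sql_stmt / new_sql_params from chunk and call cursor.execute(...)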