snowflakedb / snowflake-sqlalchemy

Snowflake SQLAlchemy
https://pypi.python.org/pypi/snowflake-sqlalchemy/
Apache License 2.0

SNOW-893080: `session.bulk_save_objects` does not put all objects in one INSERT #441

Open markfickett opened 11 months ago

markfickett commented 11 months ago

Please answer these questions before submitting your issue. Thanks!

  1. What version of Python are you using?

Python 3.10.11 (main, Apr 5 2023, 14:15:10) [GCC 9.4.0]

  2. What operating system and processor architecture are you using?

Linux-5.14.0-1029-oem-x86_64-with-glibc2.31

  3. What are the component versions in the environment (pip freeze)?
alembic==1.10.4
pulumi==3.38.0
pulumi-snowflake==0.28.0
pymssql==2.2.7
PyMySQL==0.9.3
PyYAML==5.3.1
snowflake-connector-python==2.8.3
snowflake-sqlalchemy==1.4.7
SQLAlchemy==1.4.31
  4. What did you do?

Using the SQLAlchemy 1.4 bulk_save_objects API, I added 5000 objects in one SQLAlchemy session and then committed the session.

  5. What did you expect to see?

I expected to see one INSERT with 5000 VALUES rows. Instead, I see a variety of INSERT sizes, from 1 row to ~50 rows, and the insert of 5k objects takes 3+ minutes.

[screenshot omitted]
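
A minimal sketch of the pattern I'm describing (the model and connection details here are illustrative placeholders, not my actual code):

# Minimal sketch of the reported pattern; Item and the connection URL are
# illustrative placeholders, not from the original report.
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()

class Item(Base):
    __tablename__ = "item"

    id = Column(Integer, primary_key=True, autoincrement=False)
    name = Column(String(50))

engine = create_engine("snowflake://<user>:<password>@<account>/<database>/<schema>")
Base.metadata.create_all(engine)

with Session(engine) as session:
    items = [Item(id=i, name=f"item_{i}") for i in range(5000)]
    session.bulk_save_objects(items)
    session.commit()  # expected: one multi-row INSERT; observed: many small INSERTs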

adamkipnis-tuskr commented 3 months ago

Just came across this as well. Seems like a pretty big issue...

andymiller-og commented 3 months ago

Has there been any progress on this issue?

sfc-gh-dszmolka commented 3 months ago

Hi, and thank you for raising this issue. Checking with the team to see whether we can address this before the SQLAlchemy 2.0 support release (which currently has priority). Thank you for bearing with us!

sfc-gh-dszmolka commented 3 months ago

My colleague took a look and shared the example below:

from sqlalchemy import Column, Integer, Sequence, String, create_engine, func
from sqlalchemy.orm import Session, declarative_base

# Connection details are elided; fill in your own Snowflake URL.
engine = create_engine("snowflake://<user>:<password>@<account>/<database>/<schema>")

Base = declarative_base()

class SampleBulk(Base):
    __tablename__ = "sample_bulk"

    pk = Column(Integer, Sequence('sample_bulk_pk_seq', order=True), primary_key=True)
    name = Column(String(30))
    amount = Column(Integer, default=0)

    def __repr__(self) -> str:
        return f"SampleBulk(pk={self.pk}, name={self.name}, amount={self.amount})"

def main(engine):
    try:
        Base.metadata.create_all(engine)

        with Session(engine) as session:
            # generate ~60k objects and save them in one bulk operation
            todds = (SampleBulk(name=f"Tod_{i}", amount=i) for i in range(1, 59999))
            session.bulk_save_objects(todds)
            session.commit()

            result = session.query(func.count(SampleBulk.pk)).scalar()
            print(f" *** {result=}")

    finally:
        Base.metadata.drop_all(engine)

if __name__ == "__main__":
    main(engine)

This was able to insert 60k rows in a single command. As a side note, to be able to handle NULL primary keys in ORM models this example uses Sequence() instead of autoincrement=True; my colleague also found similar problems with generating the pk for other dialects besides Snowflake.

All in all: if any of you still have this problem, could you please check with v1.5.1 and see if it works for you? Please let us know how it went.

adamkipnis-tuskr commented 3 months ago

Thanks for the response @sfc-gh-dszmolka. I ran your test and it did write out all 60k records in a single statement. I'm not seeing the same behavior in our app, though, so I'm trying to put together a test case that reproduces what I'm seeing. The entity I'm working with has about 40 columns and when I submit a batch of ~5k records, it inserts in batches of only about 15 at a time. I'll provide more info on the batch insert once I have a better test to illustrate it.

However, the updates are an even bigger issue. If you modify your test case to include the below snippet, you will see that the updates are happening one at a time:

def main(engine):
    try:
        Base.metadata.create_all(engine)

        with Session(engine) as session:
            todds = (SampleBulk(name=f"Tod_{i}", amount=i) for i in range(1, 59999))
            session.bulk_save_objects(todds)
            session.commit()

            result = session.query(func.count(SampleBulk.pk)).scalar()
            print(f" *** {result=}")

            # re-read the rows, modify them, and bulk-save again;
            # the resulting UPDATEs are issued one row at a time
            records = session.query(SampleBulk).all()

            for record in records:
                record.name = f"NewName_{record.pk}"

            session.bulk_save_objects(records)
    finally:
        Base.metadata.drop_all(engine)

Output:

[2024-04-16T11:09:58.284-0700] {cursor.py:1032} INFO - query: [SELECT cm_dwh.sample_bulk.pk AS cm_dwh_sample_bulk_pk, cm_dwh.sample_bulk.name A...]
[2024-04-16T11:10:00.090-0700] {cursor.py:1045} INFO - query execution done
[2024-04-16T11:10:00.090-0700] {cursor.py:1205} INFO - Number of results in first chunk: 464
[2024-04-16T11:10:14.102-0700] {cursor.py:1032} INFO - query: [UPDATE cm_dwh.sample_bulk SET name='NewName_1' WHERE cm_dwh.sample_bulk.pk = 1]
[2024-04-16T11:10:15.770-0700] {cursor.py:1045} INFO - query execution done
[2024-04-16T11:10:15.770-0700] {cursor.py:1032} INFO - query: [UPDATE cm_dwh.sample_bulk SET name='NewName_2' WHERE cm_dwh.sample_bulk.pk = 2]
[2024-04-16T11:10:17.339-0700] {cursor.py:1045} INFO - query execution done
[2024-04-16T11:10:17.339-0700] {cursor.py:1032} INFO - query: [UPDATE cm_dwh.sample_bulk SET name='NewName_3' WHERE cm_dwh.sample_bulk.pk = 3]
[2024-04-16T11:10:18.579-0700] {cursor.py:1045} INFO - query execution done
[2024-04-16T11:10:18.580-0700] {cursor.py:1032} INFO - query: [UPDATE cm_dwh.sample_bulk SET name='NewName_4' WHERE cm_dwh.sample_bulk.pk = 4]
[2024-04-16T11:10:20.257-0700] {cursor.py:1045} INFO - query execution done
[2024-04-16T11:10:20.258-0700] {cursor.py:1032} INFO - query: [UPDATE cm_dwh.sample_bulk SET name='NewName_5' WHERE cm_dwh.sample_bulk.pk = 5]
[2024-04-16T11:10:21.912-0700] {cursor.py:1045} INFO - query execution done
[2024-04-16T11:10:21.912-0700] {cursor.py:1032} INFO - query: [UPDATE cm_dwh.sample_bulk SET name='NewName_6' WHERE cm_dwh.sample_bulk.pk = 6]
[2024-04-16T11:10:22.939-0700] {cursor.py:1045} INFO - query execution done
[2024-04-16T11:10:22.940-0700] {cursor.py:1032} INFO - query: [UPDATE cm_dwh.sample_bulk SET name='NewName_7' WHERE cm_dwh.sample_bulk.pk = 7]
[2024-04-16T11:10:24.029-0700] {cursor.py:1045} INFO - query execution done
[2024-04-16T11:10:24.030-0700] {cursor.py:1032} INFO - query: [UPDATE cm_dwh.sample_bulk SET name='NewName_8' WHERE cm_dwh.sample_bulk.pk = 8]
...

My expectation would be that it should be able to batch the updates into a single query with something like:

update sample_bulk b set name=tmp.name 
from (values (1,'NewName_1'),(2,'NewName_2'),(3,'NewName_3')) as tmp (pk,name)
where b.pk=tmp.pk;
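
As an illustration, here is a rough, unverified sketch of how that batched update might be expressed with SQLAlchemy's values() construct. It assumes a dialect that renders UPDATE ... FROM, and I have not confirmed that the Snowflake dialect compiles it; it also reuses the SampleBulk model and session from the snippet above.

# Rough, unverified sketch of a batched UPDATE via a VALUES table.
# Requires a dialect that supports UPDATE ... FROM; not verified against
# snowflake-sqlalchemy. Assumes SampleBulk and session from above.
from sqlalchemy import Integer, String, column, update, values

tmp = values(
    column("pk", Integer),
    column("name", String),
    name="tmp",
).data([(1, "NewName_1"), (2, "NewName_2"), (3, "NewName_3")])

sample_bulk = SampleBulk.__table__
stmt = (
    update(sample_bulk)
    .where(sample_bulk.c.pk == tmp.c.pk)
    .values(name=tmp.c.name)
)
session.execute(stmt)
session.commit()
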
sfc-gh-dszmolka commented 3 months ago

Thanks for the repro, we're taking a look.

However, this issue has started to branch out a little from the original "session.bulk_save_objects does not put all objects in one INSERT" - we have a working example of how to achieve that goal.

Also, considering you now have problems with the UPDATE operation and in parallel have an official case open with Snowflake Support, do you think this GitHub issue can be closed out? The original issue is addressed, and you have ongoing help with the UPDATE in the Support case (which, in contrast to issues here, has strict SLAs).

Once the UPDATE issue is resolved, I can of course retroactively update this issue with the solution to inform anyone else stumbling across it.

adamkipnis-tuskr commented 3 months ago

@sfc-gh-dszmolka: I have a reproducible example. The issue occurs when an entity has optional/nullable columns. Here's an example where the inserts are not done in bulk:

import random

from sqlalchemy import Column, Integer, Sequence, String, create_engine, func
from sqlalchemy.orm import Session, declarative_base

# Connection details are elided; fill in your own Snowflake URL.
engine = create_engine("snowflake://<user>:<password>@<account>/<database>/<schema>")

Base = declarative_base()

class SampleBulk(Base):
    __tablename__ = "sample_bulk"

    pk = Column(Integer, Sequence('sample_bulk_pk_seq', order=True), primary_key=True)
    name = Column(String(30))
    amount = Column(Integer, default=0)
    col1 = Column(String(4000))
    col2 = Column(String(4000))
    col3 = Column(String(4000))

    def __repr__(self) -> str:
        return f"SampleBulk(pk={self.pk}, name={self.name}, amount={self.amount})"

def main(engine):
    try:
        Base.metadata.create_all(engine)

        with Session(engine) as session:
            todds = []
            for i in range(1, 10000):
                d = {
                    "pk": i,
                    "name": f"Tod_{i}",
                    "amount": i,
                }

                # randomly populate the optional columns so the objects
                # end up with differing sets of columns set
                for col in ['col1', 'col2', 'col3']:
                    if bool(random.getrandbits(1)):
                        d[col] = f"{col}_{i}"

                todds.append(SampleBulk(**d))

            session.bulk_save_objects(todds, update_changed_only=False, return_defaults=False, preserve_order=False)
            session.commit()

            result = session.query(func.count(SampleBulk.pk)).scalar()
            print(f" *** {result=}")
    finally:
        Base.metadata.drop_all(engine)

if __name__ == "__main__":
    main(engine)

Here's a snippet of the output:

[2024-04-19T13:31:54.682-0700] {cursor.py:1032} INFO - query: [INSERT INTO cm_dwh.sample_bulk (pk, name, amount, col2) VALUES (1, 'Tod_1', 1, '...]
[2024-04-19T13:31:55.150-0700] {cursor.py:1045} INFO - query execution done
[2024-04-19T13:31:55.152-0700] {cursor.py:1032} INFO - query: [INSERT INTO cm_dwh.sample_bulk (pk, name, amount, col1) VALUES (2, 'Tod_2', 2, '...]
[2024-04-19T13:31:55.576-0700] {cursor.py:1045} INFO - query execution done
[2024-04-19T13:31:55.577-0700] {cursor.py:1032} INFO - query: [INSERT INTO cm_dwh.sample_bulk (pk, name, amount, col1, col3) VALUES (3, 'Tod_3'...]
[2024-04-19T13:31:56.063-0700] {cursor.py:1045} INFO - query execution done
[2024-04-19T13:31:56.064-0700] {cursor.py:1032} INFO - query: [INSERT INTO cm_dwh.sample_bulk (pk, name, amount, col3) VALUES (4, 'Tod_4', 4, '...]
[2024-04-19T13:31:56.510-0700] {cursor.py:1045} INFO - query execution done
[2024-04-19T13:31:56.511-0700] {cursor.py:1032} INFO - query: [INSERT INTO cm_dwh.sample_bulk (pk, name, amount) VALUES (5, 'Tod_5', 5)]
[2024-04-19T13:31:56.874-0700] {cursor.py:1045} INFO - query execution done
[2024-04-19T13:31:56.876-0700] {cursor.py:1032} INFO - query: [INSERT INTO cm_dwh.sample_bulk (pk, name, amount, col2) VALUES (6, 'Tod_6', 6, '...]
[2024-04-19T13:31:57.357-0700] {cursor.py:1045} INFO - query execution done
[2024-04-19T13:31:57.358-0700] {cursor.py:1032} INFO - query: [INSERT INTO cm_dwh.sample_bulk (pk, name, amount, col1, col2) VALUES (7, 'Tod_7'...]
[2024-04-19T13:31:57.820-0700] {cursor.py:1045} INFO - query execution done
[2024-04-19T13:31:57.821-0700] {cursor.py:1032} INFO - query: [INSERT INTO cm_dwh.sample_bulk (pk, name, amount, col1) VALUES (8, 'Tod_8', 8, '...]
[2024-04-19T13:31:58.240-0700] {cursor.py:1045} INFO - query execution done
[2024-04-19T13:31:58.241-0700] {cursor.py:1032} INFO - query: [INSERT INTO cm_dwh.sample_bulk (pk, name, amount, col1, col3) VALUES (9, 'Tod_9'...]
[2024-04-19T13:31:58.695-0700] {cursor.py:1045} INFO - query execution done
[2024-04-19T13:31:58.696-0700] {cursor.py:1032} INFO - query: [INSERT INTO cm_dwh.sample_bulk (pk, name, amount, col1) VALUES (10, 'Tod_10', 10...]
[2024-04-19T13:31:59.341-0700] {cursor.py:1045} INFO - query execution done
[2024-04-19T13:31:59.342-0700] {cursor.py:1032} INFO - query: [INSERT INTO cm_dwh.sample_bulk (pk, name, amount, col3) VALUES (11, 'Tod_11', 11...]
[2024-04-19T13:32:00.013-0700] {cursor.py:1045} INFO - query execution done
[2024-04-19T13:32:00.015-0700] {cursor.py:1032} INFO - query: [INSERT INTO cm_dwh.sample_bulk (pk, name, amount, col1, col2, col3) VALUES (12, ...]
[2024-04-19T13:32:00.406-0700] {cursor.py:1045} INFO - query execution done
[2024-04-19T13:32:00.407-0700] {cursor.py:1032} INFO - query: [INSERT INTO cm_dwh.sample_bulk (pk, name, amount, col1, col3) VALUES (13, 'Tod_1...]
[2024-04-19T13:32:00.828-0700] {cursor.py:1045} INFO - query execution done
[2024-04-19T13:32:00.829-0700] {cursor.py:1032} INFO - query: [INSERT INTO cm_dwh.sample_bulk (pk, name, amount, col2) VALUES (14, 'Tod_14', 14...]
[2024-04-19T13:32:01.287-0700] {cursor.py:1045} INFO - query execution done
[2024-04-19T13:32:01.287-0700] {cursor.py:1032} INFO - query: [INSERT INTO cm_dwh.sample_bulk (pk, name, amount, col1, col3) VALUES (15, 'Tod_1...]
[2024-04-19T13:32:01.683-0700] {cursor.py:1045} INFO - query execution done

It appears that inserts are only batched when consecutive objects in the list have the same set of columns populated.

EDIT: adding that I ran this test with preserve_order=False (I originally pasted the wrong version of the code snippet). If that had been set to True, I would understand why the inserts aren't being batched. But this argument exists specifically for this case, where heterogeneous objects can be added in any order and then re-ordered to allow for batching.

sfc-gh-dszmolka commented 3 months ago

Thank you for adding this repro for the "not batching INSERTs" issue (the one tracked here in #441).

Out of curiosity, I tried running the reproduction without Snowflake, using a Postgres instance:

# cat testpg.py 
from sqlalchemy import create_engine, Column, String, Integer, Sequence, func
from sqlalchemy.engine import URL
from sqlalchemy.orm import declarative_base, Session

import random

url = URL.create(
    drivername="postgresql",
    username="postgres",
    host="/var/run/postgresql",
    database="test_db"
)

engine = create_engine(url, echo=True)

Base = declarative_base()

class SampleBulk(Base):
    __tablename__ = "sample_bulk"

    pk = Column(Integer, Sequence('sample_bulk_pk_seq', order=True), primary_key=True)
    name = Column(String(30), )
    amount = Column(Integer, default=0)
    col1 = Column(String(4000))
    col2 = Column(String(4000))
    col3 = Column(String(4000))

    def __repr__(self) -> str:
        return f"SampleBulk(pk={self.pk}, name={self.name}, amount={self.amount})"

def main(engine):
    try:
        Base.metadata.create_all(engine)

        with Session(engine) as session:
            todds = []
            for i in range(1, 10000):
                d = {
                    "pk": i,
                    "name": f"Tod_{i}",
                    "amount": i,
                }

                for col in ['col1', 'col2', 'col3']:
                    if bool(random.getrandbits(1)):
                        d[col] = f"{col}_{i}"

                todds.append(SampleBulk(**d))

            ### tried both with the defaults and with the arguments from your repro; INSERTs still come one at a time
            session.bulk_save_objects(todds, update_changed_only=False, return_defaults=False, preserve_order=False)
            #session.bulk_save_objects(todds)
            session.commit()

            result = session.query(func.count(SampleBulk.pk)).scalar()
            print(f" *** {result=}")
    finally:
        Base.metadata.drop_all(engine)

if __name__ == "__main__":
    main(engine)

Result:

..
2024-04-23 07:48:51,485 INFO sqlalchemy.engine.Engine COMMIT
2024-04-23 07:48:51,596 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2024-04-23 07:48:51,596 INFO sqlalchemy.engine.Engine INSERT INTO sample_bulk (pk, name, amount, col1) VALUES (%(pk)s, %(name)s, %(amount)s, %(col1)s)
2024-04-23 07:48:51,596 INFO sqlalchemy.engine.Engine [generated in 0.00015s] {'pk': 1, 'name': 'Tod_1', 'amount': 1, 'col1': 'col1_1'}
2024-04-23 07:48:51,597 INFO sqlalchemy.engine.Engine INSERT INTO sample_bulk (pk, name, amount, col1, col3) VALUES (%(pk)s, %(name)s, %(amount)s, %(col1)s, %(col3)s)
2024-04-23 07:48:51,597 INFO sqlalchemy.engine.Engine [generated in 0.00011s] {'pk': 2, 'name': 'Tod_2', 'amount': 2, 'col1': 'col1_2', 'col3': 'col3_2'}
2024-04-23 07:48:51,598 INFO sqlalchemy.engine.Engine INSERT INTO sample_bulk (pk, name, amount) VALUES (%(pk)s, %(name)s, %(amount)s)
2024-04-23 07:48:51,598 INFO sqlalchemy.engine.Engine [generated in 0.00008s] {'pk': 3, 'name': 'Tod_3', 'amount': 3}
2024-04-23 07:48:51,598 INFO sqlalchemy.engine.Engine INSERT INTO sample_bulk (pk, name, amount, col1, col3) VALUES (%(pk)s, %(name)s, %(amount)s, %(col1)s, %(col3)s)
2024-04-23 07:48:51,598 INFO sqlalchemy.engine.Engine [cached since 0.001002s ago] {'pk': 4, 'name': 'Tod_4', 'amount': 4, 'col1': 'col1_4', 'col3': 'col3_4'}
2024-04-23 07:48:51,599 INFO sqlalchemy.engine.Engine INSERT INTO sample_bulk (pk, name, amount, col2) VALUES (%(pk)s, %(name)s, %(amount)s, %(col2)s)
2024-04-23 07:48:51,599 INFO sqlalchemy.engine.Engine [generated in 0.00009s] {'pk': 5, 'name': 'Tod_5', 'amount': 5, 'col2': 'col2_5'}
..

Every INSERT is sent individually, which to me suggests that this issue (related to nullable/optional columns) is not specific to the Snowflake dialect and snowflake-sqlalchemy, as the same issue reproduces without Snowflake. Perhaps it could be reported at https://github.com/sqlalchemy/sqlalchemy/issues

For the issue with the UPDATE, if further troubleshooting is required, it would be greatly appreciated if you could please open a separate Issue here.

sfc-gh-dszmolka commented 3 months ago

My colleague also found the following interesting Stack Overflow post: https://stackoverflow.com/questions/48874745/sqlalchemy-bulk-insert-mappings-generates-a-large-number-of-insert-batches-is-t

It looks very relevant to your use case and has possible solutions as well; please take a look once you get a chance. It has to do with how the input data is structured, rather than any Snowflake aspect.
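
To illustrate what "how the input data is structured" means here, a rough, untested sketch of the idea from that post (it reuses the SampleBulk model and session from the earlier snippets):

# Untested sketch of the data-shaping idea from the Stack Overflow post:
# give every row the same set of keys (explicitly filling the optional
# columns with None) so the ORM can group the rows into large executemany
# batches instead of switching statements whenever the column set changes.
# Assumes SampleBulk and session from the earlier snippets.
import random

optional_cols = ["col1", "col2", "col3"]

rows = []
for i in range(1, 10000):
    d = {"pk": i, "name": f"Tod_{i}", "amount": i}
    for col in optional_cols:
        # always include the key, even when the value is NULL
        d[col] = f"{col}_{i}" if bool(random.getrandbits(1)) else None
    rows.append(d)

session.bulk_insert_mappings(SampleBulk, rows)
session.commit()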

adamkipnis-tuskr commented 3 months ago

> My colleague also found the following interesting Stack Overflow post: https://stackoverflow.com/questions/48874745/sqlalchemy-bulk-insert-mappings-generates-a-large-number-of-insert-batches-is-t
>
> It looks very relevant to your use case and has possible solutions as well; please take a look once you get a chance. It has to do with how the input data is structured, rather than any Snowflake aspect.

Yes, I'm aware of that SO post. That is the method I used to work around the batching issue when using bulk_insert_mappings. The case here is with bulk_save_objects. The way I'm currently working around the issue described in this ticket is to create a temp table, use bulk_insert_mappings to save my new/modified entities (which also doesn't seem to optimize batching, even with render_nulls=True), then use MergeInto, as sketched below. It's not ideal.
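
For anyone else landing here, a rough, untested sketch of that temp-table + MERGE workaround (table and column names are illustrative; it reuses the SampleBulk model and engine from the snippets above):

# Rough, untested sketch of the temp-table + MERGE workaround (names are
# illustrative). Assumes the SampleBulk model and engine from the snippets above.
from snowflake.sqlalchemy import MergeInto
from sqlalchemy import Column, Integer, MetaData, String, Table

metadata = MetaData()
staging = Table(
    "sample_bulk_stage", metadata,
    Column("pk", Integer, primary_key=True),
    Column("name", String(30)),
    prefixes=["TEMPORARY"],  # rendered as CREATE TEMPORARY TABLE
)

with engine.begin() as conn:
    staging.create(conn)

    # load the new/changed rows into the staging table via executemany
    conn.execute(
        staging.insert(),
        [{"pk": i, "name": f"NewName_{i}"} for i in range(1, 1000)],
    )

    # merge the staged rows into the target table in one statement
    target = SampleBulk.__table__
    merge = MergeInto(target=target, source=staging, on=target.c.pk == staging.c.pk)
    merge.when_matched_then_update().values(name=staging.c.name)
    merge.when_not_matched_then_insert().values(pk=staging.c.pk, name=staging.c.name)
    conn.execute(merge)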

sfc-gh-dszmolka commented 3 months ago

I can very much agree that it's not ideal, but I'm also a bit baffled by the fact that the same behaviour of bulk_save_objects happens without Snowflake too, so the behaviour we're seeing does not seem to come from Snowflake / snowflake-sqlalchemy.

During my tests with Postgres, I also tried using SQLAlchemy 2.0 instead of 1.4.52; the result was similar, with INSERTs coming one by one or in batches of at most 2-3 rows.

Do you perhaps know of any dialect with which your reproduction works as expected, i.e. sends the generated 10k rows in a single INSERT?

sfc-gh-dszmolka commented 2 months ago

Since the behaviour does not seem to originate from snowflake-sqlalchemy, we'll be treating this as an enhancement request. At this time all priorities and resources are allocated to releasing the version of the connector supporting SQLAlchemy 2.0, so I'm unfortunately unable to attach any timeline for implementing the enhancement. Should any of this change, I'll update this thread.

In the meantime, please reach out to your Snowflake Account Team and let them know how important this improvement would be for your use case - this might put additional traction on the request and help with reprioritization.