snowflakedb / snowpark-python

Snowflake Snowpark Python API

SNOW-656406: `table.merge()` tries to reference a Snowpark Temp Table that does not exist #470

Closed dnxie12 closed 1 year ago

dnxie12 commented 2 years ago

I've been experiencing this issue when trying to write a MERGE operation using Snowpark. Thanks for taking a look!

  1. What version of Python are you using?
python --version
Python 3.8.13
  2. What operating system and processor architecture are you using?
python -c 'import platform; print(platform.platform())'
macOS-12.2-arm64-arm-64bit
  3. What are the component versions in the environment (pip freeze)?

Note: the current version of snowflake-snowpark-python in this environment is 0.9.0, but I have also experienced the same issue in 0.7.0 and 0.8.0.

agate==1.6.3
alembic==1.6.5
aniso8601==7.0.0
anyio==3.6.1
asn1crypto==1.5.1
attrs==22.1.0
Babel==2.10.3
beautifulsoup4==4.11.1
black==22.6.0
bleach==5.0.1
boto3==1.24.66
botocore==1.27.66
braintree==4.16.0
certifi==2022.6.15
cffi==1.15.1
charset-normalizer==2.1.1
click==8.1.3
cloudpickle==2.0.0
colorama==0.4.5
coloredlogs==14.0
commonmark==0.9.1
coverage==6.4.4
croniter==1.3.5
cryptography==36.0.2
dagit==0.15.9
dagster==0.15.9
dagster-aws==0.15.9
dagster-cloud==0.15.9
dagster-cloud-cli==0.15.9
dagster-dbt==0.15.9
dagster-graphql==0.15.9
dagster-pandas==0.15.9
-e git+ssh://git@github.com/dribbble/data-sdk.git@20e06af78f1c389a62c2247eae6ecab9adb52133#egg=data_sdk
dbt-core==1.2.0
dbt-extractor==0.4.1
dbt-junitxml==0.1.5
dbt-snowflake==1.2.0
defusedxml==0.7.1
Deprecated==1.2.13
docstring-parser==0.14.1
elasticsearch==7.17.6
elasticsearch-dsl==7.4.0
entrypoints==0.4
eventbrite==3.3.5
fastjsonschema==2.16.1
filelock==3.8.0
flatdict==4.0.1
future==0.18.2
gql==2.0.0
graphene==2.1.9
graphql-core==2.3.2
graphql-relay==2.0.1
grpcio==1.48.1
grpcio-health-checking==1.43.0
h11==0.13.0
hologram==0.0.15
httptools==0.4.0
humanfriendly==10.0
idna==3.3
importlib-metadata==4.12.0
iniconfig==1.1.1
isodate==0.6.1
jaraco.classes==3.2.2
Jinja2==2.11.3
jmespath==1.0.1
jsonschema==3.2.0
junit-xml==1.9
jupyter-core==4.11.1
jupyter_client==7.3.5
jupyterlab-pygments==0.2.2
keyring==23.9.0
leather==0.3.4
Logbook==1.5.3
Mako==1.2.2
MarkupSafe==2.0.1
mashumaro==2.9
minimal-snowplow-tracker==0.0.2
mistune==0.8.4
more-itertools==8.14.0
msgpack==1.0.4
mypy-extensions==0.4.3
nbclient==0.5.13
nbconvert==6.4.5
nbformat==5.4.0
nest-asyncio==1.5.5
networkx==2.8.6
numpy==1.23.2
oscrypto==1.3.0
packaging==21.3
pandas==1.4.4
pandocfilters==1.5.0
parsedatetime==2.4
pathspec==0.9.0
pendulum==2.1.2
pep562==1.1
platformdirs==2.5.2
pluggy==1.0.0
promise==2.3
prompt-toolkit==3.0.31
protobuf==3.20.1
psycopg2==2.9.3
py==1.11.0
pycparser==2.21
pycryptodomex==3.15.0
PyGithub @ git+https://github.com/dribbble/PyGithub@538b30b5814fa5093b3785ed022ec58cc2e86bf0
Pygments==2.13.0
PyJWT==2.4.0
PyNaCl==1.5.0
pyOpenSSL==22.0.0
pyparsing==3.0.9
pyrsistent==0.18.1
pytest==7.1.2
pytest-cov==3.0.0
python-dateutil==2.8.2
python-decouple==3.6
python-dotenv==0.21.0
python-editor==1.0.4
python-slugify==6.1.2
pytimeparse==1.1.8
pytz==2022.2.1
pytzdata==2020.1
PyYAML==6.0
pyzmq==23.2.1
questionary==1.10.0
requests==2.28.1
rich==12.5.1
Rx==1.6.1
s3transfer==0.6.0
shellingham==1.5.0
six==1.16.0
sniffio==1.3.0
snowflake-connector-python==2.7.12
snowflake-snowpark-python==0.9.0
soupsieve==2.3.2.post1
SQLAlchemy==1.4.40
sqlparse==0.4.2
starlette==0.20.4
tabulate==0.8.10
testpath==0.6.0
text-unidecode==1.3
tomli==2.0.1
toposort==1.7
tornado==6.2
tqdm==4.64.1
traitlets==5.3.0
typer==0.6.1
typing-compat==0.1.0
typing_extensions==4.3.0
urllib3==1.26.12
uvicorn==0.18.3
uvloop==0.16.0
watchdog==2.1.9
watchfiles==0.16.1
wcwidth==0.2.5
webencodings==0.5.1
websockets==10.3
Werkzeug==2.1.2
wrapt==1.14.1
zipp==3.8.1
  4. What did you do?

I tried to run the following (target being a Snowpark Table and source being a Snowpark DataFrame):

# Map every source column name to the corresponding source column expression
merge_mapping = {v: source[v] for v in source.columns}

# Update matched rows and insert unmatched rows, keyed on "id"
target.merge(
    source,
    target["id"] == source["id"],
    [
        when_matched().update(merge_mapping),
        when_not_matched().insert(merge_mapping),
    ],
)
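
For reference, table.merge executes eagerly and, per the Snowpark docs, returns a MergeResult reporting what the MERGE did; a minimal sketch of capturing it (assuming the documented rows_inserted / rows_updated fields):

result = target.merge(
    source,
    target["id"] == source["id"],
    [
        when_matched().update(merge_mapping),
        when_not_matched().insert(merge_mapping),
    ],
)
# MergeResult fields report the row counts affected by the MERGE
print(result.rows_inserted, result.rows_updated)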
  5. What did you expect to see?

I expected the statement to merge source into target, using an update + insert strategy based on whether a record was matched or not matched according to the id field.

What I got instead was an error that SNOWPARK_TEMP_TABLE_X does not exist or is not authorized (X being a random 10-character alphanumeric string). I noticed that the error only occurred when there were more than a few rows (e.g. > 10) in the source DataFrame; running the statement in tests with <= 10 rows in the source DataFrame produced the expected result.

Full stack trace below:

snowflake.snowpark.exceptions.SnowparkSQLException: (1304): 002003 (42S02): SQL compilation error:
Object 'SNOWPARK_TEMP_TABLE_4G6YDVDRWU' does not exist or not authorized.
  File "/usr/local/lib/python3.8/site-packages/dagster/_core/execution/plan/utils.py", line 47, in solid_execution_error_boundary
    yield
  File "/usr/local/lib/python3.8/site-packages/dagster/_utils/__init__.py", line 421, in iterate_with_context
    next_output = next(iterator)
  File "/usr/local/lib/python3.8/site-packages/dagster/_core/execution/plan/execute_step.py", line 554, in _gen_fn
    gen_output = output_manager.handle_output(output_context, output.value)
  File "/opt/dagster/dagster_home/dagster_analytics/dagster_analytics/io_managers/snowpark.py", line 111, in handle_output
    merged_rows = table.merge(
  File "/usr/local/lib/python3.8/site-packages/snowflake/snowpark/_internal/telemetry.py", line 76, in wrap
    result = func(*args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/snowflake/snowpark/table.py", line 506, in merge
    new_df._internal_collect_with_tag(),
  File "/usr/local/lib/python3.8/site-packages/snowflake/snowpark/dataframe.py", line 446, in _internal_collect_with_tag
    return self._session._conn.execute(
  File "/usr/local/lib/python3.8/site-packages/snowflake/snowpark/_internal/server_connection.py", line 368, in execute
    result_set, result_meta = self.get_result_set(
  File "/usr/local/lib/python3.8/site-packages/snowflake/snowpark/_internal/analyzer/snowflake_plan.py", line 149, in wrap
    raise ne.with_traceback(tb) from None
  File "/usr/local/lib/python3.8/site-packages/snowflake/snowpark/_internal/analyzer/snowflake_plan.py", line 82, in wrap
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/snowflake/snowpark/_internal/server_connection.py", line 405, in get_result_set
    result = self.run_query(
  File "/usr/local/lib/python3.8/site-packages/snowflake/snowpark/_internal/server_connection.py", line 103, in wrap
    raise ex
  File "/usr/local/lib/python3.8/site-packages/snowflake/snowpark/_internal/server_connection.py", line 97, in wrap
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/snowflake/snowpark/_internal/server_connection.py", line 325, in run_query
    raise ex
  File "/usr/local/lib/python3.8/site-packages/snowflake/snowpark/_internal/server_connection.py", line 317, in run_query
    results_cursor = self._cursor.execute(query, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/snowflake/connector/cursor.py", line 804, in execute
    Error.errorhandler_wrapper(self.connection, self, error_class, errvalue)
  File "/usr/local/lib/python3.8/site-packages/snowflake/connector/errors.py", line 276, in errorhandler_wrapper
    handed_over = Error.hand_to_other_handler(
  File "/usr/local/lib/python3.8/site-packages/snowflake/connector/errors.py", line 331, in hand_to_other_handler
    cursor.errorhandler(connection, cursor, error_class, error_value)
  File "/usr/local/lib/python3.8/site-packages/snowflake/connector/errors.py", line 210, in default_errorhandler
    raise error_class(
  6. Can you set logging to DEBUG and collect the logs?

I can do this if necessary, and I can also provide the compiled SQL statements that Snowflake tried to run.
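
For what it's worth, a minimal sketch for enabling DEBUG logs for just these two libraries, using the standard logging module (assuming the loggers are named after the packages, which is the usual convention):

import logging

logging.basicConfig(level=logging.WARNING)  # keep unrelated libraries quiet
for name in ("snowflake.snowpark", "snowflake.connector"):
    logging.getLogger(name).setLevel(logging.DEBUG)  # full detail from the Snowflake code paths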

sfc-gh-jdu commented 2 years ago

Hey @dnxie12, how are your source and target created? Sorry, I'm not able to tell what's wrong from this stack trace; could you provide a minimal repro for it?

dnxie12 commented 2 years ago

Hey @sfc-gh-jdu, here is some more information:

Using the script below to generate target and source:

from snowflake.snowpark import Session
from snowflake.snowpark.functions import when_matched, when_not_matched
import os
from datetime import datetime

configuration = {
    "account": os.getenv("SNOWFLAKE_ACCOUNT"),
    "user": os.getenv("SNOWFLAKE_USER"),
    "password": os.getenv("SNOWFLAKE_PASSWORD"),
    "database": os.getenv("SNOWFLAKE_DATABASE"),
    "role": os.getenv("SNOWFLAKE_ROLE"),
    "warehouse": os.getenv("SNOWFLAKE_WAREHOUSE"),
    "schema": "ANALYTICS_STAGING",
}
session = Session.builder.configs(configuration).create()

target_df = session.create_dataframe(
    [
        [10, "old", "John", "John", "USA", "active", 45, datetime(2022, 1, 1)],
        [11, "too_old", "John", "John", "USA", "active", 75, datetime(2022, 1, 1)],
        [12, "old", "John", "John", "USA", "active", 45, datetime(2022, 1, 1)],
    ],
    schema=["id", "type", "name", "last_name", "country", "status", "age", "created_at"],
)

target_df.write.save_as_table("my_table", mode="overwrite")

target = session.table("my_table")

def generate_records(num_records: int):
    results = []

    for i in range(1, num_records + 1):
        record = [i, "new", "john", "smith", "Canada", "active", 35, datetime(2022, 1, 1)]
        results.append(record)

    return results

source = session.create_dataframe(generate_records(num_records=100), schema=["id", "type", "name", "last_name", "country", "status", "age", "created_at"])

merge_mapping = {v: source[v] for v in source.columns}

target.merge(
    source,
    target["id"] == source["id"],
    [when_matched().update(merge_mapping), when_not_matched().insert(merge_mapping)],
)

When I use num_records = 10, everything works, and the translated Snowflake query is:

[screenshot: translated MERGE statement with the source rows listed inline as VALUES]

But when I use num_records >= 70, I get the SNOWPARK_TEMP_TABLE error, and the translated Snowflake query is:

[screenshot: translated MERGE statement using SELECT * FROM SNOWPARK_TEMP_TABLE_...]

The main difference I see between the two screenshots is that the second one uses a SELECT * FROM SNOWPARK_TEMP_TABLE instead of listing the values out individually.
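
In case it helps reproduce this without screenshots, the translated SQL can also be captured in code; a sketch assuming Session.query_history() is available in this version (its records expose a sql_text attribute):

with session.query_history() as history:
    target.merge(
        source,
        target["id"] == source["id"],
        [when_matched().update(merge_mapping), when_not_matched().insert(merge_mapping)],
    )

for query in history.queries:
    print(query.sql_text)  # includes any CREATE TEMP TABLE / INSERT that Snowpark ran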

dnxie12 commented 1 year ago

Hi @sfc-gh-jdu, just a quick follow-up to confirm whether the minimal repro provided also errors out on your side, and/or whether more context is needed. Thanks!

sfc-gh-jdu commented 1 year ago

@dnxie12 ack, it's a bug and we'll look into it next week. Thanks for bringing it up!

You got two different translated queries because the underlying data size of source is different. When it's large, we create a temp table first, insert the data into it, and then load that table in Snowpark. One workaround is to set a larger value for the threshold:

from snowflake.snowpark._internal.analyzer import analyzer
analyzer.ARRAY_BIND_THRESHOLD = 10000  # for example; any value large enough for your data

The default value is 512. You can set a larger value, though performance might not be good this way, because the data values then get embedded directly into the SQL text.
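
For illustration, assuming the check compares rows * columns against the threshold (a back-of-the-envelope sketch consistent with the repro's numbers, not the exact internal check), the ~70-row cutoff lines up:

# 8 columns per row in the repro above
print(70 * 8)  # 560 -> above the 512 default, so a temp table is created
print(10 * 8)  # 80  -> below 512, so the values are inlined into the SQL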

dnxie12 commented 1 year ago

That makes sense, thanks for looking into this and for this extra info!