snowflakedb / snowpark-python

Snowflake Snowpark Python API
Apache License 2.0

SNOW-1545867: COPY INTO command doesn't get submitted asynchronously by using collect_nowait #1941

Open ankitsr92 opened 1 month ago

ankitsr92 commented 1 month ago

Please answer these questions before submitting your issue. Thanks!

  1. What version of Python are you using?

3.8

  2. What operating system and processor architecture are you using?

Snowpark Snowflake Stored Procedure

  3. What are the component versions in the environment (pip freeze)?

Snowpark Snowflake Stored Procedure

  4. What did you do?

When submitting COPY INTO commands as async jobs, the COPY commands still run in sequence, whereas other SQL statements (e.g. CTAS) do get submitted asynchronously:

async_jobs = []
for table in table_list:
    sql_command = ...  # the COPY INTO command for this table
    async_job = session.sql(sql_command).collect_nowait()
    async_jobs.append(async_job)

  5. What did you expect to see?

COPY INTO should be submitted asynchronously, and I should see multiple COPY commands running in parallel.

  6. Can you set logging to DEBUG and collect the logs?

Running within Snowflake
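Since DEBUG logs are not available inside Snowflake, one way to gather evidence for the COPY INTO loop above is to time each submission call: if collect_nowait() returns immediately, each call should take a fraction of a second, while a call that blocks until the COPY finishes takes about as long as the load itself. This is a minimal sketch built around a hypothetical helper, time_submissions, that accepts any submit callable; inside the procedure you would pass something like lambda sql: session.sql(sql).collect_nowait(). The two fake submitters exist only so the sketch runs without a Snowflake session.

```python
import time
from typing import Callable, List, Tuple


def time_submissions(submit: Callable[[str], object],
                     statements: List[str]) -> Tuple[List[object], List[float]]:
    """Call submit() on each statement and record how long each call takes to return.

    Hypothetical helper: with a non-blocking submit (e.g. collect_nowait()),
    every duration should be small; a blocking submit shows durations close
    to each query's own runtime.
    """
    jobs, durations = [], []
    for sql in statements:
        start = time.perf_counter()
        jobs.append(submit(sql))
        durations.append(time.perf_counter() - start)
    return jobs, durations


if __name__ == "__main__":
    # Stand-ins for a real session: one returns at once, the other sleeps
    # like a call that waits for the query to complete.
    def non_blocking(sql: str) -> str:
        return f"job for {sql}"

    def blocking(sql: str) -> str:
        time.sleep(0.2)
        return f"result of {sql}"

    stmts = ["statement 1", "statement 2"]  # placeholders, not real SQL
    _, fast = time_submissions(non_blocking, stmts)
    _, slow = time_submissions(blocking, stmts)
    print("non-blocking:", [round(d, 3) for d in fast])
    print("blocking:", [round(d, 3) for d in slow])
```

Durations that track the COPY runtime, rather than staying near zero, would point to the submission call itself blocking.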

sfc-gh-sghosh commented 1 month ago

Hello @ankitsr92 ,

Thanks for raising the issue. We are looking into it and will update.

Regards, Sujan

sfc-gh-sghosh commented 1 month ago

Hello @ankitsr92 ,

A Snowpark stored procedure always executes procedurally; it doesn't execute in parallel. That's why the COPY commands inside it execute sequentially.

Regards, Sujan

ankitsr92 commented 1 month ago

@sfc-gh-sghosh I am not sure you have understood the question. I am using collect_nowait(), which means the procedure should not wait for the SQL to complete before moving on. The problem is that when I run a COPY INTO statement with collect_nowait(), it still waits for the COPY to complete before moving to the next iteration of the loop.

If I instead use any other SQL (e.g. CTAS, INSERT INTO, or just a simple SYSTEM$WAIT), collect_nowait() works fine and moves on to the next statement without waiting for it to complete. Try this, for example:

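import snowflake.snowpark as snowpark
from snowflake.snowpark.async_job import AsyncJob
from typing import List

def main(session: snowpark.Session):

    async_jobs: List[AsyncJob] = []

    # Submit 10 queries that each sleep for 10 seconds. collect_nowait()
    # returns an AsyncJob immediately instead of waiting for the query.
    for i in range(10):
        sql_command = "SELECT SYSTEM$WAIT(10)"
        async_job = session.sql(sql_command).collect_nowait()
        async_jobs.append(async_job)

    # Block until every query has finished. If the queries run
    # concurrently, this takes roughly 10 seconds in total rather than 100.
    results = []
    for job in async_jobs:
        result = job.result()
        results.append(result)

    return "Success"
$$;
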
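Snowpark's AsyncJob exposes is_done() and result(), so a second observable signal is how many jobs are still running right after the submission loop finishes. This is a minimal sketch with two hypothetical helpers, pending_count and wait_all, that work on any object with those two methods; FakeJob is a stand-in that only exists so the sketch runs without Snowflake, and the poll interval and timeout defaults are arbitrary assumptions.

```python
import time
from typing import Any, List


def pending_count(jobs: List[Any]) -> int:
    """Number of jobs whose is_done() still returns False."""
    return sum(1 for job in jobs if not job.is_done())


def wait_all(jobs: List[Any], poll_seconds: float = 1.0,
             timeout: float = 600.0) -> List[Any]:
    """Poll until every job reports is_done(), then return results in submission order."""
    deadline = time.monotonic() + timeout
    while pending_count(jobs) > 0:
        if time.monotonic() > deadline:
            raise TimeoutError(f"{pending_count(jobs)} job(s) still running")
        time.sleep(poll_seconds)
    return [job.result() for job in jobs]


class FakeJob:
    """Stand-in for AsyncJob: reports done `duration` seconds after creation."""

    def __init__(self, value: Any, duration: float) -> None:
        self._value = value
        self._done_at = time.monotonic() + duration

    def is_done(self) -> bool:
        return time.monotonic() >= self._done_at

    def result(self) -> Any:
        return self._value
```

Calling pending_count(async_jobs) immediately after the submission loop is the useful check: with truly asynchronous submission most jobs should still be pending, whereas if each collect_nowait() call blocked, the count will already be 0.
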
sfc-gh-sghosh commented 1 month ago

Thank you, @ankitsr92, for pointing that out. Let me check and get back to you.

Regards, Sujan

nickhealy commented 1 month ago

@sfc-gh-sghosh Do you have any updates on this? I am also facing the same issue.

sfc-gh-sghosh commented 1 month ago

Hello @nickhealy @ankitsr92 ,

The team is working on a fix and will post an update.

Regards, Sujan

sfc-gh-aalam commented 4 weeks ago

Hi @nickhealy @ankitsr92, can you please share which Snowpark version you are using, and what side effect makes you think the async jobs are not being submitted asynchronously? I am not able to reproduce the issue on my end.

Inside a stored procedure, you can pin the version in the PACKAGES clause:

create or replace procedure my_python_sp()
returns STRING
language python
handler='my_handler_func'
runtime_version=3.8
packages=('snowflake-snowpark-python=1.20.0')
as $$
...

Alternatively, you can choose it using the Packages dropdown:

[Screenshot, 2024-08-15: Packages dropdown]
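To answer the version question from inside the procedure itself, the handler can report the Snowpark version it actually loaded. This is a minimal sketch using the standard-library importlib.metadata (available from Python 3.8). It assumes the package's distribution metadata is visible in the Snowflake Python runtime; the installed_version helper and the handler body are hypothetical, with the handler named to match handler='my_handler_func' above.

```python
from importlib import metadata
from typing import Optional


def installed_version(dist_name: str = "snowflake-snowpark-python") -> Optional[str]:
    """Return the installed version of a distribution, or None if it is not installed."""
    try:
        return metadata.version(dist_name)
    except metadata.PackageNotFoundError:
        return None


def my_handler_func(session) -> str:
    # Hypothetical handler body: it ignores the session and only reports
    # the Snowpark version resolved for this procedure.
    return f"snowflake-snowpark-python {installed_version()}"
```

If the returned version differs from the one pinned in PACKAGES, that mismatch is worth including in the report.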