snowflakedb / snowflake-sqlalchemy

Snowflake SQLAlchemy
https://pypi.python.org/pypi/snowflake-sqlalchemy/
Apache License 2.0
233 stars 152 forks source link

SNOW-770375: SqlAlchemy in Stored procedure #400

Open adrien-turiot-maxa opened 1 year ago

adrien-turiot-maxa commented 1 year ago

Python code within Snowflake seems to only have access to a Snowpark session, such as Python Stored Procedure here or UDF.

However, with the announce of new Snowflake features like Hybrid tables, Native Apps, or Streamlit in Snowflake, then managing Transactional data within procedures or other Python code will become very useful.

Snowpark is great for query operations, but is tedious to manage DLM operations. This is mostly the case when maintaining table relationships like many-to-many associations, for which sqlalchemy has features to address that.

Hence, would there be a way to create a sqlalchemy session in Python stored procedure ?

For example by using the current snowpark session.

sfc-gh-dszmolka commented 7 months ago

hi and thank you for submitting this enhancement request - we'll consider it for future plans

ams1 commented 4 months ago

Hi @sfc-gh-dszmolka ,

So currently is there any easy way to use SqlAlchemy with a snowflake db from ex. a Streamlit app hosted on snowflake?

I don't see snowflake-sqlalchemy in the available Anaconda Packages.

Thanks.

sfc-gh-dszmolka commented 4 months ago

it is indeed not available in the Anaconda Snowflake Channel (it is not a third party package after all..) but AFAIK there will be possible in the future in SiS to use custom package imported from a stage, just like Snowpark Python UDF.

sfc-gh-dszmolka commented 4 months ago

also would like to share a quite innovative approach from one of my colleagues @sfc-gh-mrojas ; it might not work for everyone and every use-case but still putting it here in case it helps anyone until official support is available

create or replace procedure sqlalchemy_test()
    returns String
    language python
    runtime_version = 3.8
    packages =(
        'snowflake-snowpark-python==*',
        'sqlalchemy==1.4.39'
    ) imports =(
        '@mystage/wheel_loader.py',
        '@mystage/snowflake_sqlalchemy-1.5.3-py3-none-any.whl'
    )
    handler = 'main'
    as 'import snowflake.snowpark as snowpark
from snowflake.snowpark.functions import col
from snowflake.connector.cursor import SnowflakeCursor
from snowflake.connector.converter import SnowflakeConverter
import sqlalchemy
import wheel_loader # get this from https://github.com/Snowflake-Labs/snowpark-extensions-py/tree/main/extras/wheel_loader
wheel_loader.load(''snowflake_sqlalchemy-1.5.3-py3-none-any.whl'') # download wheel from pypi

def _process_params_dict(self, params, cursor):
    try:
        res = {k: self._process_single_param(v) for k, v in params.items()}
        return res
    except Exception as e:
        raise Exception("Failed processing pyformat-parameters: {e}")
def _process_params_pyformat(self,params, cursor):
        if params is None:
            #if self._interpolate_empty_sequences:
            #    return None
            return {}
        if isinstance(params, dict):
            return self._process_params_dict(params,cursor)
        if not isinstance(params, (tuple, list)):
            params = [params,]
        try:
            res = map(self._process_single_param, params)
            ret = tuple(res)
            logger.debug(f"parameters: {ret}")
            return ret
        except Exception as e:
            raise Exception(f"Failed processing pyformat-parameters; {self}{params} {e}")
def _process_single_param(self, param):
        to_snowflake = self.converter.to_snowflake
        escape = self.converter.escape
        _quote = self.converter.quote
        return _quote(escape(to_snowflake(param)))

def main(session: snowpark.Session):
    import snowflake.connector.connection
    # patch this import
     # patch missing method

    snowflake.connector.connection.SnowflakeConnection = snowflake.connector.connection.StoredProcConnection

    from snowflake.sqlalchemy import URL
    from sqlalchemy import create_engine
    # Your existing Snowflake connection (replace with your actual connection)
    existing_snowflake_connection = session._conn._conn
    setattr(snowflake.connector.connection.StoredProcConnection,"_process_params_pyformat",_process_params_pyformat)
    setattr(snowflake.connector.connection.StoredProcConnection,"_process_params_dict",_process_params_dict)
    setattr(snowflake.connector.connection.StoredProcConnection,"_process_single_param",_process_single_param)
    # sql alchemy needs pyformat binding
    existing_snowflake_connection._paramstyle = "pyformat"

    # Create an engine and bind it to the existing Snowflake connection
    engine = create_engine(
        ''snowflake://'',
        creator=lambda: existing_snowflake_connection
    )
    try:
        # Use the engine to execute queries
        with engine.connect() as connection:
            results = connection.execute(''select current_version()'', []).fetchone()
            return results[0]
    finally:
        # Dispose of the engine (not necessary if you''re not using it further)
        engine.dispose()
';

call sqlalchemy_test()
sfc-gh-dszmolka commented 4 months ago

it is indeed not available in the Anaconda Snowflake Channel (it is not a third party package after all..) but AFAIK there will be possible in the future in SiS to use custom package imported from a stage, just like Snowpark Python UDF.

in the meantime i confirmed with my Product team that indeed this feature is already in private (closed) preview phase; meaning that if any of you interested are already a Snowflake customer; please reach out to your account team (e.g. your Sales rep.) and ask them to be onboarded. Of course as all features in private preview, it will eventually hit the next lifecycle phase of public preview when it will be available for everyone to try - i'll update this comment when I have any information on the timelines.