snowflakedb / snowflake-sqlalchemy

Snowflake SQLAlchemy
https://pypi.python.org/pypi/snowflake-sqlalchemy/
Apache License 2.0

SNOW-1478982: Create table with Timestamp? #506

Closed BFASRachelS closed 2 months ago

BFASRachelS commented 3 months ago

I am creating table definitions with SQLAlchemy in Python. I can successfully create a table with an auto-incrementing primary key and some other fields. However, I haven't been able to find any documentation on how to add a timestamp field that is populated when a new record is added. For reference, this is how you would create this field directly in a Snowflake worksheet.

    CREATE OR REPLACE TABLE My_Table(
        TABLE_ID NUMBER NOT NULL PRIMARY KEY,
        -- ... other fields
        TIME_ADDED TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    );

This is my Python code...

    from sqlalchemy import Column, DateTime, Integer, Text, String
    from sqlalchemy.orm import declarative_base
    from sqlalchemy import create_engine

    Base = declarative_base()

    class My_Table(Base):
        __tablename__ = 'my_table'

        TABLE_ID = Column(Integer, primary_key=True, autoincrement=True)
        # ... other columns
        # ... also need to create the TIME_ADDED field

    engine = create_engine(
        'snowflake://{user}:{password}@{account_identifier}/{database}/{schema}?warehouse={warehouse}'.format(
            account_identifier=account,
            user=username,
            password=password,
            database=database,
            warehouse=warehouse,
            schema=schema
        )
    )

    Base.metadata.create_all(engine)

Can someone let me know how I could do this?

BFASRachelS commented 3 months ago

Any info on this?

sfc-gh-dszmolka commented 3 months ago

Hi, I experimented with permutations of the default clause as described in https://docs.sqlalchemy.org/en/14/core/defaults.html, something like this:

..
def theTime(engine):
    connection = engine.connect()
    return connection.execute("SELECT CURRENT_TIMESTAMP();")
..
    TABLE_ID = Column(Integer, primary_key=True, autoincrement=True)
    TABLE_TIMESTAMP_ONE = Column(DateTime, default=datetime.now)
    TABLE_TIMESTAMP_TWO = Column(DateTime, default=func.now()) # func.CURRENT_TIMESTAMP() also doesn't work
    TABLE_TIMESTAMP_THREE = Column(DateTime, default=theTime(engine))

But none of them populate their cells when I insert a value into TABLE_ID. When I log the value of the default field it remains None, and the default is not passed to the table definition at all.
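
A likely reason for this behavior: `default=` is a client-side default, so SQLAlchemy evaluates it in Python only for INSERT statements it builds itself, and it is never written into the CREATE TABLE statement. A minimal sketch of that, assuming an in-memory SQLite database as a stand-in for Snowflake and a hypothetical table `ts_demo` (neither comes from this thread):

```python
from datetime import datetime

from sqlalchemy import (
    Column, DateTime, Integer, MetaData, Table, create_engine, select, text,
)
from sqlalchemy.schema import CreateTable

metadata = MetaData()

# Hypothetical table for illustration; `default=` is a client-side default.
demo = Table(
    "ts_demo",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("ts", DateTime, default=datetime.now),
)

# Compile the DDL with SQLAlchemy's generic dialect: no DEFAULT clause for "ts".
ddl = str(CreateTable(demo))
print(ddl)

# Assumption: in-memory SQLite stands in for Snowflake so no account is needed.
engine = create_engine("sqlite://")
metadata.create_all(engine)

with engine.begin() as conn:
    # Insert built by SQLAlchemy: datetime.now() is called and sent as a bind value.
    conn.execute(demo.insert().values(id=1))
    # Raw SQL insert: the Python-side default never runs, so "ts" stays NULL.
    conn.execute(text("INSERT INTO ts_demo (id) VALUES (2)"))
    rows = conn.execute(select(demo).order_by(demo.c.id)).all()

print(rows)
```

If that reading is right, a row inserted from a worksheet or any other raw SQL client would never see a `default=` value, which is why a `server_default` is needed for the DDL to carry the default.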

then tried

    TIMESTAMP_ONE = Column(DateTime, server_default=DefaultClause("CURRENT_TIMESTAMP"))

which at least allowed me to proceed to the next error, the SQL generated was

CREATE TABLE sqlalchemy506 (
    "TABLE_ID" INTEGER NOT NULL AUTOINCREMENT, 
    "TABLE_TIMESTAMP_ONE" datetime DEFAULT 'CURRENT_TIMESTAMP', 
    PRIMARY KEY ("TABLE_ID")
)

failed to execute on the Snowflake backend, due to

SQL compilation error: Default value data type does not match data type for column TABLE_TIMESTAMP_ONE

which makes sense because 'CURRENT_TIMESTAMP' is a string literal, not a datetime (note the single quotes around the expression). I did not figure out how to feed this value into DefaultClause unquoted, so I ended up patching snowflake/sqlalchemy/base.py instead:

default = self.get_column_default_string(column)
if default is not None:
    defaulttrimmed = default.replace("'", "").replace('"', '')
    colspec.append("DEFAULT " + defaulttrimmed)
    #colspec.append("DEFAULT " + default)

which when running the script (below), resulted in SQL

CREATE TABLE sqlalchemy506 (
    "TABLE_ID" INTEGER NOT NULL AUTOINCREMENT, 
    "TIMESTAMP_ONE" datetime DEFAULT CURRENT_TIMESTAMP, 
    "STRING_ONE" VARCHAR DEFAULT CURRENT_USER, 
    PRIMARY KEY ("TABLE_ID")
)

and when inserting a value in the table which has been just created, it populates with the expected values:

insert into test_db.public.sqlalchemy506 (table_id) values (1);
select * from test_db.public.sqlalchemy506;
TABLE_ID    TIMESTAMP_ONE              STRING_ONE
1           2024-06-27 06:46:33.994    ADMIN

Now, I'm not sure this is how it's supposed to work, or whether I'm specifying default= incorrectly in the first place. I'm sharing this approach because, based on your comment, you seem to be blocked, and hopefully this helps unblock you. We'll keep looking into whether default not working is a bug, whether it was never implemented properly, or whether I'm simply doing this wrong.
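
For what it's worth, patching the dialect may not be necessary. In SQLAlchemy, a plain string passed to DefaultClause is rendered as a quoted string literal, while a `text()` construct is rendered verbatim. The sketch below compiles the DDL with SQLAlchemy's generic default dialect rather than the Snowflake one, so it only illustrates the quoting rule; the assumption that the Snowflake dialect renders the default the same way is not verified here, and the table names are hypothetical:

```python
from sqlalchemy import (
    Column, DateTime, DefaultClause, Integer, MetaData, Table, text,
)
from sqlalchemy.schema import CreateTable

metadata = MetaData()

# Plain string: treated as a string value and rendered inside single quotes.
quoted = Table(
    "quoted_demo",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("ts", DateTime, server_default=DefaultClause("CURRENT_TIMESTAMP")),
)

# text(): treated as a SQL expression and rendered as-is, without quotes.
unquoted = Table(
    "unquoted_demo",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("ts", DateTime, server_default=DefaultClause(text("CURRENT_TIMESTAMP"))),
)

# str() compiles with the generic dialect; no database connection is needed.
quoted_ddl = str(CreateTable(quoted))
unquoted_ddl = str(CreateTable(unquoted))

print(quoted_ddl)
print(unquoted_ddl)
```

Passing `server_default=text("CURRENT_TIMESTAMP")` directly, without the DefaultClause wrapper, should be equivalent.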

finally the script i used:

# cat test.py 
from sqlalchemy import Column, DateTime, Integer, Text, String, select, func, DefaultClause
from sqlalchemy.orm import declarative_base
from sqlalchemy import create_engine
from datetime import datetime
from os import environ

Base = declarative_base()

engine = create_engine(
    'snowflake://{user}:{password}@{account_identifier}/{database}/{schema}?warehouse={warehouse}'.format(
    account_identifier=environ["SFACCOUNT"],
    user=environ["SFUSER"],
    password=environ["SFPASS"],
    database=environ["SFDB"],
    warehouse=environ["SFWH"],
    schema=environ["SFSCHEMA"]
    ))

def theTime(engine):
    connection = engine.connect()
    return connection.execute("SELECT CURRENT_TIMESTAMP();")

class My_Table(Base):
    __tablename__ = 'sqlalchemy506'

    TABLE_ID = Column(Integer, primary_key=True, autoincrement=True)
    TIMESTAMP_ONE = Column(DateTime, server_default=DefaultClause("CURRENT_TIMESTAMP"))
    STRING_ONE = Column(String, server_default=DefaultClause("CURRENT_USER"))
    #TABLE_TIMESTAMP_ONE = Column(DateTime, default=datetime.now)
    #TABLE_TIMESTAMP_TWO = Column(DateTime, default=func.now())
    #TABLE_TIMESTAMP_THREE = Column(DateTime, default=theTime(engine))

def main(engine):
    Base.metadata.create_all(engine)

if __name__ == "__main__":
    main(engine)

sfc-gh-dszmolka commented 3 months ago

Okay, apparently my approach was not correct. My colleague shared the following solution:

from sqlalchemy import (
    Column,
    Integer,
    String,
    func,
)
from sqlalchemy.orm import Session, declarative_base
from snowflake.sqlalchemy import TIMESTAMP_NTZ

Base = declarative_base()

class TWTS(Base):
    __tablename__ = "table_with_timestamp"

    pk = Column(Integer, primary_key=True)
    name = Column(String(30))
    created = Column(TIMESTAMP_NTZ, server_default=func.now())

    def __repr__(self) -> str:
        return f"TWTS({self.pk=}, {self.name=}, {self.created=})"

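# engine_testaccount is not defined in this snippet; it is assumed to be an
# existing Engine connected to Snowflake (e.g. built with create_engine).
Base.metadata.create_all(engine_testaccount)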

session = Session(bind=engine_testaccount)
r1 = TWTS(pk=1, name="edward")
r2 = TWTS(pk=2, name="eddy")
assert r1.created is None
assert r2.created is None

session.add(r1)
session.add(r2)
session.commit()

rows = session.query(TWTS).all()
assert len(rows) == 2
for row in rows:
    print(row)
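
To try the same pattern without a Snowflake account, here is a rough stand-in. It assumes an in-memory SQLite database and the generic `DateTime` type in place of `TIMESTAMP_NTZ`, and the `Event` class is hypothetical; it only illustrates why `created` is None before the commit and populated afterwards:

```python
from sqlalchemy import Column, DateTime, Integer, String, create_engine, func
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class Event(Base):
    # Hypothetical model mirroring the structure of the example above.
    __tablename__ = "event_with_timestamp"

    pk = Column(Integer, primary_key=True)
    name = Column(String(30))
    # server_default puts the default into the CREATE TABLE statement,
    # so the database fills the column, not Python.
    created = Column(DateTime, server_default=func.now())


# Assumption: in-memory SQLite stands in for Snowflake.
engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    row = Event(pk=1, name="edward")
    # Nothing has been sent to the database yet, so there is no value.
    before = row.created

    session.add(row)
    session.commit()

    # commit() expires the object; reading the attribute reloads the
    # server-generated value from the database.
    after = row.created

print(before, after)
```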

hope it helps!

sfc-gh-dszmolka commented 2 months ago

closing it for now as there's been a solution provided and also a PR for adding the example, but please do comment if you need further help