Open scuervo91 opened 2 years ago
I also wondered how to store JSON objects without converting them to strings. SQLAlchemy supports storing these directly.
@OXERY and @scuervo91, I was able to get something that works using this:
regions: dict = Field(sa_column=Column(JSON), default={'all': 'true'})
That said: this is a postgresql JSONB column in my database. But it works.
For a nested Object you could use a pydantic model as the Type and do it the same way. Hope this helps as I was having a difficult time figuring out a solution as well :)
I also got it working, on SQLite and Postgresql:
mygreatfield: Dict[Any, Any] = Field(index=False, sa_column=Column(JSON))
This needs the following imports:
from sqlmodel import Field, SQLModel, Column, JSON
from typing import Dict, Any
@TheJedinator Could you help a bit more with the nested object? I tried to "use the pydantic model as the Type" but I can't get it to work :( Here is my snippet:
from sqlalchemy import Column
from sqlalchemy.dialects.postgresql import JSONB
from sqlmodel import Field
from sqlmodel import Session
from sqlmodel import SQLModel

from engine import get_sqlalchemy_engine

class J(SQLModel):
    j: int

class A(SQLModel, table=True):
    a: int = Field(primary_key=True)
    b: J = Field(sa_column=Column(JSONB))

engine = get_sqlalchemy_engine()
SQLModel.metadata.create_all(engine)

with Session(engine) as session:
    a = A(a=1, b=J(j=1))
    session.add(a)
    session.commit()
    session.refresh(a)
This throws an error:
sqlalchemy.exc.StatementError: (builtins.TypeError) Object of type J is not JSON serializable
[SQL: INSERT INTO a (b, a) VALUES (%(b)s, %(a)s)]
[parameters: [{'a': 1, 'b': J(j=1)}]]
@psarka
j = J(j=1)
db_j = J.from_orm(j)
a = A(a=1, b=db_j)
This should resolve your issue in preparing the object for the database. What I'm seeing in the error is that the Raw Object is being included in the statement rather than the instance...
If this doesn't help, I can definitely put some more time into looking at what's going on.
Thank you! Unfortunately I get the same error :(
I found one workaround - registering a custom_serializer for the sqlalchemy engine, like so:
import json
from sqlalchemy import create_engine

def custom_serializer(d):
    return json.dumps(d, default=lambda v: v.json())

def get_sqlalchemy_engine():
    return create_engine("postgresql+psycopg2://", creator=get_conn, json_serializer=custom_serializer)
But if there is a cleaner way, I would gladly use that instead.
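One thing to watch with a serializer of this shape: the function passed as default= has to return a JSON-serializable Python object, and a pydantic v1 .json() call returns a string, so json.dumps encodes that string a second time. The sketch below uses only the standard library to show the difference; the J dataclass is a hypothetical stand-in for the pydantic model, with dict() and json() methods that only mimic the pydantic v1 names.

```python
import json
from dataclasses import asdict, dataclass


@dataclass
class J:
    """Hypothetical stand-in for the pydantic model J (not the real class)."""

    j: int

    def dict(self):
        return asdict(self)

    def json(self):
        return json.dumps(self.dict())


value = J(j=1)

# Returning a string from default= makes json.dumps encode that string again,
# so the result is a JSON string literal that wraps escaped JSON.
double_encoded = json.dumps(value, default=lambda v: v.json())

# Returning a dict lets json.dumps emit a real JSON object.
as_object = json.dumps(value, default=lambda v: v.dict())

print(double_encoded)
print(as_object)
```

If the column should hold a JSON object rather than a quoted string, returning v.dict() from default= is probably the safer choice.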
Hey @psarka
I actually just tried what I suggested, and sorry to have misled you... I did get a working solution though 😄
It was actually the opposite function that you need to use, here's the example you supplied with the amendments to make it work:
with Session(engine) as session:
    j = J(j=1)
    j_dumped = J.json(j)
    a = A(a=1, b=j_dumped)
    session.add(a)
    session.commit()
    session.refresh(a)
Hmm, this doesn't (or at least shouldn't) typecheck :)
But I see what you did there: essentially it's the same as registering a custom serializer, but manually.
It does type check when you create the J object (which it should), so if you tried to supply a string, as in J(j="foo"), it would fail.
This allows for type checking of the object; the A class requires a serialized version of J in order for it to be entered into the database.
It is essentially the same as registering a custom serializer, but it allows you to be explicit about using it.
A hacky method with type checking that works with SQLite is:
from sqlalchemy import Column
from typing import List
# from sqlalchemy.dialects.postgresql import JSONB
from sqlmodel import Field
from sqlmodel import Session
from pydantic import validator
from sqlmodel import SQLModel, JSON, create_engine
# from engine import get_sqlalchemy_engine

sqlite_file_name = "test.db"
sqlite_url = f"sqlite:///{sqlite_file_name}"
engine = create_engine(sqlite_url)

class J2(SQLModel):
    test: List[int]

class J(SQLModel):
    j: int
    nested: J2

class A(SQLModel, table=True):
    a: int = Field(primary_key=True)
    b: J = Field(sa_column=Column(JSON))

    @validator('b')
    def val_b(cls, val):
        return val.dict()

SQLModel.metadata.create_all(engine)

with Session(engine) as session:
    a = A(a=1, b=J(j=1, nested=J2(test=[100, 100, 100])))
    session.add(a)
    session.commit()
    session.refresh(a)
hi, I created a "JSON Field" based on what is written here. I am using SQLite.
from sqlmodel import SQLModel, Relationship, Field, JSON
from typing import Optional, List, Dict
from sqlalchemy import Column
from pydantic import validator

class J2(SQLModel):
    id: int
    title: str

class Companies(SQLModel, table=True):
    id: Optional[int] = Field(default=None, primary_key=True)
    name: str
    adddresses: List['J2'] = Field(sa_column=Column(JSON))

    @validator('adddresses')
    def val_b(cls, val):
        print(val)
        return val.dict()
It gives this error:
TypeError: Type is not JSON serializable: J2
when i print it, it returns
[J2(id=1, title='address1'), J2(id=2, title='address2')]
how can i handle that? Why is this J2 added, how can I get rid of it, i can't turn it to .dict(), i cannot serialise it... can you give an idea?
Does this work?
    @validator('adddresses')
    def val_b(cls, value):
        print(value)
        return [v.dict() for v in value]
@HenningScheufler thank you for your help, it worked perfect.
Hey all,
thanks for the great advice here. Creating the object using the classes and writing it to the DB works as expected and writes the data as a dict into a JSON field.
See this example:
class ComplexHeroField(SQLModel, table=False):
    some: str
    other: float
    more: Optional[List[str]]

class Hero(SQLModel, table=True):
    id: Optional[int] = Field(default=None, primary_key=True)
    complex_field: ComplexHeroField = Field(sa_column=Column(JSON))
    name: str
    secret_name: str
    age: Optional[int] = None

    @validator('complex_field')
    def val_complex(cls, val: ComplexHeroField):
        # Used in order to store pydantic models as dicts
        return val.dict()

    class Config:
        arbitrary_types_allowed = True
However, when reading the model from the DB using a select(), I would want the JSON field to be read into a ComplexHeroField instance using pydantic's parse_raw or parse_obj. Because of the way it's currently done (with the validator), this happens:
statement = select(Hero)
results = session.exec(statement)
for hero in results:
    print(hero.complex_field.some)
    # AttributeError: 'dict' object has no attribute 'some'
Any hint how that could be achieved? Maybe via the custom-serialiser mentioned by @psarka ?
Thanks already!
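The underlying behaviour can be reproduced without SQLModel: a JSON column only stores text, so whatever is read back is a plain dict until something that knows the target class rebuilds the typed object. Below is a minimal sketch using only the standard library; the ComplexHeroField dataclass and the hero table are simplified, hypothetical stand-ins for the models above.

```python
import json
import sqlite3
from dataclasses import asdict, dataclass
from typing import List, Optional


@dataclass
class ComplexHeroField:
    """Simplified stand-in for the pydantic model of the same name."""

    some: str
    other: float
    more: Optional[List[str]] = None


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE hero (id INTEGER PRIMARY KEY, complex_field TEXT)")

# Writing: the object is flattened to a dict and stored as JSON text.
field = ComplexHeroField(some="value", other=5.0, more=["dd", "sdf"])
conn.execute(
    "INSERT INTO hero (complex_field) VALUES (?)",
    (json.dumps(asdict(field)),),
)

# Reading: json.loads alone yields a dict, which has no .some attribute.
(raw,) = conn.execute("SELECT complex_field FROM hero").fetchone()
loaded = json.loads(raw)

# Rebuilding the typed object requires knowing the target class.
restored = ComplexHeroField(**loaded)
print(type(loaded).__name__, restored.some)
```

The open question in the thread is where that last step should live so that it happens per column rather than by hand.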
Something like this works, but obviously doesn't scale if we have multiple nested models instead of just the ComplexHeroField:
def custom_serializer(d):
    return json.dumps(d, default=lambda v: v.json())

def custom_deserialiser(d):
    return ComplexHeroField.parse_raw(d)

engine = create_engine(url_string, echo=True, json_serializer=custom_serializer, json_deserializer=custom_deserialiser)

complex_value = ComplexHeroField(some="value", other=5, more=["dd", "sdf"])
hero_1 = Hero(name="Deadpond", secret_name="Dive Wilson", complex_field=complex_value)
session.add(hero_1)
session.commit()

statement = select(Hero)
results = session.exec(statement)
for hero in results:
    print(hero.complex_field.some)
    # value
Instead, we would need more context in the deserialiser, i.e. access to the type hint of the field we're trying to deserialise, so that we can use UseType.parse_raw().
Any hint where and how I could achieve that kind of access to the deserialisation process?
Thanks :)
Hey all,
after looking at this again, I've been able to resolve it as follows.
For our sqlalchemy models we created this PydanticJSONType factory:
def pydantic_column_type(pydantic_type):
    class PydanticJSONType(TypeDecorator, Generic[T]):
        impl = JSON()

        def __init__(
            self, json_encoder=json,
        ):
            self.json_encoder = json_encoder
            super(PydanticJSONType, self).__init__()

        def bind_processor(self, dialect):
            impl_processor = self.impl.bind_processor(dialect)
            dumps = self.json_encoder.dumps
            if impl_processor:

                def process(value: T):
                    if value is not None:
                        if isinstance(pydantic_type, ModelMetaclass):
                            # This allows to assign non-InDB models and if they're
                            # compatible, they're directly parsed into the InDB
                            # representation, thus hiding the implementation in the
                            # background. However, the InDB model will still be returned
                            value_to_dump = pydantic_type.from_orm(value)
                        else:
                            value_to_dump = value
                        value = recursive_custom_encoder(value_to_dump)
                    return impl_processor(value)

            else:

                def process(value):
                    if isinstance(pydantic_type, ModelMetaclass):
                        # This allows to assign non-InDB models and if they're
                        # compatible, they're directly parsed into the InDB
                        # representation, thus hiding the implementation in the
                        # background. However, the InDB model will still be returned
                        value_to_dump = pydantic_type.from_orm(value)
                    else:
                        value_to_dump = value
                    value = dumps(recursive_custom_encoder(value_to_dump))
                    return value

            return process

        def result_processor(self, dialect, coltype) -> T:
            impl_processor = self.impl.result_processor(dialect, coltype)
            if impl_processor:

                def process(value):
                    value = impl_processor(value)
                    if value is None:
                        return None
                    data = value
                    # Explicitly use the generic directly, not type(T)
                    full_obj = parse_obj_as(pydantic_type, data)
                    return full_obj

            else:

                def process(value):
                    if value is None:
                        return None
                    # Explicitly use the generic directly, not type(T)
                    full_obj = parse_obj_as(pydantic_type, value)
                    return full_obj

            return process

        def compare_values(self, x, y):
            return x == y

    return PydanticJSONType
where recursive_custom_encoder() is pretty much the FastAPI jsonable_encoder.
Using this in SQLModel as follows:
class ConnectionResistances(SQLConnectionModel, table=False):
    very_short: ResistancesInLoadDuration = ResistancesInLoadDuration()
    short: ResistancesInLoadDuration = ResistancesInLoadDuration()
    middle: ResistancesInLoadDuration = ResistancesInLoadDuration()
    long: ResistancesInLoadDuration = ResistancesInLoadDuration()
    constant: ResistancesInLoadDuration = ResistancesInLoadDuration()
    earth_quake: ResistancesInLoadDuration = ResistancesInLoadDuration()

class Connection(SQLConnectionModel, table=True):
    id: Optional[uuid.UUID] = Field(default=None, sa_column=Column(PGUUID(as_uuid=True), default=uuid.uuid4, primary_key=True))
    name: str
    comment: str
    path_to_pdf: Optional[str] = None
    resistance_values: ConnectionResistances = Field(..., sa_column=Column(pydantic_column_type(ConnectionResistances)))
Works perfectly! That means:
This could be integrated into the SQLModel API based on the type hint alone (i.e. creating the sa_column based on the pydantic type automatically), potentially in get_sqlachemy_type.
What do you think, @tiangolo?
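For readers who want the core idea of the factory above in a smaller form, here is a minimal sketch that attaches the conversion to the column type through SQLAlchemy's TypeDecorator hooks process_bind_param and process_result_value. It assumes SQLAlchemy 1.4 or later and an in-memory SQLite database; the Nested dataclass, the NestedJSON type and the outer_model table are hypothetical stand-ins, not the code from this thread, and a real version would use a pydantic model instead of a dataclass.

```python
from dataclasses import asdict, dataclass

from sqlalchemy import Column, Integer, MetaData, Table, create_engine, insert, select
from sqlalchemy.types import JSON, TypeDecorator


@dataclass
class Nested:
    """Hypothetical stand-in for a pydantic model."""

    some_value: str


class NestedJSON(TypeDecorator):
    """Stores a Nested object in a JSON column and rebuilds it on load."""

    impl = JSON
    cache_ok = True

    def process_bind_param(self, value, dialect):
        # Python object -> plain dict; the JSON impl then serializes the dict.
        return None if value is None else asdict(value)

    def process_result_value(self, value, dialect):
        # dict produced by the JSON impl -> typed object.
        return None if value is None else Nested(**value)


metadata = MetaData()
outer = Table(
    "outer_model",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("nested", NestedJSON),
)

engine = create_engine("sqlite://")
metadata.create_all(engine)

with engine.begin() as conn:
    conn.execute(insert(outer).values(id=1, nested=Nested(some_value="x")))
    loaded = conn.execute(select(outer.c.nested)).scalar_one()

print(loaded)
```

Because the type knows its target class, no engine-wide deserializer has to guess which model a JSON value belongs to.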
@tiangolo Any updates ?
Hey @MaximilianFranz Would you mind sharing your entire solution, I am quite interested in trying it out, but it is missing some code pieces.
What exactly are you missing? Happy to provide more context!
The recursive_custom_encoder is missing. Ideally, a fully working example I can simply copy/paste and adapt to my use case ;)
You can use jsonable_encoder instead of the recursive_custom_encoder, like so:
from fastapi.encoders import jsonable_encoder
also I would start with a simpler model like:
class NestedModel(SQLModel):
    some_value: str

class OuterModel(SQLModel, table=True):
    guid: str = Field(
        default=None,
        sa_column=Column(PGUUID(as_uuid=True), default=uuid.uuid4, primary_key=True),
    )
    nested: NestedModel = Field(..., sa_column=Column(pydantic_column_type(NestedModel)))
That should work!
Thanks, @MaximilianFranz. Let me try. My code is here: https://github.com/Lightning-AI/lightning-hpo/blob/master/lightning_hpo/commands/sweep.py#L36. I'm trying to store the Sweep distributions.
Do you think it would work with the recursion?
Missing parse_obj_as and ModelMetaclass.
Hey @MaximilianFranz
I have made a draft PR there: https://github.com/Lightning-AI/lightning-hpo/pull/19/files. I tried but it is raising an error. Would you mind having a look?
Best, T.C
Both parse_obj_as and ModelMetaclass can be imported from pydantic:
from pydantic import parse_obj_as
from pydantic.main import ModelMetaclass
As for the error, would you mind pointing me to the action that fails or post a traceback somewhere?
It makes sense that it doesn't work yet. You'll have to use the ModelMetaclass, as is done in my snippet above, for the isinstance check. Also, the import for parse_obj_as is missing, so it can't work as it is :)
Hey @MaximilianFranz, I updated the code with your inputs, but it is still failing. I pushed the updated code.
File "/Users/thomas/Documents/GitHub/LAI-lightning-hpo-App/lightning_hpo/components/servers/db/server.py", line 42, in insert_sweep
session.commit()
File "/Users/thomas/Documents/GitHub/lightning/.venv/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 1451, in commit
self._transaction.commit(_to_root=self.future)
File "/Users/thomas/Documents/GitHub/lightning/.venv/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 829, in commit
self._prepare_impl()
File "/Users/thomas/Documents/GitHub/lightning/.venv/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 808, in _prepare_impl
self.session.flush()
File "/Users/thomas/Documents/GitHub/lightning/.venv/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 3383, in flush
self._flush(objects)
File "/Users/thomas/Documents/GitHub/lightning/.venv/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 3523, in _flush
transaction.rollback(_capture_exception=True)
File "/Users/thomas/Documents/GitHub/lightning/.venv/lib/python3.8/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
compat.raise_(
File "/Users/thomas/Documents/GitHub/lightning/.venv/lib/python3.8/site-packages/sqlalchemy/util/compat.py", line 208, in raise_
raise exception
File "/Users/thomas/Documents/GitHub/lightning/.venv/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 3483, in _flush
flush_context.execute()
File "/Users/thomas/Documents/GitHub/lightning/.venv/lib/python3.8/site-packages/sqlalchemy/orm/unitofwork.py", line 456, in execute
rec.execute(self)
File "/Users/thomas/Documents/GitHub/lightning/.venv/lib/python3.8/site-packages/sqlalchemy/orm/unitofwork.py", line 630, in execute
util.preloaded.orm_persistence.save_obj(
File "/Users/thomas/Documents/GitHub/lightning/.venv/lib/python3.8/site-packages/sqlalchemy/orm/persistence.py", line 245, in save_obj
_emit_insert_statements(
File "/Users/thomas/Documents/GitHub/lightning/.venv/lib/python3.8/site-packages/sqlalchemy/orm/persistence.py", line 1238, in _emit_insert_statements
result = connection._execute_20(
File "/Users/thomas/Documents/GitHub/lightning/.venv/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1631, in _execute_20
return meth(self, args_10style, kwargs_10style, execution_options)
File "/Users/thomas/Documents/GitHub/lightning/.venv/lib/python3.8/site-packages/sqlalchemy/sql/elements.py", line 332, in _execute_on_connection
return connection._execute_clauseelement(
File "/Users/thomas/Documents/GitHub/lightning/.venv/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1498, in _execute_clauseelement
ret = self._execute_context(
File "/Users/thomas/Documents/GitHub/lightning/.venv/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1862, in _execute_context
self._handle_dbapi_exception(
File "/Users/thomas/Documents/GitHub/lightning/.venv/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 2043, in _handle_dbapi_exception
util.raise_(
File "/Users/thomas/Documents/GitHub/lightning/.venv/lib/python3.8/site-packages/sqlalchemy/util/compat.py", line 208, in raise_
raise exception
File "/Users/thomas/Documents/GitHub/lightning/.venv/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1819, in _execute_context
self.dialect.do_execute(
File "/Users/thomas/Documents/GitHub/lightning/.venv/lib/python3.8/site-packages/sqlalchemy/engine/default.py", line 732, in do_execute
cursor.execute(statement, parameters)
sqlalchemy.exc.InterfaceError: (sqlite3.InterfaceError) Error binding parameter 5 - probably unsupported type.
[SQL: INSERT INTO sweepconfig (distributions, sweep_id, script_path, n_trials, simultaneous_trials, requirements, script_args, framework, cloud_compute, num_nodes, logger, direction) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)]
[parameters: ('{"name": "model.lr", "distribution": "uniform", "params": {"params": {"low": "0.001", "high": "0.1"}}}', 'thomas-5e0dd935', 'train.py', 1, 1, [], [], 'pytorch_lightning', 'cpu', 1, 'wandb', 'maximize')]
(Background on this error at: https://sqlalche.me/e/14/rvf5)
To finish this, the problem ended up being an attribute of type List[str] on an SQLModel, which is not natively supported. Using the above pydantic_column_type with List[str] works, however, and will encode the list as a JSON string in order to store it in the database.
Hoping this gets merged in the near future.
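The binding error in the traceback above can be reproduced with the sqlite3 driver alone, which may help when debugging similar cases: parameter 5 in that INSERT is a Python list, and the driver has no native mapping for lists. A minimal sketch, where the two-column sweepconfig table and the requirement values are simplified, hypothetical stand-ins:

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE sweepconfig (sweep_id TEXT, requirements TEXT)")

requirements = ["torch", "wandb"]  # hypothetical values

# Binding a Python list directly is rejected by the sqlite3 driver.
# The exact exception class differs between Python versions, so the
# common base class sqlite3.Error is caught here.
try:
    conn.execute("INSERT INTO sweepconfig VALUES (?, ?)", ("raw", requirements))
    raw_list_rejected = False
except sqlite3.Error:
    raw_list_rejected = True

# Encoding the list as a JSON string first turns it into a supported TEXT value.
conn.execute(
    "INSERT INTO sweepconfig VALUES (?, ?)",
    ("encoded", json.dumps(requirements)),
)
(stored,) = conn.execute(
    "SELECT requirements FROM sweepconfig WHERE sweep_id = 'encoded'"
).fetchone()
restored = json.loads(stored)

print(raw_list_rejected, restored)
```

A JSON-backed column type does the json.dumps and json.loads steps automatically on every write and read.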
I was able to set up a JSONB column for a PostgreSQL database, instead of the JSON that comes with sqlmodel, by:
from sqlalchemy.dialects.postgresql import JSONB
from sqlmodel import Field, SQLModel, Column

class test(SQLModel, table=True):
    result: dict = Field(sa_column=Column(JSONB), default={"message": "hello world"})
Would be great to have this in-built! CC: @tiangolo , @MaximilianFranz
I arrived here looking at something that I imagined might not work yet but that seems to be a feature one would expect from a library like this. The use case is simple (the solution I am not sure haha).
If you have a model with a field that is typed as a pydantic model, it seems to me that, as long as you specify the column as a JSON type, serialization and deserialization to and from the database should happen automatically.
class MyOtherModel(BaseModel):
    name: str
    id: int

class MyModel(SQLModel, table=True):
    id: int | None = Field(default=None, primary_key=True)
    my_field: MyOtherModel = Field(sa_column=Column(type_=JSON))

# ...setup session...
model = session.exec(select(MyModel).where(MyModel.id == 1)).first()
print(model.my_field.name)  # Should work
model.my_field = MyOtherModel(name="New name", id=123)
session.flush()  # Should work
Is this something that is being considered?
In case this helps anyone, here is how I managed to achieve it, although depending on how many of these fields you have it might impact performance a bit.
I have a model named Meetup that has a MeetupLocation field that I wanted to serialize in the database as JSON.
class Meetup(SQLModel, table=True):
    __tablename__: str = "meetups"  # type: ignore

    # ... some fields definitions ...
    location: MeetupLocation | None = Field(
        default=None,
        sa_column=Column(type_=MeetupLocation.as_mutable(JSON(none_as_null=True)), nullable=True),
    )
I will talk about how MeetupLocation has been defined in a bit.
To serialize and deserialize, I leveraged the engine using the json_serializer and json_deserializer arguments. It looks something like this:
from contextlib import suppress

from pydantic import BaseModel, ValidationError
from sqlalchemy import create_engine

def serialize_pydantic_model(model: BaseModel) -> str:
    return model.model_dump_json()

def deserialize_pydantic_model(data: str) -> BaseModel | None:
    # Try deserializing with each model until one works.
    # This is a pretty ugly solution but the deserialization seems to only be possible and reliable at an engine level
    # and we need to know the model to deserialize it properly
    # We would need to keep adding more of these if we add more models with JSON fields.
    with suppress(ValidationError):
        return MeetupLocation.model_validate_json(data)
    return None

engine = create_engine(
    db_config.full_url,
    echo=db_config.engine_echo,
    json_serializer=serialize_pydantic_model,
    json_deserializer=deserialize_pydantic_model,
)
The key part here is that SQLModel seems to break SQLAlchemy's mutable tracking, so when you later do something like
meetup.location.name = "Other name"
session.add(meetup)
session.commit()
it will not be persisted in the db because, as far as the Session cares, no modification has been made to the model.
To fix this, I have created a MutableModel that takes care of informing the Session through Mutable that the object has been updated.
from typing import Any

from pydantic import BaseModel
from sqlalchemy.ext.mutable import Mutable
from typing_extensions import Self

class MutableModel(BaseModel, Mutable):
    def __setattr__(self, name: str, value: Any) -> None:
        """Allows SQLAlchemy Session to track mutable behavior when updating any field"""
        self.changed()
        return super().__setattr__(name, value)

    @classmethod
    def coerce(cls, key: str, value: Any) -> Self | None:
        """Convert JSON to MeetupLocation object allowing for mutable behavior"""
        if isinstance(value, cls) or value is None:
            return value
        if isinstance(value, str):
            return cls.model_validate_json(value)
        if isinstance(value, dict):
            return cls(**value)
        # Anything else is handed to Mutable.coerce, which rejects unsupported types.
        return super().coerce(key, value)
Then, MeetupLocation is created like this:
class MeetupLocation(MutableModel):
    name: str | None = None
    location: tuple[float, float] | None = None
And this works as far as I have been able to validate it. It can read JSON and transform it into the model we want, and it can update the object in the database if anything changes in the model later on.
I hope this helps.
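The change-notification idea behind MutableModel can be illustrated without SQLAlchemy. In the sketch below, TrackedModel is a hypothetical stand-in, and its dirty flag only plays the role that Mutable.changed() plays for the Session; it is not how SQLAlchemy implements tracking internally.

```python
class TrackedModel:
    """Hypothetical stand-in: records a change on every attribute assignment."""

    def __init__(self, **fields):
        # Bypass our own __setattr__ so construction is not counted as a change.
        object.__setattr__(self, "dirty", False)
        for name, value in fields.items():
            object.__setattr__(self, name, value)

    def __setattr__(self, name, value):
        # Plays the role of Mutable.changed() in the MutableModel above.
        object.__setattr__(self, "dirty", True)
        object.__setattr__(self, name, value)


location = TrackedModel(name="Old name")
was_dirty_before = location.dirty

# An in-place attribute update now leaves a trace the owner can check.
location.name = "Other name"
print(was_dirty_before, location.dirty)
```

Without a hook like this, an in-place update leaves the parent row looking unchanged, which matches the behaviour described above.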
def serialize_pydantic_model(model: BaseModel) -> str:
    return model.model_dump_json()
A little question: why not use model.model_dump()?
And maybe a fix at the Pydantic level for the deserializer would be better, IMHO.
Otherwise, when I insert data into the DB, the nested field is already a dict type.
In fact, during the data preparation, I have an extra step with model_validate(), like:
new_row = {"name": "aa", "location": {"address": "bb", "city": "cc"}}
new_row_db = Meetup.model_validate(new_row)
# Here, new_row_db.location is a dict type.
session.add(new_row_db)
The serializer is a must-have, otherwise I cannot add the new row to the DB, but the deserializer is optional for me, as I'm fine with dict instead of MeetupLocation.
In my very personal scenario, my JSON data are in a format like
{
"object": "array",
"data": [
1,
2
]
}
Every JSON object must have the two keys object and data, so I can determine the exact MutableModel to use to deserialize the JSON string. Maybe we could store this information somewhere for nested Pydantic models in SQLModel to help with deserialization?
First Check
Commit to Help
Example Code
Description
I have already implemented an API using FastAPI to store Pydantic models. These models are themselves nested Pydantic models, so the way they interact with a Postgres database is through a JsonField. I've been using Tortoise ORM, as the example shows.
Is there an equivalent model in SQLModel?
Operating System
Linux
Operating System Details
WSL 2 Ubuntu 20.04
SQLModel Version
0.0.4
Python Version
3.8
Additional Context
No response