Open pudo opened 4 years ago
Hi, pudo. I'm trying to implement this function. I create a decorator to decorate method which want to use low level api, like this:
# others import
from functools import wraps
from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlalchemy.dialects.mysql import insert as mysql_insert
def low_level_api(func_):
def upsert(*args, **kwargs):
self, row, keys = args[0], args[1], args[2]
if not self.has_index(keys, is_unique=True):
return func_(*args, **kwargs)
if self.db.is_postgres:
do_update_stmt = pg_insert(self.table).values(**row).on_conflict_do_update(
index_elements=keys, set_=row
)
return self.db.executable.execute(do_update_stmt).rowcount == 1
elif self.db.is_mysql:
keys_map = {k: row[k] for k in keys}
do_update_stmt = mysql_insert(self.table).values(**row).on_duplicate_key_update(**keys_map)
return self.db.executable.execute(do_update_stmt).rowcount == 1
else:
return func_(*args, **kwargs)
@wraps(func_)
def inner(*args, **kwargs):
self = args[0]
if not self._use_low_level_api:
return func_(*args, **kwargs)
if func_.__name__ == "upsert":
return upsert(*args, **kwargs)
return func_(*args, **kwargs)
return inner
class Table(object):
"""Represents a table in a database and exposes common operations."""
PRIMARY_DEFAULT = "id"
def __init__(
self,
database,
table_name,
primary_id=None,
primary_type=None,
auto_create=False,
use_low_level_api=True
):
# others
self._use_low_level_api = use_low_level_api
self._unique_indexes = []
def has_index(self, columns, is_unique=False):
"""Check if an index exists to cover the given ``columns``."""
if not self.exists:
return False
columns = set([self._get_column_name(c) for c in columns])
if columns in self._indexes:
if is_unique:
return columns in self._unique_indexes
return True
for column in columns:
if not self.has_column(column):
return False
pk_columns = self.db.inspect.get_pk_constraint(self.name, schema=self.db.schema)["constrained_columns"]
if is_unique and columns == pk_columns:
self._indexes.append(columns)
self._unique_indexes.append(columns)
return True
indexes = self.db.inspect.get_indexes(self.name, schema=self.db.schema)
for index in indexes:
if columns == set(index.get("column_names", [])):
self._indexes.append(columns)
if index["unique"]:
self._unique_indexes.append(columns)
return True if is_unique else False
else:
return False if is_unique else True
return False
@low_level_api
def upsert(self, row, keys, ensure=None, types=None):
pass
There are some diffrent points with original upsert method:
Is that a good idea? What do you think about? @pudo
Hey @Remalloc - did you see https://github.com/pudo/dataset/pull/335 ? The unique constraint is what's stalled this. I do not think that we should be creating unique constraints dynamically when a user calls .upsert()
, that's breaking its semantics. So I now think that this in fact a new method, which would create the unique index if needed and then run the low-level API. The method would just not work on sqlite, or call upsert internally.
Oh, sorry. I didn't notice that PR. I agree with you, upsert method shouldn't create unique constraints dynamically. Can we only check unique constraints and automatic decide to use the low-level API if that existed. Perhaps, we can add a new upsert parameter to control whether to use the low-level API and create unique constraints. That's a good way? @pudo
Some backends, most notably PostgreSQL, support an upsert operation inside the database backend (i.e.
INSERT ... ON CONFLICT UPDATE ...
). We should explore if we can automatically switch to using this for datasets application-level upsert command when available.(@mynameisfiber maybe this could be part of accelerating your project?)