Django extension to update multiple table records with similar (but not equal) conditions in efficient way on PostgreSQL
Install via pip:
pip install django-pg-bulk-update
or via setup.py:
python setup.py install
You can make queries in 2 ways:
There are 4 query helpers in this library. There parameters are unified and described in the section below.
bulk_update(model, values, key_fields='id', using=None, set_functions=None, key_fields_ops=(), where=None, returning=None, batch_size=None, batch_delay=0)
This function updates multiple records of given model in single database query.
Functions forms raw sql query for PostgreSQL. It's work is not guaranteed on other databases.
Function returns number of updated records.
bulk_update_or_create(model, values, key_fields='id', using=None, set_functions=None, update=True, key_is_unique=True, returning=None, batch_size=None, batch_delay=0)
This function finds records by key_fields. It creates not existing records with data, given in values.
If update
flag is set, it updates existing records with data, given in values.
There are two ways, this function may work: 1) Use INSERT ... ON CONFLICT statement. It is safe, but requires PostgreSQL 9.5+ and unique index on key fields. This behavior is used by default. 2) 3-query transaction:
update
flag is set)Function returns number of records inserted or updated by query.
bulk_create(model, values, using=None, set_functions=None, returning=None, batch_size=None, batch_delay=0)
This function creates multiple records of given model in single database query.
Its functionality is the same as django's QuerySet.bulk_create,
but it is implemented on this library bases and can be more effective in some cases (for instance, for wide models).
pdnf_clause(key_fields, field_values, key_fields_ops=())
Pure django implementation of principal disjunctive normal form. It is base on combining Q() objects.
Condition will look like:
SELECT ... WHERE (a = x AND b = y AND ...) OR (a = x1 AND b = y1 AND ...) OR ...
Function returns a django.db.models.Q instance
model: Type[Model]
A subclass of django.db.models.Model to update
values: Union[Union[TUpdateValuesValid, Dict[Any, Dict[str, Any]]], Iterable[Dict[str, Any]]]
Data to update. All items must update same fields!!!
Parameter can have one of 2 forms:
bulk_create
function.
You can use this format to update key_fields
key_fields: Union[str, Iterable[str]]
Optional. Field names, which are used as update conditions.
Parameter can have one of 2 forms:
using: Optional[str]
Optional. Database alias to query. If not set, 'default' database is used.
set_functions: Optional[Dict[str, Union[str, AbstractSetFunction]]]
Optional. Functions which will be used to set values.
If given, it should be a dictionary:
Key is a field name function is applied to
Value can be:
django.db.models.expressions.BaseExpression
instance
Any django function expression returning a value.
Expression can use Value,
F,
Func and their child classes.
It can not use annotations and tables other than updated model (like F(a__b__c)
).
In create operations field default values is taken. If it is not provided, field default value is used.
Expression can contain django_pg_bulk_update.set_functions.BulkValue()
expression in it.
If so, it will be replaced with field value passed in values
parameter.
If expression does not contain BulkValue
instances data, passed in values
parameter for this key is ignored.
Function alias name
NOW()
database function. Doesn't expect any value in values
parameter.
If one is given it is ignored. django_pg_bulk_update.set_functions.AbstractSetFunction
instance
You can define your own set function. See section below.
Increment, union and concatenate functions concern NULL as default value.
key_field_ops: Union[Dict[str, Union[str, AbstractClauseOperator]], Iterable[Union[str, AbstractClauseOperator]]]
Optional. Operators, which are used to fined records for update. Operators are applied to key_fields
.
If some fields are not given, equality operator is used.
bulk_update_or_create
function always uses equality operator
Parameter can have one of 2 forms:
where
: Optional[WhereNode]
This parameter is used to filter data before doing bulk update, using QuerySet filter and exclude methods.
Generated condition should not contain annotations and other table references.
NOTE: parameter is not supported in bulk_update_or_create
returning: Optional[Union[str, Iterable[str]]]
If this parameter is set, it can be:
Query returns django_pg_returning.ReturningQuerySet instead of rows count.
Using this feature requires django-pg-returning
library installed (it is not in requirements, though).
batch_size: Optional[int]
If this parameter is set, values are split into batches of given size. Each batch is processed separately.
Note that batch_size != number of records processed if you use key_field_ops other than 'eq'
batch_delay: float
If batch_size is set, this parameter sets time to sleep in seconds between batches execution
update: bool
If flag is not set, bulk_update_or_create function will not update existing records, only creating not existing.
key_is_unique: bool
Defaults to True. Settings this flag to False forces library to use 3-query transactional update_or_create.
field_values: Iterable[Union[Iterable[Any], dict]]
Field values to use in pdnf_clause
function. They have simpler format than update functions.
It can come in 2 formats:
( (x, y), (x1, y1), ...)
({'a': x, 'b': y}, ...)
from django.db import models, F
from django.db.models.functions import Upper
from django_pg_bulk_update import bulk_update, bulk_update_or_create, pdnf_clause
from django_pg_bulk_update.set_functions import BulkValue
# Test model
class TestModel(models.Model):
name = models.CharField(max_length=50)
int_field = models.IntegerField()
# Create test data
created = TestModel.objects.pg_bulk_create([
{'id': i, 'name': "item%d" % i, 'int_field': 1} for i in range(1, 4)
])
print(created)
# Outputs 3
# Create test data returning
created = TestModel.objects.pg_bulk_create([
{'id': i, 'name': "item%d" % i, 'int_field': 1} for i in range(4, 6)
], returning='*')
print(created)
print(type(res), list(res.values_list('id', 'name', 'int_field')))
# Outputs:
# <class 'django_pg_returning.queryset.ReturningQuerySet'>
# [
# (4, "item4", 1),
# (5, "item5", 1)
# ]
# Update by id field
updated = bulk_update(TestModel, [{
"id": 1,
"name": "updated1",
}, {
"id": 2,
"name": "updated2"
}])
print(updated)
# Outputs: 2
# Update returning
res = bulk_update(TestModel, [{
"id": 1,
"name": "updated1",
}, {
"id": 2,
"name": "updated2"
}], returning=('id', 'name', 'int_field'))
print(type(res), list(res.values_list('id', 'name', 'int_field')))
# Outputs:
# <class 'django_pg_returning.queryset.ReturningQuerySet'>
# [
# (1, "updated1", 1),
# (2, "updated2", 1)
# ]
# Call update by name field
updated = bulk_update(TestModel, {
"updated1": {
"int_field": 2
},
"updated2": {
"int_field": 3
}
}, key_fields="name")
print(updated)
# Outputs: 2
print(list(TestModel.objects.all().order_by("id").values("id", "name", "int_field")))
# Outputs: [
# {"id": 1, "name": "updated1", "int_field": 2},
# {"id": 2, "name": "updated2", "int_field": 3},
# {"id": 3, "name": "item3", "int_field": 1}
# ]
# Increment int_field by 3 and transform name to upper case for records where id >= 2 and int_field < 3
updated = bulk_update(TestModel, {
(2, 3): {
"int_field": 3
}
}, key_fields=['id', 'int_field'], key_fields_ops={'int_field': '<', 'id': 'gte'},
set_functions={'int_field': '+', 'name': Upper('name')})
print(updated)
# Outputs: 1
print(list(TestModel.objects.all().order_by("id").values("id", "name", "int_field")))
# Outputs: [
# {"id": 1, "name": "updated1", "int_field": 2},
# {"id": 2, "name": "updated2", "int_field": 3},
# {"id": 3, "name": "incr", "int_field": 4}
# ]
res = bulk_update_or_create(TestModel, [{
"id": 3,
"name": "_concat1",
"int_field": 3
}, {
"id": 4,
"name": "concat2",
'int_field': 4
}], set_functions={'name': '||', 'int_field': F('int_field') + BulkValue()})
print(res)
# Outputs: 2
print(list(TestModel.objects.all().order_by("id").values("id", "name", "int_field")))
# Note: IntegerField defaults to 0 in create operations. So 0 + 4 = 4 for id 4.
# Outputs: [
# {"id": 1, "name": "updated1", "int_field": 2},
# {"id": 2, "name": "updated2", "int_field": 3},
# {"id": 3, "name": "incr_concat1", "int_field": 7},
# {"id": 4, "name": "concat2", "int_field": 4},
# ]
# Find records where
# id IN [1, 2, 3] AND name = 'updated2' OR id IN [3, 4, 5] AND name = 'concat2' OR id IN [2, 3, 4] AND name = 'updated1'
cond = pdnf_clause(['id', 'name'], [([1, 2, 3], 'updated2'),
([3, 4, 5], 'concat2'),
([2, 3, 4], 'updated1')], key_fields_ops={'id': 'in'})
data = TestModel.objects.filter(cond).order_by('int_field').values_list('int_field', flat=True)
print(list(data))
# Outputs: [3, 5]
In order to simplify using bulk_create
, bulk_update
and bulk_update_or_create
functions,
you can use a custom manager.
It automatically fills:
model
parameterusing
parameter (extracts queryset write database)where
parameter (applies queryset filters, if called as QuerySet method). Not supported in bulk_update_or_create.Note: As django 2.2
introduced bulk_update method,
library methods were renamed to pg_bulk_create
, pg_bulk_update
and pg_bulk_update_or_create
respectively.
Example:
from django.db import models
from django_pg_bulk_update.manager import BulkUpdateManager
# Test model
class TestModel(models.Model):
objects = BulkUpdateManager()
name = models.CharField(max_length=50)
int_field = models.IntegerField()
# Now you can use functions like:
TestModel.objects.pg_bulk_create([
# Any data here
], set_functions=None)
TestModel.objects.pg_bulk_update([
# Any data here
], key_fields='id', set_functions=None, key_fields_ops=())
# Update only records with id greater than 5
TestModel.objects.filter(id__gte=5).pg_bulk_update([
# Any data here
], key_fields='id', set_functions=None, key_fields_ops=())
TestModel.objects.pg_bulk_update_or_create([
# Any data here
], key_fields='id', set_functions=None, update=True)
If you already have a custom manager, you can replace QuerySet to BulkUpdateQuerySet:
from django.db import models
from django.db.models.manager import BaseManager
from django_pg_bulk_update.manager import BulkUpdateQuerySet
class CustomManager(BaseManager.from_queryset(BulkUpdateQuerySet)):
pass
# Test model
class TestModel(models.Model):
objects = CustomManager()
name = models.CharField(max_length=50)
int_field = models.IntegerField()
If you already have a custom QuerySet, you can inherit it from BulkUpdateMixin:
from django.db import models
from django.db.models.manager import BaseManager
from django_pg_bulk_update.manager import BulkUpdateMixin
class CustomQuerySet(BulkUpdateMixin, models.QuerySet):
pass
class CustomManager(BaseManager.from_queryset(CustomQuerySet)):
pass
# Test model
class TestModel(models.Model):
objects = CustomManager()
name = models.CharField(max_length=50)
int_field = models.IntegerField()
You can define your own clause operator, creating AbstractClauseOperator
subclass and implementing:
names
attributedef get_django_filter(self, name)
methoddef get_sql_operator(self)
or def get_sql(self, table_field, value)
When clause is formed, it calls get_sql()
method.
In order to simplify method usage of simple field <op> value
operators,
by default get_sql()
forms this condition, calling get_sql_operator()
method, which returns Optionally, you can change def format_field_value(self, field, val, connection, cast_type=True, **kwargs)
method,
which formats value according to field rules
Example:
from django_pg_bulk_update import bulk_update
from django_pg_bulk_update.clause_operators import AbstractClauseOperator
class LTClauseOperator(AbstractClauseOperator):
names = {'lt', '<'}
def get_django_filter(self, name): # type: (str) -> str
"""
This method should return parameter name to use in django QuerySet.fillter() kwargs
:param name: Name of parameter
:return: String with filter
"""
return '%s__lt' % name
def get_sql_operator(self): # type: () -> str
"""
If get_sql operator is simple binary operator like "field <op> val", this functions returns operator
:return: str
"""
return '<'
# Usage examples
# import you function here before calling an update
bulk_update(TestModel, [], key_field_ops={'int_field': 'lt'})
bulk_update(TestModel, [], key_field_ops={'int_field': LTClauseOperator()})
You can use class instance directly in key_field_ops
parameter or use its aliases from names
attribute.
When update function is called, it searches for all imported AbstractClauseOperator subclasses and takes first class
which contains alias in names
attribute.
You can define your own set function, creating AbstractSetFunction
subclass and implementing:
names
attributesupported_field_classes
attributedef get_sql_value(self, field, val, connection, val_as_param=True, with_table=False, for_update=True, **kwargs)
method
This method defines new value to set for parameter. It is called from get_sql(...)
method by default.def get_sql(self, field, val, connection, val_as_param=True, with_table=False, for_update=True, **kwargs)
method
This method sets full sql and it params to use in set section of update query."%s" = self.get_sql_value(...)
, paramsOptionally, you can change:
def format_field_value(self, field, val, connection, cast_type=False, **kwargs)
method, if input data needs special formatting. def modify_create_params(self, model, key, kwargs)
method, to change data before passing them to model constructor
in bulk_update_or_create()
. This method is used in 3-query transactional update only. INSERT ... ON CONFLICT
uses for_update flag of get_sql()
and get_sql_value()
functionsExample:
from django_pg_bulk_update import bulk_update
from django_pg_bulk_update.set_functions import AbstractSetFunction
class CustomSetFunction(AbstractSetFunction):
# Set function alias names
names = {'func_alias_name'}
# Names of django field classes, this function supports. You can set None (default) to support any field.
supported_field_classes = {'IntegerField', 'FloatField', 'AutoField', 'BigAutoField'}
def get_sql_value(self, field, val, connection, val_as_param=True, with_table=False, for_update=True, **kwargs):
"""
Returns value sql to set into field and parameters for query execution
This method is called from get_sql() by default.
:param field: Django field to take format from
:param val: Value to format
:param connection: Connection used to update data
:param val_as_param: If flag is not set, value should be converted to string and inserted into query directly.
Otherwise a placeholder and query parameter will be used
:param with_table: If flag is set, column name in sql is prefixed by table name
:param for_update: If flag is set, returns update sql. Otherwise - insert SQL
:param kwargs: Additional arguments, if needed
:return: A tuple: sql, replacing value in update and a tuple of parameters to pass to cursor
"""
# If operation is incremental, it should be ready to get NULL in database
null_default, null_default_params = self._parse_null_default(field, connection, **kwargs)
# Your function/operator should be defined here
tpl = 'COALESCE("%s", %s) + %s'
if val_as_param:
sql, params = self.format_field_value(field, val, connection)
return tpl % (field.column, null_default, sql), null_default_params + params
else:
return tpl % (field.column, null_default, str(val)), null_default_params
# Usage examples
# import you function here before calling an update
bulk_update(TestModel, [], set_functions={'int_field': 'func_alias_name'})
bulk_update(TestModel, [], set_functions={'int_field': CustomSetFunction()})
You can use class instance directly in set_functions
parameter or use its aliases from names
attribute.
When update function is called, it searches for all imported AbstractSetFunction subclasses and takes first class
which contains alias in names
attribute.
Library supports django.contrib.postgres.fields:
Note that ArrayField and HStoreField are available since django 1.8, JSONField - since django 1.9.
RangeField supports are available since PostgreSQL 9.2, psycopg2 since 2.5 and django since 1.8.
PostgreSQL before 9.4 doesn't support jsonb, and so - JSONField.
PostgreSQL 9.4 supports JSONB, but doesn't support concatenation operator (||). In order to support this set function a special function for postgres 9.4 was written. Add a migration to create it:
from django.db import migrations,
from django_pg_bulk_update.compatibility import Postgres94MergeJSONBMigration
class Migration(migrations.Migration):
dependencies = []
operations = [
Postgres94MergeJSONBMigration()
]
PostgreSQL before 9.5 doesn't support INSERT ... ON CONFLICT statement. So 3-query transactional update will be used.
Test background:
This is an Open source project developed by M1ha-Shvn
under BSD 3 license.
Feel free to create issues and make pull requests.
Library test system is based on django.test.
You can find them in tests
directory.
requirements-test.txt
file docker build . --tag django-pg-bulk-pupdate
in project directorydocker-compose run run_tests
in project directory CREATE ROLE test;
ALTER ROLE test WITH SUPERUSER;
ALTER ROLE test WITH LOGIN;
ALTER ROLE test PASSWORD 'test';
CREATE DATABASE test OWNER test;
CREATE DATABASE test2 OWNER test;
pip3 install -U -r requirements-test.txt
python3 runtests.py
Pros:
Cons:
Pros:
Cons: