Open ktosiek opened 7 years ago
Hi @ktosiek, I think that could be a great addition. Would you see this working to reduce the time it takes to regenerate views when sync_pgviews is run on a live system?
That's the idea. Currently the view is locked from the moment the drop is issued until it's recreated. With this scheme it would only be locked for a short time.
Edit: I've just realized, are those drops/creates in a transaction? If not, there's a window during which the view does not exist at all.
I've just confirmed (by getting failures on prod, should've tested earlier >_>) that those drops/creates are in fact in separate transactions.
I think the best approach would be for sync_pgviews to call REFRESH MATERIALIZED VIEW mymatview; instead of dropping and creating it again:
https://www.postgresql.org/docs/9.4/static/rules-materializedviews.html
Btw, with PR #45 drops/creates will now be run within the same transaction
Just for the record: my motivation was getting the performance of REFRESH with concurrency of REFRESH CONCURRENTLY.
Drop/create in one transaction will block all transactions using the view; this is similar to just calling REFRESH. To keep the view accessible one can use REFRESH CONCURRENTLY, which has different performance: it's more like deleting all rows and inserting the new ones. This is much slower than a full refresh for bigger views, but doesn't block reads of the view while it runs.
Edit: I probably got the performance characteristics of CONCURRENTLY wrong, but it was much slower where I've tried to use it
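For reference, the two statements being compared differ only in the CONCURRENTLY keyword. Here is a minimal sketch of building them in Python. The helper names build_refresh_sql and quote_ident are hypothetical (they are not part of django-pgviews), and the identifier quoting is deliberately simple:

```python
def quote_ident(name: str) -> str:
    """Double-quote an SQL identifier, escaping embedded double quotes."""
    return '"' + name.replace('"', '""') + '"'


def build_refresh_sql(view_name: str, concurrently: bool = False) -> str:
    """Build a REFRESH MATERIALIZED VIEW statement (hypothetical helper).

    Without CONCURRENTLY, PostgreSQL blocks reads of the view for the
    duration of the refresh. With CONCURRENTLY, reads keep working, but the
    view must already be populated and have at least one unique index.
    """
    keyword = ' CONCURRENTLY' if concurrently else ''
    return f'REFRESH MATERIALIZED VIEW{keyword} {quote_ident(view_name)};'


if __name__ == '__main__':
    print(build_refresh_sql('myapp_mymatview'))
    print(build_refresh_sql('myapp_mymatview', concurrently=True))
```

The swap approach discussed in this thread aims for the speed of the first form while keeping the old view readable until the new one is ready.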
I've got a materialized view in production that takes 2m15s to REFRESH, and 9m30s to REFRESH CONCURRENTLY. When it hurts, it hurts bad :P Doing a create & swap would be a godsend – especially if the materialized view has indices, because they can be created after the view has been populated with data, in one fell swoop.
I'd like to open a PR for this at some point. I've begun introducing this refresh-by-swap in-house, and I'd like to share the code. While working on it, I noticed that indexes declared on the MatView are not created. This subclass also introduces creation of indexes through a tailored utilization of the migration autodetector.
import random
import string
from typing import List, Dict, Tuple
from django.apps import apps
from django.db import connection, transaction
from django.db.backends.base.base import BaseDatabaseWrapper
from django.db.backends.base.schema import BaseDatabaseSchemaEditor
from django.db.migrations import Migration
from django.db.migrations.autodetector import MigrationAutodetector
from django.db.migrations.state import ModelState, ProjectState
from django_pgviews.view import MaterializedView, ReadOnlyMaterializedView
def get_random_chars(n: int) -> str:
    chars = random.sample(string.digits + string.ascii_lowercase, n)
    return ''.join(chars)
class NonClashingMatViewSchemaEditor(BaseDatabaseSchemaEditor):
    """Uses temp names for indexes, generating rename stmts to be applied later

    This special SQL-collection-only SchemaEditor subclass alters the names
    used for CREATE INDEX statements, so they don't clash with any existing
    index names. The SQL statements to rename the indexes to their proper names
    are collected in a separate attribute, `collected_rename_sql`.
    """
    sql_rename_index = "ALTER INDEX %(old_name)s RENAME TO %(new_name)s;"

    # matviews do not support constraints
    sql_create_check = ("-- Materialized views do not support constraints\n"
                        "-- ALTER TABLE %(table)s ADD CONSTRAINT %(name)s CHECK (%(check)s)")
    sql_create_unique = ("-- Materialized views do not support constraints\n"
                         "-- ALTER TABLE %(table)s ADD CONSTRAINT %(name)s UNIQUE (%(columns)s)")

    def __init__(self, connection: BaseDatabaseWrapper, prefix: str, atomic: bool = True):
        self.prefix = prefix
        self.collected_rename_sql = []
        super().__init__(connection=connection, collect_sql=True, atomic=atomic)

    def _create_index_sql(self,
                          model,
                          fields,
                          *,
                          name=None,
                          suffix='',
                          using='',
                          db_tablespace=None,
                          col_suffixes=(),
                          sql=None,
                          ) -> str:
        temp_name = super()._create_index_name(
            table_name=self.prefix,
            column_names=(),
            suffix=f'_tmp_{len(self.collected_rename_sql)}',
        )
        self.collected_rename_sql.append(self._rename_index_sql(temp_name, name))
        return super()._create_index_sql(
            model,
            fields,
            name=temp_name,
            suffix=suffix,
            using=using,
            db_tablespace=db_tablespace,
            col_suffixes=col_suffixes,
            sql=sql,
        )

    def _rename_index_sql(self, old_name: str, new_name: str):
        return self.sql_rename_index % {
            'old_name': old_name,
            'new_name': new_name,
        }
class SwappingMaterializedView(MaterializedView):
    """A materialized view that refreshes by swapping the old view with a new one
    """
    sql: str

    class Meta:
        abstract = True
        managed = False

    @classmethod
    def view_exists(cls,
                    view_name: str,
                    schema: str = 'public',
                    *,
                    connection: BaseDatabaseWrapper = connection,
                    ) -> bool:
        """Return whether the specified view exists in the specified schema
        """
        cursor_wrapper = connection.cursor()
        cursor = cursor_wrapper.cursor
        # information_schema.views does not list materialized views, so look
        # up both plain views ('v') and materialized views ('m') in pg_class.
        cursor.execute(
            query='''
                SELECT EXISTS(
                    SELECT 1
                    FROM pg_catalog.pg_class c
                    JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
                    WHERE n.nspname = %s AND c.relname = %s
                      AND c.relkind IN ('v', 'm')
                );
            ''',
            vars=[schema, view_name],
        )
        return bool(cursor.fetchone()[0])
    @classmethod
    def generate_unique_view_name(cls,
                                  prefix: str = '',
                                  schema: str = 'public',
                                  *,
                                  num_randchars: int = 4,
                                  max_iterations: int = 100,
                                  connection: BaseDatabaseWrapper = connection,
                                  ) -> str:
        max_length = connection.ops.max_name_length()

        # Allow enough characters in the provided prefix to append an underscore
        # and a number of random characters.
        base = prefix[:max_length - num_randchars - 1]

        for i in range(max_iterations):
            name = f'{base}_{get_random_chars(num_randchars)}'
            if not cls.view_exists(name, schema, connection=connection):
                return name
        else:
            raise RuntimeError(
                f'Maximum of {max_iterations} attempts reached '
                f'while generating a unique view name.')
    @classmethod
    def get_index_sql_statements(cls,
                                 *,
                                 db_table: str = None,
                                 connection: BaseDatabaseWrapper = connection,
                                 ) -> Tuple[List[str], List[str]]:
        """Retrieve the SQL statements necessary to configure indexes for the view

        :param db_table:
            Optionally use a different view name to create the indexes upon.
        :param connection:
            Django DB connection to use when generating the SQL. Different
            connection backends may produce different SQL.
        :return:
            A 2-tuple (create_index_statements, rename_index_statements), where
            create_index_statements are the SQL statements to create the indexes
            with names that don't clash with existing indexes; and
            rename_index_statements containing the SQL statements to rename the
            non-clashing indexes to their proper names.
        """
        current = ModelState.from_model(cls)

        ###
        # PG View classes are marked as "unmanaged" by default, which instructs
        # Django to forego the detection of schema changes for the table.
        #
        # Here, we mark our model as managed in the ModelState, so Django's
        # migration autodetector will detect schema changes.
        #
        current.options['managed'] = True
        if db_table is not None:
            current.options['db_table'] = db_table

        ###
        # Craft a ModelState bereft of index-related Meta and field options,
        # so we can use Django's migration autodetector to collect all
        # index-related creation operations.
        #
        # (We use the autodetector instead of rolling our own, because Django
        # supports a multitude of ways to define indexes, and rolling our own
        # seems error-prone)
        #
        bare = current.clone()
        bare.options.update({
            'indexes': [],
            'index_together': [],
            'unique_together': [],
        })

        # Clear the fields list, so we can fill it with our scrubbed fields
        bare.fields.clear()
        field_options = [
            'db_index',
            'db_constraint',
            'unique',
            'unique_for_date',
            'unique_for_month',
            'unique_for_year',
        ]
        for name, field in current.fields:
            _, _, args, kwargs = field.deconstruct()
            kwargs.update({
                option: None
                for option in field_options
                if option in kwargs
            })
            new_field = field.__class__(*args, **kwargs)

            # ModelState fields cannot have a "model" attr, lest this error occur:
            #   ModelState.fields cannot be bound to a model - '<field>' is.
            try:
                del new_field.model
            except AttributeError:
                pass
            bare.fields.append((name, new_field))

        current_ps = ProjectState.from_apps(apps)
        current_ps.add_model(current)

        # ModelState fields cannot have a "model" attr, lest this error occur:
        #   ModelState.fields cannot be bound to a model - '<field>' is.
        for model_state in current_ps.models.values():
            for name, field in model_state.fields:
                try:
                    del field.model
                except AttributeError:
                    pass

        bare_ps = current_ps.clone()
        bare_ps.add_model(bare)

        autodetector = MigrationAutodetector(bare_ps, current_ps)
        changes: Dict[str, List[Migration]] = autodetector._detect_changes()
        if not changes or current.app_label not in changes:
            return [], []

        SchemaEditor = type(
            f'NonClashingMatView{connection.SchemaEditorClass.__name__}',
            (connection.SchemaEditorClass, NonClashingMatViewSchemaEditor),
            {},
        )
        prefix = db_table or get_random_chars(4)
        schema_editor: NonClashingMatViewSchemaEditor = SchemaEditor(connection, prefix)

        migrations = changes[current.app_label]
        for migration in migrations:
            for operation in migration.operations:
                operation.database_forwards(current.app_label, schema_editor, bare_ps, current_ps)

        return schema_editor.collected_sql, schema_editor.collected_rename_sql
    @classmethod
    def refresh(cls,
                *,
                swap: bool = True,
                concurrently: bool = False,
                connection: BaseDatabaseWrapper = connection,
                ) -> None:
        """Refresh the data in this materialized view

        :param swap:
            If True, the refresh is performed by creating a separate view with
            a temp name, swapping the old view with the new one, and dropping
            the old view.
            If False, the refresh is performed on the existing view with a
            REFRESH MATERIALIZED VIEW statement (with CONCURRENTLY if
            concurrently=True is passed).
            Note: if swap=False, none of the view's indexes will be created.
        :param concurrently:
            If swap=False and concurrently=True, the view will be refreshed
            using a REFRESH MATERIALIZED VIEW CONCURRENTLY statement. This mode
            of refreshing does not block concurrent reads of the view, but
            can take much longer than a normal refresh.
        :param connection:
            The Django DB connection to perform the refresh with. If not
            specified, the default connection will be used.
        """
        if not swap:
            super(SwappingMaterializedView, cls).refresh(concurrently=concurrently)
            return

        view_name = cls._meta.db_table

        # Split out schema (if set), to check for name availability
        schema, bare_name = 'public', view_name
        if '.' in bare_name:
            schema, bare_name = view_name.split('.', 1)

        # Grab an unused view name to temporarily house our new view
        temp_name = cls.generate_unique_view_name(view_name, schema, connection=connection)

        # Collect the SQL to create any indexes declared on the view
        create_indexes, rename_indexes = cls.get_index_sql_statements(db_table=temp_name,
                                                                      connection=connection)
        create_index_sql = '\n'.join(create_indexes)
        rename_index_sql = '\n'.join(rename_indexes)

        create_matview_sql = f'''
            -- Create updated view with latest data
            CREATE MATERIALIZED VIEW {temp_name} AS {cls.sql};
        '''

        # NOTE: the view creation SQL must be executed apart from the index creation SQL,
        #       otherwise Postgres will report 'relation "<viewname>" does not exist'
        swap_matview_sql = f'''
            -- Create any indexes (with temp names)
            {create_index_sql}
            -- Drop the stale view
            DROP MATERIALIZED VIEW IF EXISTS {view_name};
            -- Swap in updated view
            ALTER MATERIALIZED VIEW {temp_name}
                RENAME TO {view_name};
            -- Rename any created indexes to their proper names
            {rename_index_sql}
        '''

        cursor_wrapper = connection.cursor()
        cursor = cursor_wrapper.cursor
        try:
            with transaction.atomic(connection.alias):
                cursor.execute(create_matview_sql)
                cursor.execute(swap_matview_sql)
        finally:
            cursor_wrapper.close()
class ReadOnlySwappingMaterializedView(SwappingMaterializedView, ReadOnlyMaterializedView):
    """Read-only version of the swapping materialized view
    """
    class Meta:
        abstract = True
        managed = False
@theY4Kman wanna elaborate a bit more on what your code tries to do?
Ya, here's a quick rundown:
A unique, temporary name for the view is generated (by appending random chars to the end of the name, and checking PG for their use, in a loop).
The matview is created using the temporary name, with the SQL lifted from create_view:
CREATE MATERIALIZED VIEW myapp_mymatview_fqvx AS <query>;
Index creation SQL is generated:
A ModelState for the matview is created, with managed=True and any index-related attrs/Meta options nulled out.
This is compared against the matview's full ModelState (managed=True also flubbed for the matview, so Django still creates migrations for it).
A SchemaEditor subclass is used, so any SQL generated can reference our temporary view name, not the actual matview name.
The SchemaEditor also creates any indexes with temporary names, so as not to interfere with the existing indexes upon the real matview.
The SQL is only collected, not executed, much like manage.py sqlmigrate.
The index creation SQL is run, creating indexes with temp names against the new matview with a temp name.
-- Create any indexes (with temp names)
CREATE INDEX "myapp_mymatview_fqvx__0cf04a69_tmp_0" ON "myapp_mymatview_fqvx" ("col_a", "col_b");
In one fell swoop, we:
-- Drop the stale view
DROP MATERIALIZED VIEW IF EXISTS myapp_mymatview;
-- Swap in updated view
ALTER MATERIALIZED VIEW myapp_mymatview_fqvx
RENAME TO myapp_mymatview;
-- Rename any created indexes to their proper names
ALTER INDEX myapp_mymatview_fqvx__0cf04a69_tmp_0 RENAME TO myapp_mymatview_227c6b_idx;
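The ordering of the statements in the rundown above can be sketched as a small pure-Python function. This is only an illustration: build_swap_statements and the shape of its indexes argument are hypothetical, and the real snippet derives the index SQL from Django's migration autodetector rather than from a hand-written list.

```python
from typing import List, Sequence, Tuple

# (temp_index_name, final_index_name, column_names); hypothetical shape
IndexSpec = Tuple[str, str, Sequence[str]]


def build_swap_statements(view_name: str,
                          temp_name: str,
                          query: str,
                          indexes: Sequence[IndexSpec] = (),
                          ) -> List[str]:
    """Return the SQL statements for a refresh-by-swap, in execution order."""
    # 1. Build the fresh data under a temporary name; the old view stays readable.
    statements = [f'CREATE MATERIALIZED VIEW {temp_name} AS {query};']

    # 2. Index the populated temp view, using temp index names to avoid clashes.
    for temp_index, _final_index, columns in indexes:
        column_list = ', '.join(f'"{column}"' for column in columns)
        statements.append(
            f'CREATE INDEX "{temp_index}" ON "{temp_name}" ({column_list});')

    # 3. Drop the stale view (its indexes go with it) and swap in the new one.
    statements.append(f'DROP MATERIALIZED VIEW IF EXISTS {view_name};')
    statements.append(f'ALTER MATERIALIZED VIEW {temp_name} RENAME TO {view_name};')

    # 4. The proper index names are free now, so rename the temp indexes.
    for temp_index, final_index, _columns in indexes:
        statements.append(f'ALTER INDEX {temp_index} RENAME TO {final_index};')
    return statements


if __name__ == '__main__':
    for statement in build_swap_statements(
        'myapp_mymatview',
        'myapp_mymatview_fqvx',
        'SELECT 1 AS col_a, 2 AS col_b',
        [('myapp_mymatview_fqvx__0cf04a69_tmp_0',
          'myapp_mymatview_227c6b_idx',
          ['col_a', 'col_b'])],
    ):
        print(statement)
```

Only steps 3 and 4 need the exclusive lock on the old view, which is why running them together in one transaction keeps the blocking window short.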
@theY4Kman how is this better than CREATE new with temp name, DROP old one, RENAME temp to correct name (all inside a transaction)?
It's not; it's the same thing. The snippet is for refreshes, though, not sync_pgviews. Instead of issuing a REFRESH MATERIALIZED VIEW query and risking blocks on reads, or a REFRESH MATERIALIZED VIEW CONCURRENTLY query and using more resources + a significant amount of time, the snippet does the CREATE, DROP, RENAME when calling MyMatView.refresh()
Cool! I think this should be two separate PRs
Currently sync_pgviews will lock all transactions using a given materialized view while it's being recreated.
I think it would be better to first create the materialized view under a different name, and only then drop the old one and rename the new one. Or even better, have an option to use migration scripts for managing the views.