This blog post shows one way of implementing this. Should reach out and see if they mind if we yank most of it for this library.
We’re not there yet. We need to make sure database writes only go to our primary. To do this, we’ll register a database `execute_wrapper` which intercepts any write queries. I’ve got this in my base app’s `__init__.py` (heavily based on Adam Johnson’s django-read-only):
import os
import os.path
from pathlib import Path
from typing import Any, Callable, Generator
from django.apps import AppConfig
from django.conf import settings
from django.db import connections
from django.db.backends.base.base import BaseDatabaseWrapper
from django.db.backends.signals import connection_created
# NOTE(review): nothing in the visible snippet reads this flag — presumably a
# leftover from django-read-only; confirm before relying on or removing it.
read_only = False
class BaseConfig(AppConfig):
    """App config that installs the write-blocking hook on non-primary nodes."""

    name = "bakerydemo.base"
    verbose_name = "base"

    def ready(self) -> None:
        """Install the write blocker on all connections if ``.primary`` exists.

        The ``.primary`` file is looked up in the directory that holds the
        default database file. Per the accompanying text, LiteFS creates it
        only on nodes that are *not* the primary, so when it is missing this
        method returns without installing anything.
        """
        db_name = settings.DATABASES['default']['NAME']
        db_dir = Path(db_name).parent
        primary_path = db_dir / ".primary"

        if not primary_path.is_file():
            return

        # Cover connections that already exist...
        for alias in connections:
            connection = connections[alias]
            install_hook(connection)
        # ...and any connection opened later.
        connection_created.connect(install_hook)
def install_hook(connection: BaseDatabaseWrapper, **kwargs: object) -> None:
    """Put ``blocker`` at the front of the connection's execute wrappers, once.

    Called directly for existing connections and registered as a
    ``connection_created`` receiver; any extra keyword arguments are accepted
    and ignored.
    """
    # Guard against double registration when called more than once
    # for the same connection.
    if blocker not in connection.execute_wrappers:
        connection.execute_wrappers.insert(0, blocker)
def should_block(sql: str) -> bool:
    """Return True unless ``sql`` is recognised as a non-writing statement.

    A statement is let through when, after stripping leading spaces, newlines
    and opening parentheses, it starts with one of the listed prefixes, or
    when it is exactly ``BEGIN``, ``COMMIT`` or ``ROLLBACK``. The comparison
    is case-sensitive, so anything else (including lower-case SQL) is blocked.
    """
    return not sql.lstrip(" \n(").startswith(
        (
            "EXPLAIN ",
            "PRAGMA ",
            "ROLLBACK TO SAVEPOINT ",
            "RELEASE SAVEPOINT ",
            "SAVEPOINT ",
            "SELECT ",
            "SET ",
        )
    ) and sql not in ("BEGIN", "COMMIT", "ROLLBACK")
>
> This raises an exception when a query would write to the database and the `.primary` file created by LiteFS exists (meaning this node is not the primary).
>
> 8. We need something to intercept this exception, so add some middleware:
>
```python
from django.http import HttpResponse
from django.conf import settings
from . import QueriesAttemptedError
def replay_middleware(get_response):
    """Build middleware that replays blocked write requests on the primary.

    When handling a request raises ``QueriesAttemptedError``, the middleware
    answers with an empty response carrying a ``fly-replay`` header that names
    the primary instance, as read from the ``.primary`` file next to the
    default database. All other responses and exceptions pass through
    unchanged.
    """
    # Local import: this snippet's own import list does not bring in Path.
    from pathlib import Path

    def middleware(request):
        try:
            response = get_response(request)
        except QueriesAttemptedError:
            res = HttpResponse()
            # Find the name of the primary instance by reading the .primary file
            db_name = settings.DATABASES['default']['NAME']
            db_dir = Path(db_name).parent
            primary_path = db_dir / ".primary"
            # Strip surrounding whitespace in case the file ends with a
            # newline; header values must not contain line breaks.
            primary = primary_path.read_text().strip()
            res.headers['fly-replay'] = f"instance={primary}"
            return res
        return response

    return middleware
```

and register it in your `MIDDLEWARE` settings.
This catches the exception raised by the previously registered `execute_wrapper`, finds out where the primary database is hosted, and returns a `fly-replay` header telling Fly.io: “Sorry, I can’t handle this request, please replay it to this database primary”.
This blog post shows one way of implementing this. Should reach out and see if they mind if we yank most of it for this library.
import os
import os.path
from pathlib import Path
from typing import Any, Callable, Generator

from django.apps import AppConfig
from django.conf import settings
from django.db import connections
from django.db.backends.base.base import BaseDatabaseWrapper
from django.db.backends.signals import connection_created
# NOTE(review): nothing in the visible snippet reads this flag — presumably a
# leftover from django-read-only; confirm before relying on or removing it.
read_only = False
class BaseConfig(AppConfig):
    """Django app configuration for the ``bakerydemo.base`` app."""

    name = "bakerydemo.base"
    verbose_name = "base"
def install_hook(connection: BaseDatabaseWrapper, **kwargs: object) -> None:
    """Put ``blocker`` at the front of the connection's execute wrappers, once.

    Extra keyword arguments are accepted and ignored.
    """
    # Guard against double registration when called more than once
    # for the same connection.
    if blocker not in connection.execute_wrappers:
        connection.execute_wrappers.insert(0, blocker)
class QueriesAttemptedError(Exception):
    """Raised by ``blocker`` when ``should_block`` flags a query."""
def blocker(
    execute: Callable[[str, str, bool, dict[str, Any]], Any],
    sql: str,
    params: str,
    many: bool,
    context: dict[str, Any],
) -> Any:
    """Execute wrapper that refuses statements flagged by ``should_block``.

    Args:
        execute: The next callable in the wrapper chain.
        sql: The SQL statement about to run.
        params: Parameters for the statement, passed through untouched.
        many: Passed through untouched to ``execute``.
        context: Passed through untouched to ``execute``.

    Returns:
        Whatever ``execute`` returns for statements that are allowed.

    Raises:
        QueriesAttemptedError: If ``should_block(sql)`` is true.
    """
    if should_block(sql):
        # The message has to be defined here: the original referenced an
        # undefined ``msg``, which raised NameError instead of the intended
        # QueriesAttemptedError.
        msg = "Write query attempted on a read-only (non-primary) database"
        raise QueriesAttemptedError(msg)
    return execute(sql, params, many, context)
def should_block(sql: str) -> bool:
    """Tell whether ``sql`` must be rejected as a potential write.

    Bare ``BEGIN``/``COMMIT``/``ROLLBACK`` are always allowed. Otherwise the
    statement is allowed only if, ignoring leading spaces, newlines and
    opening parentheses, it begins with one of the known safe prefixes
    (case-sensitive).
    """
    if sql in ("BEGIN", "COMMIT", "ROLLBACK"):
        return False

    safe_prefixes = (
        "EXPLAIN ",
        "PRAGMA ",
        "ROLLBACK TO SAVEPOINT ",
        "RELEASE SAVEPOINT ",
        "SAVEPOINT ",
        "SELECT ",
        "SET ",
    )
    statement = sql.lstrip(" \n(")
    if statement.startswith(safe_prefixes):
        return False
    return True
https://usher.dev/posts/django-on-flyio-with-litestream-litefs/
https://github.com/adamchainz/django-read-only/