simolus3 / drift

Drift is an easy to use, reactive, typesafe persistence library for Dart & Flutter.
https://drift.simonbinder.eu/
MIT License
2.63k stars 369 forks source link

Write synchronization across isolates #2760

Open davidmartos96 opened 11 months ago

davidmartos96 commented 11 months ago

Is your feature request related to a problem? Please describe. Following a bit the discussion on https://github.com/simolus3/drift/discussions/2748

The context is a system that syncs automatically on the background. If too many data is being synced to the local database (long transaction), the user is not able to properly use the app because "SELECT"s are locked due to the Drift transaction mutex.

We explored the MultiExecutor option, which definitely helps, but unfortunately doesn't really fit our use case. That's because we rely on SQLite "ATTACH" and "TEMP" tables for complex business logic. The MultiExecutor strategy, since they are 2 different connections for read and write, those ATTACH and TEMP are not visible between them.

The option that might fit better here for us would be to open 2 separate connections, like the MultiExecutor, but totally independent, not splitted between read and write. We could use one connection for the automatic sync in the background, which doesn't need those business related operations and the other for the regular app. A single connection for reads and writes on a separate isolate has been working great for us, splitting read and write might be overkill.

The problem with 2 independent connections across isolates would be the write synchronization. The proposed solution would be to have some kind of write lock that works across isolates. The package sqlite_async does something like this. It provides a SharedMutex implementation (for cross isolate sync) and a method to take a write lock

Do you think there could be room for an API like this in drift? I feel it could benefit "SQLite-heavy" apps.

simolus3 commented 11 months ago

I'm not sure if the APIs should be in drift itself, but it's certainly something useful to have. With the new query interceptor API that will be released with drift 2.14, plugging in a shared mutex around writing queries can work. I have only given it some rudimentary testing, but this seems to work if you don't need stream queries to update from writes on another connection:

import 'dart:io';

import 'package:drift/drift.dart';
import 'package:drift/isolate.dart';
import 'package:drift/native.dart';
import 'package:sqlite_async/mutex.dart';

class IndependentConnectionsWithWriteLock {
  final File file;
  final SerializedMutex mutex = SimpleMutex().shared;

  IndependentConnectionsWithWriteLock(this.file);

  Future<DatabaseConnection> newSingleClientConnection() async {
    final isolate = await DriftIsolate.spawn(() {
      return NativeDatabase(file)
          .interceptWith(_WriteMutexInterceptor(mutex.open()));
    });

    return isolate.connect(singleClientMode: true);
  }
}

class _WriteMutexInterceptor extends QueryInterceptor {
  final SharedMutex mutex;

  _WriteMutexInterceptor(this.mutex);

  @override
  Future<void> commitTransaction(TransactionExecutor inner) {
    return mutex.lock(() => inner.send());
  }

  @override
  Future<void> rollbackTransaction(TransactionExecutor inner) {
    return mutex.lock(() => inner.rollback());
  }

  @override
  Future<void> runBatched(
      QueryExecutor executor, BatchedStatements statements) {
    return mutex.lock(() => executor.runBatched(statements));
  }

  @override
  Future<int> runInsert(
      QueryExecutor executor, String statement, List<Object?> args) {
    return mutex.lock(() => executor.runInsert(statement, args));
  }

  @override
  Future<int> runDelete(
      QueryExecutor executor, String statement, List<Object?> args) {
    return mutex.lock(() => executor.runDelete(statement, args));
  }

  @override
  Future<int> runUpdate(
      QueryExecutor executor, String statement, List<Object?> args) {
    return mutex.lock(() => executor.runUpdate(statement, args));
  }

  @override
  Future<List<Map<String, Object?>>> runSelect(
      QueryExecutor executor, String statement, List<Object?> args) {
    if (statement.contains('RETURNING')) {
      // ok, this part sucks - drift should provide a more reliable way to
      // determine whether this is a write.
      return mutex.lock(() => executor.runSelect(statement, args));
    } else {
      return executor.runSelect(statement, args);
    }
  }
}

It can be used like this:

  final file = File('/tmp/test.db');
  final pool = IndependentConnectionsWithWriteLock(file);
  final db = Database(await pool.newSingleClientConnection());
  // Ensure migrations aren't running concurrently by opening the main database
  // first.
  await db.doWhenOpened((e) => null);

  // Now, we can use a separate connection in this or in another isolate like
  // this:
  await Isolate.run(() async {
    final db = Database(await pool.newSingleClientConnection());

    for (var i = 0; i < 100; i++) {
      await db.into(db.todoItems).insert(
          TodoItemsCompanion.insert(title: 'other conn $i', categoryId: 0));
    }

    await db.close();
  });

  // continue using the main database in its own connection

That said, I think the overhead of message sending for each acquire will impact performance very severely. Perhaps it's better to cheat your way around the isolated-memory concept of isolates and implement a mutex in C, send that across isolates as a pointer and then do synchronous locks (or even a synchronous locking attempt followed by an asynchronous lock if there is contention) with dart:ffi.

davidmartos96 commented 11 months ago

@simolus3 Thank you that's helpful! I did a small experiment with native mutexes with the native_synchronization package, from the Dart Team https://github.com/dart-lang/native_synchronization

The problem is that the native Mutex implementation only works with sync code, so unfortunately it wouldn't work with drift.

Here is the opened issue: https://github.com/dart-lang/labs/issues/9