simolus3 / drift

Drift is an easy to use, reactive, typesafe persistence library for Dart & Flutter.
https://drift.simonbinder.eu/
MIT License

Transactions, WAL mode, and concurrency #320

Closed (Mike278 closed this 2 months ago)

Mike278 commented 4 years ago

Happy new year! Hope everyone had a great holiday.

Now that we have direct access to the sqlite api with dart:ffi, I figured I'd start the discussion about supporting more advanced concurrency. I've been checking out the various issues/approaches to concurrency in other app-focused sqlite libraries (https://github.com/stephencelis/SQLite.swift/issues/400, https://github.com/cashapp/sqldelight/issues/1227, https://github.com/groue/GRDB.swift#concurrency) and it's a pretty broad topic, so I'll try to focus on this main use case:

  test('trx test', () async {
    final db = ProxyDb(VmDatabase.memory(logStatements: true));
    await db.customStatement('PRAGMA journal_mode=WAL');
    await db.customStatement('create table tbl (id integer primary key not null)');
    Future<void> expectRowCount(ProxyDb db, int count) async {
      final rows = await db.customSelectQuery('select * from tbl').get();
      expect(rows, hasLength(count));
    }
    unawaited(db.transaction(() async {
      await Future.delayed(Duration(seconds: 1));
      await db.customInsert('insert into tbl default values');
      await expectRowCount(db, 1);
      await Future.delayed(Duration(seconds: 1));
      await db.customInsert('insert into tbl default values');
      await expectRowCount(db, 2);
    }));
    await expectRowCount(db, 0); // waits for transaction to complete, so fails with length of 2
    await Future.delayed(Duration(seconds: 3));
    await expectRowCount(db, 2);
  });

We have a service that synchronizes the database with a remote server. It performs some key operations in a transaction for insert performance, minimal query stream notifications, and consistent app state. Writes blocking other writes is a limitation of sqlite itself, so we try not to hold a write transaction open too long, but we'd still like to be able to navigate around the app and read data while a transaction is happening.

Curious to hear if you've had any thoughts about this!

simolus3 commented 4 years ago

You could open two separate instances of your database: One for reading and one for writing. A recent feature that was originally introduced for isolates can be used to synchronize stream queries between those instances.

In a real app, you'll want to enable the generate_connect_constructor build option. With the proxy db trick, you can just do

class ProxyDb extends GeneratedDatabase {
  ProxyDb.connect(DatabaseConnection connection) : super.connect(connection);

  // ...
}

In the test, you could then use

final file = File('db.sqlite3');
final connectionForWriter = DatabaseConnection.fromExecutor(VmDatabase(file));
final connectionForReader = DatabaseConnection(
  connectionForWriter.typeSystem,
  VmDatabase(file), // copy streams and types, but fork another connection
  connectionForWriter.streamQueries,
);

final writing = ProxyDb.connect(connectionForWriter);
final reading = ProxyDb.connect(connectionForReader);

After changing your test so that everything except the last three lines uses writing, it passes. I can't guarantee that nothing will break, but opening a second database instance for long-term transactions could actually be a viable strategy. If the synchronization requires some computing, you could also do it on another isolate with this trick (MoorIsolate.inCurrent with the existing connection, then use MoorIsolate.connect in the background isolate).

Some things to keep in mind

Mike278 commented 4 years ago

Yep that works! I'll have some time to experiment further in a few days, but with this it looks like it should be easy enough to write a little helper for running read and write blocks that resolve the correct connection - maybe something similar to the Zone trick you used to implement the nice db.transaction(() {...}) api.

simolus3 commented 4 years ago

> it should be easy enough to write a little helper for running read and write blocks that resolve the correct connection

That sounds like a good idea. I was a bit skeptical at first, but I think it can make sense to add something like this to moor. We could have some kind of connection pool that grows and shrinks as necessary. For long-running transactions, we just open a new connection behind the scenes. I'll take a look at that approach, but it sounds like a non-trivial change and it's not a priority for me at the moment. If you encounter issues with the multiple databases fix, I can probably fix those faster :)

Mike278 commented 4 years ago

> I was a bit skeptical at first, but I think it can make sense to add something like this to moor

I think it would tie in brilliantly with the isolates feature - maybe it'd help offset some of the costs of communicating with a separate isolate. It’d also be really helpful for stream-heavy apps where a write can trigger a bunch of re-queries.

> We could have some kind of connection pool that grows and shrinks as necessary. For long-running transactions, we just open a new connection behind the scenes.

A connection pool would definitely be useful for reads, but there should only ever be one connection that's allowed to write since writing from multiple connections without serialization would cause SQLITE_BUSY errors.
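
The single-writer rule above can be illustrated without any database at all. The following is a minimal sketch in null-safe Dart, using only `dart:async`; `SerializedWriter` and its `write` method are hypothetical names, not part of moor. It queues write blocks so that only one runs at a time, which is the kind of serialization that avoids competing writers.

```dart
import 'dart:async';

/// Hypothetical helper (not a moor API): runs write blocks one at a time.
class SerializedWriter {
  /// Completes when the most recently queued write has finished.
  Future<void> _tail = Future<void>.value();

  /// Queues [block] behind every previously queued write and returns its result.
  Future<T> write<T>(Future<T> Function() block) {
    final result = _tail.then((_) => block());
    // Swallow errors on the internal chain only, so that one failed write
    // does not prevent later writes from running. The caller still sees
    // the error through the returned future.
    _tail = result.then<void>((_) {}, onError: (Object _) {});
    return result;
  }
}
```

Reads would bypass this queue entirely and go to a separate connection; only code that modifies the database would be wrapped in `write`.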

Mike278 commented 4 years ago

After thinking about this some more, a simpler approach might be to open a second read-only connection just for running stream queries - all other operations would still be run on the primary connection. Stream queries can only consist of select statements and cannot be created inside transactions, so I think it should be pretty safe to move them to a separate connection?

Mike278 commented 4 years ago

Finally had a chance to play around with this a bit. Here's a snippet of kinda what I was thinking:

class MyAppDb extends _$MyAppDb {

  // ...

  /// A secondary database connection that runs select statements for query streams.
  /* late final */ QueryExecutor _queryStreamConnection;

  MyAppDb(QueryExecutor Function() createConnection) : super(createConnection()) {
    _queryStreamConnection = createConnection()..databaseInfo = this;
  }

  /// When you use [get] or [getSingle], the behaviour is unchanged from the default [customSelectQuery] implementation.
  /// When you use [watch] or [watchSingle], the select statement is run using [_runQueryStreamSelect].
  /// This means the returned [Stream] can still emit results while [MyAppDb] is running other queries - including transactions!
  @override Selectable<QueryRow> customSelectQuery(
    String query, {
    List<Variable> variables = const [],
    Set<TableInfo<Table, DataClass>> readsFrom = const {},
  }) {
    return _QueryStreamSelectStatement(this, query, variables ?? [], readsFrom ?? {});
  }

  /// Runs a select statement on the secondary "query stream" database connection.
  /// This means it can run without being blocked by other [MyAppDb] statements.
  /// This method is NOT safe to call inside a transaction that was started with [MyAppDb.transaction].
  /// This method is only intended to be called by query streams, which cannot be created inside a transaction.
  Future<List<QueryRow>> _runQueryStreamSelect(String sql, [List<dynamic> args]) async {
    final rows = await _queryStreamConnection.doWhenOpened((e) => e.runSelect(sql, args));
    return [for (var row in rows) QueryRow(row, this)];
  }
}

class _QueryStreamSelectStatement extends CustomSelectStatement {
  final MyAppDb _db;
  _QueryStreamSelectStatement(
    this._db,
    String query,
    List<Variable> variables,
    Set<TableInfo<Table, DataClass>> tables,
  ) : super(query, variables, tables, _db);

  @override
  Stream<List<QueryRow>> watch() {
    final ctx = GenerationContext.fromDb(_db);
    List<dynamic> args = [for (var v in variables) v.mapToSimpleValue(ctx)];
    final fetcher = QueryStreamFetcher<List<QueryRow>>(
      readsFrom: tables,
      fetchData: () => _db._runQueryStreamSelect(query, args),
      key: StreamKey(query, args, QueryRow),
    );
    return _db.createStream(fetcher);
  }
}

A bit hacky but it seems to work for custom selects. If #365 is implemented I'd have to check somewhere if we're in a transaction. I'm not super familiar with moor's internal API yet but I think it'd be similar to this check?

simolus3 commented 4 years ago

Cool idea with the override and CustomSelectStatement subclass. Did you see any performance benefits so far? Since sqlite3 apis are synchronous, I assume the best performance gains are reached by using multiple isolates, but then you have the big communication overhead for updates...

> I'd have to check somewhere if we're in a transaction

I'm not sure if there's a way to replicate that check outside of moor since _resolvedEngine is private. I assume something like Zone.current[#DatabaseConnectionUser] ?? this could work. It resolves to a Transaction when called inside of a transaction(() {}) block.
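
A minimal sketch of how a zone value can carry "we are inside a transaction" across async gaps, using only `dart:async` in null-safe Dart. The key `#inTransaction` and the names `runInFakeTransaction`, `isInTransaction` and `connectionForSelect` are hypothetical; moor's own zone key and `Transaction` type are internal and are not reproduced here.

```dart
import 'dart:async';

/// Hypothetical zone key for this sketch; moor uses its own internal key.
const Symbol _inTransactionKey = #inTransaction;

/// Whether the calling code runs inside [runInFakeTransaction].
bool get isInTransaction => Zone.current[_inTransactionKey] == true;

/// Runs [body] in a child zone that marks everything it does, including
/// code after `await`, as being inside a transaction.
Future<T> runInFakeTransaction<T>(Future<T> Function() body) {
  return runZoned(body, zoneValues: {_inTransactionKey: true});
}

/// Picks a connection label for a select statement: selects inside a
/// transaction stay on the primary connection, all others use the reader.
String connectionForSelect() => isInTransaction ? 'primary' : 'reader';
```

Because zone values are inherited by every callback scheduled inside the zone, a helper like `connectionForSelect` gives the right answer even when it is called several `await`s deep inside the transaction block.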

I'm still thinking about the best way to add this to moor. Probably a custom QueryExecutor that would delegate work to multiple executors, depending on the kind of statement and whether it's in a transaction. It's a lower abstraction level where we can't tell if a select statement comes from a stream or not, but I don't see any harm in just running all select statements in another executor (provided they're not in a transaction). A benefit of custom executors is that you can compose them, so you could have something like

QueryExecutor _createExecutor() => VmDatabase(myDatabaseFile);

DatabaseConnection _backgroundConnection() {
  return DatabaseConnection.fromExecutor(_createExecutor());
}

// MoorIsolate background = await MoorIsolate.spawn(_backgroundConnection)

// could probably use a LazyDatabase and return a DatabaseConnection directly
Future<DatabaseConnection> _connectionForDatabase(MoorIsolate background) async {
  final backgroundConnection = await background.connect();

  return backgroundConnection.withExecutor(
    MultiExecutor(
      writes: backgroundConnection.executor,
      readFactory: _createExecutor,
    ),
  );
}

Database classes would then use _connectionForDatabase to create a connection. In that setup, all writes would happen on a single isolate (but asynchronously, since it's a background thread). Reads, which are fast after the database has been opened, can happen synchronously on the same isolate. We need the DatabaseConnection (instead of just a QueryExecutor) so that stream queries are properly synchronized across isolates.
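
The routing rule discussed above (selects outside a transaction go to a read executor, everything else goes to the single write executor) can be sketched in isolation. This is a simplified illustration in null-safe Dart, not moor's MultiExecutor: `SimpleExecutor`, `RoutingExecutor` and `RecordingExecutor` are hypothetical types, and the `inTransaction` flag stands in for whatever mechanism the real executor uses to detect a transaction.

```dart
/// Hypothetical, simplified interface; moor's QueryExecutor has more methods.
abstract class SimpleExecutor {
  Future<List<Map<String, Object?>>> runSelect(String sql);
  Future<void> runWrite(String sql);
}

/// Routes statements between a read executor and a write executor.
class RoutingExecutor {
  RoutingExecutor({required this.reads, required this.writes});

  final SimpleExecutor reads;
  final SimpleExecutor writes;

  /// Selects go to [reads] unless they belong to a transaction, in which
  /// case they run on [writes] so they can see its uncommitted changes.
  Future<List<Map<String, Object?>>> runSelect(String sql,
      {bool inTransaction = false}) {
    return (inTransaction ? writes : reads).runSelect(sql);
  }

  /// All writes share the single write executor.
  Future<void> runWrite(String sql) => writes.runWrite(sql);
}

/// Stand-in executor that only records the statements it receives.
class RecordingExecutor implements SimpleExecutor {
  final List<String> statements = [];

  @override
  Future<List<Map<String, Object?>>> runSelect(String sql) async {
    statements.add(sql);
    return const [];
  }

  @override
  Future<void> runWrite(String sql) async {
    statements.add(sql);
  }
}
```

Keeping the router separate from the executors it wraps is what makes composition possible: either side could be a local connection or one that forwards to a background isolate.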

Mike278 commented 4 years ago

> Did you see any performance benefits so far?

It seems to solve the issue of navigating to new pages and displaying data while a transaction is in progress, but I haven’t done much performance testing beyond that. The ffi driver definitely drops more frames (as expected), so using isolates is probably a must once we start using ffi in production.

> I don't see any harm of just running all select statements in another executor (provided they're not in a transaction)

Yeah that makes sense - doesn’t have to be just stream queries. The MultiExecutor approach looks really promising! The flexibility to use isolates or not is a great idea.

simolus3 commented 4 years ago

The MultiExecutor approach is on develop now. I've just opted for a single QueryExecutor for reads (as opposed to a creator function) since there wouldn't be any performance benefits.

It can be used like this: https://github.com/simolus3/moor/blob/a43f6bdb913712e45908f62050d1c395fdbea06e/moor/test/engines/connection_pool_integration_test.dart#L24-L32

One important thing I forgot to mention: On Android, we compile sqlite3 with -DSQLITE_THREADSAFE=0. It makes the compiled libraries slightly faster and smaller (around 2% according to the docs), but this probably means that MultiExecutor doesn't work on Android. I'll remove that compile-time option for the next moor_ffi version.

Mike278 commented 4 years ago

Woo! That looks super straightforward, nicely done! I'm hoping to have some time later this week to play around with it. I was thinking of executing the reads on a background isolate too, because I would imagine stepping with a cursor synchronously on a large query result could starve the main isolate.

MichaelDark commented 2 years ago

@simolus3 Could you please provide us with a short guide on how to use drift (previously moor) with concurrency? Is it currently a built-in, out-of-the-box option, and is it covered by the docs?

simolus3 commented 2 years ago

I think most of what was discussed here is still up to date. Isolates and cross-isolate communication have gotten a lot cheaper in recent Dart/Flutter versions, so I definitely recommend using drift's built-in isolate support, which works out of the box and is documented. It transparently runs statements in a background thread, which is usually enough concurrency for most apps.

Concurrency support that goes beyond this is rather advanced and likely not relevant for most users. You can open the same database in different isolates and then do load balancing for queries if that's what you want to do. If you can give more details about what you mean by "concurrency", I can give you some pointers on how much of that is supported by default and what would need a bit of a custom setup.

dickermoshe commented 2 months ago

@simolus3 Is this enhancement still valid or can we close this?

simolus3 commented 2 months ago

I think with the MultiExecutor API and easier isolates, we now have everything in place for users who need custom control over their concurrency setups to create everything they need.