dart-lang / native_synchronization

Low-level synchronization primitives built using dart:ffi.
https://pub.dev/packages/native_synchronization
BSD 3-Clause "New" or "Revised" License

Async lock + waitFor #14

Open davidmartos96 opened 7 months ago

davidmartos96 commented 7 months ago

Hello! Thank you for the simple and great library. I was considering using it as a way to synchronize writes to a SQLite database from different isolates, since SQLite only supports one writer at a time. Context: https://github.com/simolus3/drift/issues/2760

The problem is that the SQLite library is async, but, as stated in the docs, runLocked doesn't work with async callbacks.
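To illustrate why a synchronous runLocked cannot protect an async callback, here is a minimal sketch. `FakeLock` is a hypothetical stand-in written for this example, not the package's `Mutex`; it only mimics the lock, call, unlock shape. The callback returns a Future at its first `await`, so the `finally` block releases the lock while the callback's work is still pending.

```dart
import 'dart:async';

/// Hypothetical stand-in for a synchronous lock (not the package's Mutex).
class FakeLock {
  bool held = false;

  R runLocked<R>(R Function() action) {
    held = true;
    try {
      return action();
    } finally {
      // Runs as soon as `action` returns, even if it returned a Future.
      held = false;
    }
  }
}

/// Records whether the lock is held before and after the first await
/// inside an async callback.
Future<List<bool>> observeLock() async {
  final lock = FakeLock();
  final observed = <bool>[];
  await lock.runLocked(() async {
    observed.add(lock.held); // Still inside runLocked.
    await Future<void>.delayed(Duration.zero);
    observed.add(lock.held); // Resumed after runLocked already returned.
  });
  return observed;
}

Future<void> main() async {
  print(await observeLock()); // [true, false]
}
```

The second observation is `false`: by the time the callback resumes, the lock is no longer held.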

I managed to "solve" it (at least for my simple demo) by creating an async runLocked + waitFor. The problem is that waitFor is being removed in Dart 3.3. How could this be solved in an alternative manner without the waitFor?

Thank you!

Future<R> runLockedAsync<R>(Future<R> Function() action) async {
  _lock();
  try {
    return waitFor(action());
  } finally {
    _unlock();
  }
}

Demo:

import 'dart:isolate';
import 'dart:math';

import 'package:native_synchronization/primitives.dart';

void main() async {
  final mutex = Mutex();
  const numIsolates = 20;

  final fs = <Future>[];
  for (var i = 0; i < numIsolates; i++) {
    fs.add(runInIsolate(mutex, i));
  }

  await Future.wait(fs);
}

Future<void> runInIsolate(Mutex mutex, int id) async {
  final sendableMutex = mutex.asSendable;
  await Isolate.run(debugName: 'isolate_$id', () async {
    final r = Random();
    final mutex = sendableMutex.materialize();
    for (var i = 0; i < 100; i++) {
      await mutex.runLockedAsync(() async {
        print('ASYNC isolate $id inside $i');
        final ms = r.nextInt(50);
        await Future.delayed(Duration(milliseconds: ms));
      });

      await Future.delayed(Duration(milliseconds: 100));
    }
  });
}

mraleph commented 3 months ago

> How could this be solved in an alternative manner without the waitFor?

I think a better design might be something based on message passing (e.g. you have a single writer isolate, and other isolates send it messages asking it to write the data).
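A minimal sketch of that message-passing design, using only dart:isolate. The names `writerMain`, `write`, `runClient`, and `runDemo` are made up for this example, and the in-memory list stands in for the database. A real writer would run the SQLite call where the list is appended. Because the writer handles one message at a time in an `await for` loop, writes are serialized without any lock.

```dart
import 'dart:async';
import 'dart:isolate';

/// Entry point of the (hypothetical) writer isolate.
Future<void> writerMain(SendPort handshake) async {
  final requests = ReceivePort();
  handshake.send(requests.sendPort);
  final rows = <String>[]; // Stand-in for the database.
  await for (final message in requests) {
    if (message == null) break; // Shutdown signal.
    final request = message as List;
    final row = request[0] as String;
    final replyTo = request[1] as SendPort;
    rows.add(row); // Only this isolate ever writes.
    replyTo.send(rows.length);
  }
  requests.close();
}

/// Asks the writer to store [row]; completes with the row count so far.
Future<int> write(SendPort writer, String row) async {
  final reply = ReceivePort();
  writer.send([row, reply.sendPort]);
  return await reply.first as int; // `first` also closes the reply port.
}

/// Runs one client isolate that requests five writes.
Future<void> runClient(SendPort writer, int id) => Isolate.run(() async {
      for (var i = 0; i < 5; i++) {
        await write(writer, 'isolate $id row $i');
      }
    });

Future<int> runDemo() async {
  final handshake = ReceivePort();
  await Isolate.spawn(writerMain, handshake.sendPort);
  final writer = await handshake.first as SendPort;

  await Future.wait([for (var id = 0; id < 4; id++) runClient(writer, id)]);

  final total = await write(writer, 'final row');
  writer.send(null); // Let the writer isolate exit.
  return total;
}

Future<void> main() async {
  print(await runDemo()); // 21
}
```

Each client waits for the reply before sending its next request, so by the time the final write is acknowledged, all 4 × 5 client rows have been stored.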

It is not a good idea to use a low-level synchronous mutex in a place like this to begin with, because an attempt to grab the lock will completely block the calling isolate if the lock is already taken.

You can concoct an asynchronous mutex out of the existing synchronous mutex and ports. Here is a sketch with a few missing details:

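import 'dart:async';
import 'dart:ffi';
import 'dart:isolate';

import 'package:ffi/ffi.dart'; // Assumed dependency, used only for calloc.
import 'package:native_synchronization/primitives.dart';

final class AsyncMutex {
  // Both fields must refer to the same native state in every isolate.
  final Mutex mutex;
  // List of waiters, the first one is the owner.
  final Pointer<Pointer<_Waiter>> waiters;
  // This isolate's node in the list of waiters.
  late final Pointer<_Waiter> self;
  Completer<void>? _waitingToAcquire;

  final port = ReceivePort();

  AsyncMutex(this.mutex, this.waiters) {
    port.listen((_) {
      // The previous owner has handed the lock over to us.
      final completer = _waitingToAcquire;
      _waitingToAcquire = null;
      completer?.complete();
    });
    // Assumption: one possible way to allocate the waiter and populate
    // its port, using calloc from package:ffi.
    self = calloc<_Waiter>()
      ..ref.port = port.sendPort.nativePort
      ..ref.next = nullptr;
  }

  // Must not be called while owning or waiting for the lock.
  void dispose() {
    port.close();
    calloc.free(self);
  }

  Future<void> acquire() {
    if (_waitingToAcquire != null) {
      throw StateError('Already waiting to acquire!');
    }

    final acquired = mutex.runLocked(() {
      // The current mutex is unowned and we can freely grab it.
      if (waiters.value == nullptr) {
        waiters.value = self;
        return true;
      } else {
        // Add yourself to the list of waiters.
        var w = waiters.value;
        while (w.ref.next != nullptr) {
          w = w.ref.next;
        }
        w.ref.next = self;
        return false;
      }
    });
    if (!acquired) {
      final completer = _waitingToAcquire = Completer<void>();
      return completer.future;
    } else {
      return Future.value();
    }
  }

  void release() {
    // Release the mutex and notify the next waiter that they are now
    // the owner.
    final nextPort = mutex.runLocked(() {
      assert(waiters.value == self);
      waiters.value = self.ref.next;
      self.ref.next = nullptr;
      return waiters.value == nullptr ? 0 : waiters.value.ref.port;
    });
    if (nextPort != 0) {
      /* TODO: Dart_Post is a placeholder; a posting function such as
         Dart_PostInteger_DL can be acquired from the DL C API. */
      Dart_Post(nextPort, null);
    }
  }
}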

final class _Waiter extends Struct {
  @Int64()
  external int port;

  external Pointer<_Waiter> next;
}
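
With an acquire/release pair like the one sketched above, an async replacement for runLocked needs no waitFor. The following is only an illustration under stated assumptions: `AsyncLock` is a hypothetical interface with the same shape as the sketch, and `LocalAsyncLock` is a single-isolate stand-in that exists only so the example can run; it does not synchronize across isolates.

```dart
import 'dart:async';
import 'dart:collection';

/// Hypothetical interface with the acquire/release shape of the sketch.
abstract interface class AsyncLock {
  Future<void> acquire();
  void release();
}

/// Runs [action] while holding [lock]; releases even if [action] throws.
Future<R> runLockedAsync<R>(
    AsyncLock lock, Future<R> Function() action) async {
  await lock.acquire();
  try {
    return await action();
  } finally {
    lock.release();
  }
}

/// Single-isolate stand-in, used only to make this example runnable.
final class LocalAsyncLock implements AsyncLock {
  bool _held = false;
  final _waiters = Queue<Completer<void>>();

  @override
  Future<void> acquire() {
    if (!_held) {
      _held = true;
      return Future.value();
    }
    final waiter = Completer<void>();
    _waiters.add(waiter);
    return waiter.future;
  }

  @override
  void release() {
    if (_waiters.isEmpty) {
      _held = false;
    } else {
      // Hand ownership directly to the next waiter.
      _waiters.removeFirst().complete();
    }
  }
}

Future<List<String>> demo() async {
  final lock = LocalAsyncLock();
  final events = <String>[];
  Future<void> task(String name) => runLockedAsync(lock, () async {
        events.add('$name start');
        await Future<void>.delayed(const Duration(milliseconds: 10));
        events.add('$name end');
      });
  await Future.wait([task('a'), task('b')]);
  return events;
}

Future<void> main() async {
  print(await demo()); // [a start, a end, b start, b end]
}
```

Unlike the waitFor version, the caller's event loop keeps running while it waits for the lock, and the lock stays held across the awaits inside the callback.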