dart-lang / sdk

The Dart SDK, including the VM, JS and Wasm compilers, analysis, core libraries, and more.
https://dart.dev
BSD 3-Clause "New" or "Revised" License

Lightweight communication of typed data between long-lived isolates #52577

Open nex3 opened 1 year ago

nex3 commented 1 year ago

https://github.com/dart-lang/language/issues/124 was closed with Isolate.exit()'s ability to efficiently send a final message as the stated solution. However, this doesn't help the use-case where an isolate is expected to be long-lived. For example, I'm working on making the Sass embedded compiler more parallel using isolates, which involves passing encoded protocol buffers from worker isolates through the main isolate and from there to another program. We want to be able to pass messages during the compilation process, not merely at the end, and we want to avoid copying the encoded buffers when sending them across isolate boundaries.

These buffers are effectively immutable after creation—that is, there's a clear point before they're sent across isolate boundaries that we can guarantee their contents and length will never be modified. As a straw suggestion, adding a TypedData.freeze() that marks the object as immutable and thus lightweight-transferable would solve this use-case.

lrhn commented 1 year ago

I don't think TypedData.freeze is an optimal solution, since the typed data would have been unfrozen until that method is called, which means that it's quite possible it's allocated in a memory page that is shared by other non-transferable data. That would mean that it still needs to be copied before it can be shared with another isolate. Having single objects in a page be used by multiple isolates, while the page itself otherwise belongs to one isolate, is going to be prohibitively complicated for GC. At that point you might as well use TransferableTypedData which copies the bytes once, then allows you to cheaply pass it around between isolates, with at most one of them being the current owner at any time. Then it can be safely materialized.
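For reference, the existing TransferableTypedData route described above might look like the following minimal sketch. The helper names `pack`, `unpack`, and `sumInWorker` are hypothetical; the calls are the `dart:isolate` APIs `TransferableTypedData.fromList`, `materialize`, and `Isolate.run`. The bytes are copied once by `fromList`, and `materialize` may be called only once per object.

```dart
import 'dart:isolate';
import 'dart:typed_data';

/// Hypothetical helper: copies the bytes of [chunks] once into a
/// TransferableTypedData that can then be sent cheaply between isolates.
TransferableTypedData pack(List<Uint8List> chunks) =>
    TransferableTypedData.fromList(chunks);

/// Hypothetical helper: takes ownership of the bytes in the current isolate.
/// `materialize` may only be called once per TransferableTypedData.
Uint8List unpack(TransferableTypedData data) =>
    data.materialize().asUint8List();

/// Sends [bytes] to a short-lived worker isolate that sums them.
Future<int> sumInWorker(Uint8List bytes) {
  final transferable = pack([bytes]);
  // The closure captures only `transferable`, which is sent to the worker.
  return Isolate.run(() {
    final received = unpack(transferable);
    return received.fold<int>(0, (sum, b) => sum + b);
  });
}
```

Calling `sumInWorker(Uint8List.fromList([1, 2, 3]))` should complete with 6. Once the TransferableTypedData has been sent, the sending isolate can no longer materialize it, which is the "at most one current owner" property.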

A more realistic approach would be to have a TransferableTypedData-like object that can be allocated directly, and which then allows you to edit it while you are the owner, and stops allowing that again when you send it to someone else. It's still important that it doesn't break the uniform access to underlying memory that typed data relies on for performance, which might make this approach non-viable. Having to check on each access whether you still own the underlying buffer would be an overhead on every other typed-data type, which doesn't need it.

If you don't worry about memory safety, or think you can handle it, another alternative is using dart:ffi. Only available on the VM, but so is dart:isolate to begin with.

You should be able to allocate bytes external to the heap, e.g., using calloc from package:ffi, and send the address, as an integer, between isolates. Then it's just a matter of putting sufficient abstractions on top, and making sure the allocation gets freed when everybody is done using it. You also need to worry sufficiently about race conditions and concurrency, but if you maintain a protocol about who is working on the data when, it should be possible to make it safe.

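import "dart:typed_data";
import "dart:isolate";
import "dart:ffi";
import "package:ffi/ffi.dart";

Future<void> main() async {
  // Allocate 32 zeroed bytes outside the Dart heap.
  var allocation = calloc.allocate<Uint8>(32);

  // View the native memory as a Uint8List (no copy).
  var bytes = allocation.asTypedList(32);
  print("1: $bytes");
  for (var i = 0; i < bytes.length; i += 2) {
    bytes[i] = 1;
  }
  print("1: $bytes");

  // Only the address (a plain int) is sent to the other isolate.
  var address = allocation.address;
  await Isolate.run(() {
    // Re-create a view on the same native memory in the worker isolate.
    Uint8List bytes = Pointer<Uint8>.fromAddress(address).asTypedList(32);
    print("2: $bytes");
    for (var i = 0; i < bytes.length; i += 2) {
      bytes[i + 1] = bytes[i] + 1;
    }
    print("2: $bytes");
  });
  // The worker's writes are visible here: the memory was shared, not copied.
  print("3: $bytes");
  // Native memory is not garbage collected; free it once every isolate is done.
  calloc.free(allocation);
}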

Efficient access, efficient transfer, memory safe. Pick two. (Where "efficient transfer and memory safe" works by keeping all the data in one isolate and sending operations back and forth.)
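The "efficient transfer and memory safe" option, keeping the data in one isolate and sending only operations, can be sketched as follows. This is a hypothetical protocol, not an existing API: `Op`, `applyOp`, and `spawnOwner` are illustrative names, operations are Dart 3 records, and the owner isolate in this sketch is never shut down.

```dart
import 'dart:isolate';
import 'dart:typed_data';

/// A hypothetical operation message: (name, index, argument).
typedef Op = (String, int, Object?);

/// Applies [op] to [bytes], which live only in the owning isolate.
/// Only the small operation records and results cross isolate boundaries.
void applyOp(Uint8List bytes, Op op) {
  switch (op) {
    case ('set', final int i, final int v):
      bytes[i] = v;
    case ('get', final int i, final SendPort reply):
      reply.send(bytes[i]);
    default:
      throw ArgumentError.value(op, 'op', 'unknown operation');
  }
}

/// Spawns a long-lived isolate that owns a zeroed buffer of [size] bytes and
/// returns the port through which operations can be sent to it.
Future<SendPort> spawnOwner(int size) async {
  final ready = ReceivePort();
  await Isolate.spawn((SendPort toCaller) {
    final bytes = Uint8List(size);
    final inbox = ReceivePort();
    // The isolate stays alive while `inbox` is open; this sketch never closes it.
    inbox.listen((msg) => applyOp(bytes, msg as Op));
    toCaller.send(inbox.sendPort);
  }, ready.sendPort);
  return (await ready.first) as SendPort;
}
```

A client would send `('set', 0, 42)` to the returned port; reading requires sending `('get', i, replyPort.sendPort)` and awaiting the reply. That round trip on every access is the cost of giving up efficient access.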

nex3 commented 1 year ago

I think something like a TransferableTypedDataBuffer would work—something with a similar interface to StringBuffer that instead writes to a transferable chunk of memory (rather than a TypedData like the *Buffer classes from the typed_data package). This wouldn't implement TypedData itself (or be readable at all before materializing) so it shouldn't affect performance of other TypedData implementations. Something like this:

import 'dart:isolate'; // For TransferableTypedData.

abstract class TransferableUint8Buffer {
  /// If [startingLength] is passed, allocate that many bytes to begin with.
  TransferableUint8Buffer([int? startingLength]);

  void add(int value);
  void addAll(List<int> values);
  void addRange(List<int> values, int start, int end);
  TransferableTypedData toData();
}

...but I don't think this can be implemented in user code today without requiring at least some degree of additional copying or allocation.
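As a point of comparison, the proposed interface can be approximated in pure Dart today at the cost of one extra copy. In this hypothetical `ChunkedTransferableBuffer` sketch, bytes accumulate in fixed-size `Uint8List` chunks and are copied once, when `toData` calls `TransferableTypedData.fromList`; the 4096-byte chunk size is an arbitrary assumption.

```dart
import 'dart:isolate';
import 'dart:typed_data';

/// Hypothetical stand-in for the proposed TransferableUint8Buffer. Bytes are
/// collected in fixed-size chunks and copied once, in [toData].
class ChunkedTransferableBuffer {
  // Arbitrary chunk size chosen for this sketch.
  static const int _chunkSize = 4096;

  final List<Uint8List> _chunks = [];
  Uint8List _current = Uint8List(_chunkSize);
  int _used = 0;

  void add(int value) {
    if (_used == _current.length) _flush();
    _current[_used++] = value; // Uint8List keeps only the low 8 bits.
  }

  void addAll(List<int> values) => addRange(values, 0, values.length);

  void addRange(List<int> values, int start, int end) {
    RangeError.checkValidRange(start, end, values.length);
    for (var i = start; i < end; i++) {
      add(values[i]);
    }
  }

  /// Copies all buffered bytes into a single TransferableTypedData.
  TransferableTypedData toData() {
    _flush();
    return TransferableTypedData.fromList(_chunks);
  }

  // Moves the filled part of the current chunk into [_chunks] as a view
  // (no copy) and starts a fresh chunk.
  void _flush() {
    if (_used > 0) {
      _chunks.add(Uint8List.sublistView(_current, 0, _used));
    }
    _current = Uint8List(_chunkSize);
    _used = 0;
  }
}
```

Appending stays in ordinary Dart code, but `toData` still pays for a full copy of the data inside `fromList`, which is the overhead the proposal is trying to avoid.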

a-siva commented 1 year ago

//cc @aam

mkustermann commented 1 year ago

As a straw suggestion, adding a TypedData.freeze() that marks the object as immutable

This is somewhat problematic for a number of reasons:

...but I don't think this can be implemented in user code today without requiring at least some degree of additional copying or allocation. ... I think something like a TransferableTypedDataBuffer would work

This can be implemented using malloc() and free(), sending the pointer's address across isolates, and using .asTypedList() on the other side. The downside is that both sender and receiver operate on Dart FFI Pointer objects and have to deal with memory management themselves.

Though one can go a step further and actually create a safely shareable deeply immutable Uint8List - using some help from FFI and runtime functions. Something like this will do it:

import 'dart:ffi';
import 'dart:io';
import 'dart:typed_data';
import 'dart:isolate';

Future<void> main() async {
  final Uint8List data = buildSharableTypedData();
  final result = await Isolate.run(() => data);

  if (!identical(data, result)) throw 'Should share typed data.';
}

Uint8List buildSharableTypedData() {
  final buffer = TransferableUint8Buffer();
  for (int i = 0; i < 1000; ++i) {
    buffer.add(i % 256);
  }
  return buffer.toData();
}

class TransferableUint8Buffer {
  late Pointer<Uint8> pointer;
  late Uint8List bytes;
  late int capacity;
  late int length;

  TransferableUint8Buffer([this.capacity = 8]) {
    pointer = allocate(capacity).cast();
    bytes = pointer.asTypedList(capacity);
    length = 0;
  }

  void add(int value) {
    _ensureCapacity(length + 1);
    bytes[length++] = value;
  }

  void addAll(List<int> values) {
    addRange(values, 0, values.length);
  }

  void addRange(List<int> values, int start, int end) {
    final int extra = end - start;
    _ensureCapacity(length + extra);
    for (int i = start; i < end; ++i) {
      bytes[length++] = values[i];
    }
  }

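  /// Wraps the native buffer in an unmodifiable external Uint8List without
  /// copying. The finalizer runs when that list is garbage collected, so this
  /// buffer must not be used (or grown) after calling toData().
  Uint8List toData() {
    // Value of Dart_TypedData_kUint8 in the Dart_TypedData_Type enum.
    const int Dart_TypedData_kUint8 = 2;
    // Caution: dart_api.h declares Dart_HandleFinalizer as taking
    // (isolate_callback_data, peer), so passing free directly may not free
    // `peer`; verify this against your SDK version.
    return createUnmodifiableTypedData(Dart_TypedData_kUint8, pointer, length,
        pointer, capacity, freePointer.cast());
  }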

  void _ensureCapacity(int newLen) {
    if (capacity < newLen) {
      final oldPointer = pointer;
      final oldBytes = bytes;

      capacity = 1 << (newLen.bitLength);
      pointer = allocate(capacity).cast();
      bytes = pointer.asTypedList(capacity);
      bytes.setRange(0, length, oldBytes);

      free(oldPointer);
    }
  }
}

@Native<
        Handle Function(Int, Pointer<Uint8>, IntPtr, Pointer<Uint8>, IntPtr,
            Pointer<Void>)>(
    symbol: "Dart_NewUnmodifiableExternalTypedDataWithFinalizer")
external Uint8List createUnmodifiableTypedData(int type, Pointer<Uint8> data,
    int length, Pointer<Uint8> peer, int externalSize, Pointer<Void> callback);

final DynamicLibrary stdlib = Platform.isWindows
    ? DynamicLibrary.open('ole32.dll')
    : DynamicLibrary.process();

final allocate =
    stdlib.lookupFunction<Pointer Function(IntPtr), Pointer Function(int)>(
        Platform.isWindows ? 'CoTaskMemAlloc' : 'malloc', isLeaf: true);
final freePointer = stdlib.lookup<NativeFunction<Void Function(Pointer)>>(
    Platform.isWindows ? 'CoTaskMemFree' : 'free');
final free = freePointer.asFunction<void Function(Pointer)>(isLeaf: true);

mkustermann commented 1 year ago

Some additions to the above:

The benefit is that the final buffer doesn't have to be copied anymore but can be used directly to create an immutable typed data wrapper. That pays off if the data is very large.

Though it may be slow for small amounts of data, as one has to leave Dart code to perform allocation, freeing, and typed-data creation in C code. See related issues that may avoid this issue:

a-siva commented 11 months ago

Are there any more actionable items for this issue? Using the native buffer from native_synchronization appears to solve the immediate problem reported here. I intend to close this issue if I do not hear back.

nex3 commented 11 months ago

@a-siva The documentation for the native_synchronization package doesn't mention anything about not copying data. If that's a guarantee it offers and that guarantee is added to the documentation, then I suppose that satisfies my own use-case—although I suspect others may want a way to pass around data without copying in asynchronous contexts as well as synchronous ones.

If that package doesn't offer a zero-copy guarantee, then this is still very much an issue.