eneural-net / async_task

Asynchronous tasks and parallel executors, supporting all Dart platforms (JS/Web, Flutter, VM/Native) through transparent internal implementations with `dart:isolate` or only `dart:async`, easily bringing concepts similar to Thread Pools to Dart, without having to deal with `Isolate` and port/channel complexity.
Apache License 2.0
53 stars 4 forks source link

non-blocking messaging #5

Closed kidddoto closed 2 years ago

kidddoto commented 2 years ago

Hello! Is there a non-blocking method for receiving a message inside the task?

gmpassos commented 2 years ago

Take a look at 'AsyncTaskChannel':

https://pub.dev/documentation/async_task/latest/async_task/AsyncTaskChannel-class.html

https://pub.dev/documentation/async_task/latest/async_task/AsyncTask/channelInstantiator.html

https://pub.dev/documentation/async_task/latest/async_task/AsyncTask/channel.html

gmpassos commented 2 years ago

See async_task: ^1.0.13:

1.0.13

tomekit commented 1 year ago

Given that I need to receive multiple messages in main isolate, what would be the best way to listen to them?

This works fine, but it will only wait for a single message.

channel.waitMessage().then((value) {

});

Would I need to call: readMessage from the loop? Is there any equivalent to listen method?

gmpassos commented 1 year ago

For now there's no "listener". But you can instantiate a channel passing a messageHandler:

https://pub.dev/documentation/async_task/latest/async_task/AsyncTaskChannel-class.html

tomekit commented 1 year ago

Thanks @gmpassos it works great.

My previous code which was looping and losing final messages (because of the 100ms sleep which was a window of opportunity to net get the message due to closed already channel) turned into much shorter and reliable version.

Currently:

final AsyncTaskChannelMessageHandler messageHandler = (dynamic message, bool fromExecutingContext) {
  if (message != null) {
    importantMessageProcessingFunction(message);
  }
};

Previously:

final updateLoop = (AsyncTaskChannel channel) async {
  while (!channel.isClosed) {
    final message = channel.readMessage();
    if (message != null) {
      importantMessageProcessingFunction(message);
    } else {
      await Future.delayed(const Duration(milliseconds: 100));
    }
  }
};

// Get progress messages
for (final task in tasks) {
  final channel = (await task.channel())!;
  updateLoop(channel);
}

await Future.wait(executions);

// Get final message
for (final task in tasks) {
  final channel = (await task.channel())!;
  channel.waitMessage().then(importantMessageProcessingFunction);
}