eneural-net / async_task

Asynchronous tasks and parallel executors, supporting all Dart platforms (JS/Web, Flutter, VM/Native) through transparent internal implementations with `dart:isolate` or only `dart:async`, easily bringing concepts similar to Thread Pools to Dart, without having to deal with `Isolate` and port/channel complexity.
Apache License 2.0
53 stars 4 forks source link

Executing chunks one by one #8

Closed sukhcha-in closed 2 years ago

sukhcha-in commented 2 years ago

I have list of chunks which and I want to process each chunk one by one. I want to execute chunk [0, 0, 0, 0] first, then [1, 1, 1, 1] then [2, 2, 2, 2]. But all the tasks in chunk should run parallely at same time. Any way of doing this?

import 'dart:async';
import 'package:async_task/async_task.dart';

void main() async {
  //List of chunks
  List<List<int>> chunks = [
    [0, 0, 0, 0], //chunk contains 4 tasks which should run parallely
    [1, 1, 1, 1], //this chunk only gets executed once previous chunk is done
    [2, 2, 2, 2],
    [3, 3, 3, 3],
    [4, 4, 4, 4],
    [5, 5, 5, 5],
    [6, 6, 6, 6],
    [7, 7, 7, 7],
  ];
  for (final chunk in chunks) {
    threadExecutor(chunk);
  }
}

void threadExecutor(List chunk) async {
  List<TaskExecutor> tasks = [];

  for (final task in chunk) {
    tasks.add(TaskExecutor(task));
  }

  var asyncExecutor = AsyncExecutor(
    sequential: false,
    parallelism: 4,
    taskTypeRegister: _taskTypeRegister,
  );

  asyncExecutor.logger.enabled = false;

  var executions = asyncExecutor.executeAll(tasks);

  await Future.wait(executions);

  for (var task in tasks) {
    var n = task.n;
    var prime = task.result;
    print('$n\t-> $prime \t $task');
  }
}

List<AsyncTask> _taskTypeRegister() => [TaskExecutor(0)];

class TaskExecutor extends AsyncTask<int, dynamic> {
  final int n;

  TaskExecutor(this.n);

  @override
  AsyncTask<int, dynamic> instantiate(int parameters,
      [Map<String, SharedData>? sharedData]) {
    return TaskExecutor(parameters);
  }

  @override
  int parameters() {
    return n;
  }

  @override
  FutureOr<dynamic> run() async {
    return await myTask(n);
  }

  Future myTask(int n) async {
    await Future.delayed(Duration(seconds: 1));
    return n;
  }
}

Output:

3       -> 3     TaskExecutor(3)[successful]{ result: 3 ; executionTime: 1027 ms }
3       -> 3     TaskExecutor(3)[successful]{ result: 3 ; executionTime: 1026 ms }
3       -> 3     TaskExecutor(3)[successful]{ result: 3 ; executionTime: 1026 ms }
3       -> 3     TaskExecutor(3)[successful]{ result: 3 ; executionTime: 1019 ms }
1       -> 1     TaskExecutor(1)[successful]{ result: 1 ; executionTime: 1027 ms }
1       -> 1     TaskExecutor(1)[successful]{ result: 1 ; executionTime: 1025 ms }
1       -> 1     TaskExecutor(1)[successful]{ result: 1 ; executionTime: 1025 ms }
1       -> 1     TaskExecutor(1)[successful]{ result: 1 ; executionTime: 1027 ms }
2       -> 2     TaskExecutor(2)[successful]{ result: 2 ; executionTime: 1027 ms }
2       -> 2     TaskExecutor(2)[successful]{ result: 2 ; executionTime: 1026 ms }
2       -> 2     TaskExecutor(2)[successful]{ result: 2 ; executionTime: 1026 ms }
2       -> 2     TaskExecutor(2)[successful]{ result: 2 ; executionTime: 1026 ms }
0       -> 0     TaskExecutor(0)[successful]{ result: 0 ; executionTime: 1027 ms }
0       -> 0     TaskExecutor(0)[successful]{ result: 0 ; executionTime: 1029 ms }
0       -> 0     TaskExecutor(0)[successful]{ result: 0 ; executionTime: 1027 ms }
0       -> 0     TaskExecutor(0)[successful]{ result: 0 ; executionTime: 1026 ms }
7       -> 7     TaskExecutor(7)[successful]{ result: 7 ; executionTime: 1014 ms }
7       -> 7     TaskExecutor(7)[successful]{ result: 7 ; executionTime: 1012 ms }
7       -> 7     TaskExecutor(7)[successful]{ result: 7 ; executionTime: 1012 ms }
7       -> 7     TaskExecutor(7)[successful]{ result: 7 ; executionTime: 1012 ms }
4       -> 4     TaskExecutor(4)[successful]{ result: 4 ; executionTime: 1014 ms }
4       -> 4     TaskExecutor(4)[successful]{ result: 4 ; executionTime: 1013 ms }
4       -> 4     TaskExecutor(4)[successful]{ result: 4 ; executionTime: 1014 ms }
4       -> 4     TaskExecutor(4)[successful]{ result: 4 ; executionTime: 1014 ms }
5       -> 5     TaskExecutor(5)[successful]{ result: 5 ; executionTime: 1013 ms }
5       -> 5     TaskExecutor(5)[successful]{ result: 5 ; executionTime: 1013 ms }
5       -> 5     TaskExecutor(5)[successful]{ result: 5 ; executionTime: 1012 ms }
5       -> 5     TaskExecutor(5)[successful]{ result: 5 ; executionTime: 1013 ms }
6       -> 6     TaskExecutor(6)[successful]{ result: 6 ; executionTime: 1012 ms }
6       -> 6     TaskExecutor(6)[successful]{ result: 6 ; executionTime: 1013 ms }
6       -> 6     TaskExecutor(6)[successful]{ result: 6 ; executionTime: 1013 ms }
6       -> 6     TaskExecutor(6)[successful]{ result: 6 ; executionTime: 1013 ms }
sukhcha-in commented 2 years ago

Solved, just by adding Future to threadExecutor and then adding await for each executor in loop.

Future threadExecutor
for (final chunk in chunks) {
  await threadExecutor(chunk);
}
gmpassos commented 2 years ago

You can pass the parameter 'sequential' to the AsyncExecutor.

It's exactly for that.

https://pub.dev/documentation/async_task/latest/async_task/AsyncExecutor/AsyncExecutor.html

Best regards