MatrixAI / Polykey

Polykey Core Library
https://polykey.com
GNU General Public License v3.0
29 stars 4 forks source link

Generic Non-Blocking Task Management ("Queue") for discovery and nodes domains #329

Closed emmacasolin closed 2 years ago

emmacasolin commented 2 years ago

Specification

Unattended discovery was added in #320, however, there is no concept of priority within the queue. There are three ways that a vertex (a node or identity) can be added to the discovery queue, and they should follow this order of priority:

  1. Manually, via the discovery methods queueDiscoveryByNode() and queueDiscoveryByIdentity() (these are called in the commands identities discover (explicit discovery) and identities trust (explicitly setting a permission, so we want the Gestalt to be updated via discovery).
  2. As a step in the discovery process whereby child vertices are added into the discovery queue in order to discover the entire connected gestalt.
  3. Automatically by a process of rediscovery when we want to update existing Gestalts in the Gestalt Graph (to be addressed in #328).

Vertexes with a higher priority should be discovered first, either by being placed at the front of the queue or by modifying the traversal method of the queue. The priority queue could also be further optimised by grouping vertices from the same gestalt together when this is known (for example when adding child vertices).

Additional context

Tasks

  1. Modify the existing Discovery Queue to be a Priority Queue
  2. Ensure that when a user interactively wants to discover a gestalt vertex that it becomes the highest priority and gets executed first
  3. Look into the potential for further optimising the priority queue, for example by having multiple points of comparison with varying levels of importance that can influence the priority of a particular vertex in the queue
CMCDragonkai commented 2 years ago

There's a go implementation of persistent priority queue backed by leveldb here: https://github.com/beeker1121/goque. It can be used as a reference for this. Also a JS implementation here: https://github.com/eugeneware/level-q (the goque is probably more comprehensive).

Our priority queue needs to by default maintain order, because we do want to know the sorted list of jobs. But also allow us to add a special priority number on top.

From my imagination:

This reminds me of the indexing problem, where you can ask for a list of rows sorted by several columns. The first column would dictate the base sort, then subsequent columns would sort any ambiguous sub-orders.

Imagine we had 2 indexes. The first being your priority index using an arbitrary number, the second being the monotonic time index using IdSortable. You could sort on the priority index first, then sort on IdSortable second.

Maybe this then has a relationship to #188.

However this would only be for if you are looking up items. If we are streaming data from the level db that may be more complicated.

CMCDragonkai commented 2 years ago

This can be done with a compound index. Prefix can be the priority number (lexinted), suffix can be IdSortable.

This means you can then stream results that are always ordered in terms of priority first then by time second.

Priority can start at 0 by default, and one can increment priorities depending on the origin of the task. Like tasks emitted by user wanting to lookup something can be set to a higher priority number.

We could do this directly by changing the queue domain key. But I'd suggest first solving the indexing issue in general first then building a compound index on top.

CMCDragonkai commented 2 years ago

We discovered that the priority queue can also benefit from a uniqueness index creating a uniqueness constraint: https://github.com/MatrixAI/js-polykey/pull/311#issuecomment-1048380605

This means that duplicate tasks cannot go into the priority queue. Not entirely sure if this is required because a queue can still say they should process the same task over and over.

CMCDragonkai commented 2 years ago

We should have a concurrency bound in the queue. This means how many tasks should be executed at the same time. By default unbounded meaning all tasks gets executed immediately without waiting to be done.

For IO bound tasks, you might as well have unbounded concurrency. For CPU-bound it can be sent to the web worker pool which is bounded by core count. Battery usage optimisation may also affect our limit too.

emmacasolin commented 2 years ago

A generic Queue class has been implemented here: https://github.com/MatrixAI/js-polykey/pull/326/commits/91287ab57f9cc34c337f1dc9f7523ea122aa96e5

This queue is not persistent, or a priority queue, however, it is designed to be a generic queue that can eventually be used in all places that require this functionality (including the Discovery Queue). The generic Queue can be refactored to meet this issue and https://github.com/MatrixAI/js-polykey/issues/328 at some point in the future, potentially incorporating the DB.

CMCDragonkai commented 2 years ago

Renamed this issue to the general idea of non-blocking task management. It now has to solve for discovery, nodes management in terms of setting nodes, pinging nodes and garbage collection, as well as in relation to:

There's a relationship between the queue design and the EventBus system, as well as our WorkerManager.

Most important is for us to develop a Task abstraction. It can be a class Task, that represents a "lazy promise". Promises in JS are strictly evaluated, while these tasks will need to be lazily evaluated. Then our task management system can convert our lazy tasks to strict promises (which represents futures). More background info here: https://en.wikipedia.org/wiki/Futures_and_promises

Our task manager will need to have configurable:

Stretch goal is to also incorporate "time/calendar-scheduling" so that tasks can be executed at a point in time like cron.

Interaction between EventBus and task manager may be considered. The event bus is about communicating changes between domains, but the task manager is the one actually executing the tasks.

Tasks can be:

This rabbit hole for this goes deep. So we should make sure not to feature creep our non-blocking task queuing needs.

CMCDragonkai commented 2 years ago

Example of prior work: https://github.com/fluture-js/Fluture

CMCDragonkai commented 2 years ago

Also to clarify, we are not creating a "generic distributed job queue", that's the realm of things like redis queue and https://en.wikipedia.org/wiki/List_of_job_scheduler_software. There's so much of this already. We just need something in-process relative to Polykey.

tegefaulkes commented 2 years ago

Along with the configurable concurrency limit and executor, I think we should have an interface for the queue as well. Depending on the situation we may need just a simple queue, a priority queue, a persistent database queue like discovery uses, etc etc... So far as the Queue cares it only needs to support push and shift. So we can make the Queue a generic class and pass it any implementation we want for storing the queue so long as it extends the interface.

It shouldn't be too hard to make the change. We just need to decide if this degree of control is desired. I can see a need for it though.

tegefaulkes commented 2 years ago

Is this a part of #326 ?

CMCDragonkai commented 2 years ago

Is this a part of #326 ?

Nope, this can be done later.

CMCDragonkai commented 2 years ago

More prior work:

Actually the entire modern-async library is quite interesting, as it has implemented some of the things that we've been working on as well. But I think we won't just bring in that library, but instead extract parts out of it for our own implementation.

It has interesting ideas for promise cancellation as well, Delayer, Scheduler and Queue can all help. I believe that our usage of async generators and decorators might be more advanced though.

The library also exports a bunch of collection combinators that can work with asynchronous execution. So instead of say Array.map mapping a synchronous function, it could map an asynchronous function, and then wait for all of them to finish. Basically we do this with Promise.all atm. It also works with asynchronous iterables like async generators. This I feel is a different kind of thing, and I'd only want to bring in these utility functions where it's relevant. Maybe if JS had better treeshaking and package specific imports (https://github.com/MatrixAI/TypeScript-Demo-Lib/issues/32), it would work well.

CMCDragonkai commented 2 years ago

Additional resources...

Regarding delaying tasks (will be useful to understand how timeouts work and how we intend to persist them):

Regarding serialisation and persistence of lazy tasks (the persisted task in the Queue backing the DB isn't a promise, it's just some data on disk, but we may give back a lazy promise as an in-memory reference to the task):

We may not actually be serialising arbitrary code, but instead allow domains to register function callbacks into the queue, and then the queue will then call back these registered functions. That way the functions are somewhat dynamic but not just arbitrary code execution.

As for the lazy promise representation, make sure to take some ideas from https://github.com/fluture-js/Fluture. I'm wary of bringing in such a huge library and API, so we should just take ideas from it and implement it in our own promise abstractions.

As for the worker dispatch, we want the workers to pull in tasks from the queue when done. However threadsjs (used by js-workers) doesn't expose a complex queue to use. So we won't be able to use their own queue, instead, we would want to augment our workers or augment the Pool class to take work from our own persisted queue. See: https://threads.js.org/usage-pool#waiting-for-tasks-to-complete.

CMCDragonkai commented 2 years ago

During the testing of this it would be nice to incorporate https://github.com/dubzzz/fast-check (https://github.com/dubzzz/fast-check/tree/main/examples) into the model based checking of the queue functionality. Some background: https://ericnormand.me/speaking/testing-stateful-and-concurrent-systems-using-test-check

CMCDragonkai commented 2 years ago

So I'm thinking that rather than queueing arbitrary code, you have to first register handlers like queue.registerHandler(fName, f); Then subsequently when you push a task like queue.pushTask(task); the end result is that when the task gets executed, it ends up executing a "static pointer" which is the fName.

But then this is actually really similar to EventEmitter and our EventBus. Basically registering handlers is the same as adding event handlers, then pushing tasks is similar to emitting events. See: https://nodejs.org/api/events.html#emitteremiteventname-args

So there are similarities, there are some differences, our scheduling of tasks is not just a simple synchronous call, nor is it an asynchronous call as we have in EventBus.emitAsync.

The queue is also persistent unlike the eventbus, the actual emission of the event may be delayed by an arbitrary amount of time that is also persisted. There's a controlled concurrency to dispatching tasks.... etc.

So similar, but different. In that sense, there's no need to extend from EventEmitter (especially since we need to change that to event target)...

CMCDragonkai commented 2 years ago

This https://github.com/beeker1121/goque/blob/master/priority_queue.go has an interesting solution. They support static priorities. These priorities are set when an item is enqueued.

Once enqueued, the user gets back a ID.

There are 256 possible priorities from 0 to 255 inclusive.

When the item is stored, a key is derived from both the priority level and its ID.

    // Create new PriorityItem.
    item := &PriorityItem{
        ID:       level.tail + 1,
        Priority: priority,
        Key:      pq.generateKey(priority, level.tail+1),
        Value:    value,
    }

As you can see here, the IDs are generated by the queue, and they are a counter that is simply incremented.

The resulting database KEY is likely a concatenation of the priority level and the ID. Assuming lexicographic encoding, there's just basically PRIORITY + SEQUENCE.

So then in the database, we can always find the highest priority with just the ordered keys, as it will be ordered by the priority first then the sequence second.

Interestingly, when dequeueing, it doesn't seem to consult the database, but some in-memory structure that indicates what is currently highest priority level to consider. But I wonder if this is necessary.

Surely it can just take a row off the database, as the database is the source of truth of what is highest priority.

CMCDragonkai commented 2 years ago

The https://github.com/eugeneware/level-q/ has 2 interesting features:

  1. The ability to delay the dispatch of a job that is queued
  2. The ability to stream dispatch jobs

The first feature appears to be done by passing a handler that determines when a job is valid to be executed

This handler is simply checked repeatedly whenever a job is iterated over.

The second feature is an asynchronous loop that just iterates over the jobs. If a job isn't valid, it just sticks it back into the queue.

In that sense it is really using it like a queue, but I don't think it's very efficient as if a job that isn't valid, it just repeating a busy loop with a delay until it checks again when the job is valid.

CMCDragonkai commented 2 years ago

The concept of https://en.wikipedia.org/wiki/Kinetic_priority_queue is interesting as it indicates the idea of priority that can grow or decay as a function of time. So then when an item is inserted, it's inserted at time T rather than with a priority. But there's a priority function associated. Therefore, to know the priority of the item, apply the function to T. This is an abstract interface, the implementation might be more complex.

But it is interesting because we can definitely store T into the database, but our indexing of the queue becomes more complex. In the case of the go solution, the priority is part of the index, so it's easy to find what is the lowest priority just by getting a sorted key-value.

One way to do this, that upon first starting the queue, we scan the contents of the queue which is an O(n) procedure. This gives us an understanding of what is in the queue, and then to create in-memory twins of this data. In particular this would be useful for any persisted delayed jobs, where we put down the time it was inserted, and the desired amount of delay.

The queue is not meant to be big, so O(n) at the beginning when loading the queue should be fine.

Now can this same mechanism be used to apply dynamic priorities, where because we know the time of the items in the queue, we can then re-sort them according to an in-memory priority? To do so, may seem to require some sorted partitioning of the jobs into their relative derived priorities.

CMCDragonkai commented 2 years ago

The idea of scanning the priority queue database at launch is necessary because persisted start times with delays have no other way of being made available to the code.

Unless one were to index the database by scheduled time. Schedule time would be INSERTION TIME + DELAY.

Then the items would be indexed by this number.

A numerically sorted key-values, upon which popping the lowest number, would be the item that is earliest to be scheduled for execution.

At the same time, because the program knows its own time, we can attempt to pop tasks only within a given range search.

CMCDragonkai commented 2 years ago

Imagine index of INSERTION TIME + DELAY.

Time is ticked 1 second at a time. Insertion time grows linearly.

Delay can be 0 or more.

As tasks are inserted at linear time steps 1+100, 2+50, 3+10, 4+0, this fills up a queue of:

4
13  
52
101

The dispatch, looks for all task where index <= currentTime.

Suppose current time is 32, then you get [4, 13] which are tasks that are due to be executed. In particular you have 2 tasks are overdue. But that's ok, this is best-effort execution.

But when does this dispatch execute? Is it just polling every second? The database is not capable of triggering things. It's state, not a process.

So dispatch may check at each time-tick, but this is also inefficient.

The dispatch can instead look for lowest indexed item. Then set a timeout to be executed based on the scheduled time in-memory. This is assuming all times are in the future. If they are in the past, then they should be dispatched now.

So suppose the above queue.

Upon the first lookup, it would see task 4. Assume current time to be at 2.

Then it would setTimeout for 2 to pop a task at time 4, which would find all tasks index <= 4.

However what happens if new tasks are entered into the system that should be executed even earlier. In that case, the insertion of new tasks should also trigger the dispatcher, but only if the new task INSERTION TIME + DELAY is earlier than task 4.

So at any point in time, there will always be a setTimeout set to the earliest task that has to be executed. And this is reset every time a task is executed.

This avoids having to do an O(n) scan at the very beginning, and instead is more like an O(1) scan.

CMCDragonkai commented 2 years ago

So that deals with tasks with delays and also no delays. No delays just mean a delay of 0.

What we have is a list of tasks ordered by "scheduled" execution times.

Static priority is just a matter of adding a numeric prefix. Starvation may occur here.

Dynamic priority means tasks should start with an initial priority. But then as we pop tasks out we must execute a function to figure out its priority.

Suppose there are 2 tasks, both are scheduled for immediate execution, that is a delay of 0. Which task should be executed assuming only 1 can be executed at a time?

The priority function is applied to both tasks, using the tasks's INSERTION TIME as a parameter.

The priority function will take into their account their initial priority, plus their dynamic priority returned by the function. This added up is weighed together, and the highest priority task is executed, the other task remains in the queue.

This means priority is not indexed at all, it's just a value. And we solve the starvation problem now.

It also means that each read of the queue may return multiple results that are then calculated, with a top-K cut that is sent to workers.

It could also be that if each worker is pulling tasks when they are available... then we could push to an intermediate queue of tasks for immediate execution. And the system that dispatches tasks, puts it into the immediate-queue. The workers now block-pull on the immediate-queue.

This separates the prioritising system (the scheduler) from the execution system (the workers). The workers take from a FIFO queue, but the scheduler converts from the persistent priority queue into the FIFO queue.

The only issue is that until the task is in fact executed, we can't delete it from the persistent queue (the SoT). Well now this brings a different question, is the job considered done once it is dispatched to FIFO, or started execution, or fulfilled execution, or successfully completed execution?

I believe it should be when fulfilled execution whether success or failure. Given that we are registering handlers, these handlers must handle the success or failure of the task. But once the task is fulfilled, then it should no longer be in the queue.

If the process is shutdown, it's possible that a task is not fulfilled and instead just cancelled. In this case, the tasks should remain on the priority queue so it can restart when the process is restarted.

CMCDragonkai commented 2 years ago

Need to track https://github.com/andywer/threads.js/issues/440

CMCDragonkai commented 2 years ago

We still need a TaskId, and if we use IdSortable, we pass in a nodeId into the IdSortable constructor. This is only used to to ensure that Ids can be generated uniquely and not conflict between different nodes. But this is only useful if the TaskId is going to share the same namespace.

Since this is not going to happen, it doesn't really matter, and we should make the nodeId optional.

Note that however, if the node ID changes, that should involve reassigning the generateTaskId function with the new node ID.

CMCDragonkai commented 2 years ago

Note that since we are indexing by "scheduled time", we cannot just extract the time component of the TaskId.

So we have multiple sub levels here:

  1. ['Queue', 'tasks'] - TaskId -> Task
  2. ['Queue', 'time'] - Time -> TaskId

When creating a task, we create a task id, insert it with the task, then also assign a time to the task to be executed.

CMCDragonkai commented 2 years ago

Found https://github.com/WICG/scheduling-apis and https://nodejs.org/api/timers.html#timerspromisesschedulerwaitdelay-options and https://developer.mozilla.org/en-US/docs/Web/API/Prioritized_Task_Scheduling_API.

We could replicate a little about the API design here.

CMCDragonkai commented 2 years ago

When scheduling a new task. The task object is created.

This task object needs to represent a lazy promise.

A lazy promise in this case means that the promise doesn't mean that the execution has started. It's simply queued.

Because tasks are persisted into the DB, it's possible for the PK agent to be restarted, and one may wish to await for a given task ID. That means acquiring a promise for that task ID.

I'm thinking that we can lazily create a promise, that is one to one for each task. Then you can await this promise's result.

If multiple calls to acquiring this promise is made, the same promise is shared among all callers. This means a single resolve or reject call will send all awaiters.

Here's an example:

async function main () {

  const p = new Promise((resolve, reject) => {
    setTimeout(() => {
      reject(new Error('oh no'))
    }, 500);
    setTimeout(resolve, 1000);
  });

  const f = async () => {
    await p;
    return 'f';
  };

  const g = async () => {
    await p;
    return 'g';
  };

  const r = await Promise.allSettled([
    f(),
    g()
  ]);

  console.log(r);

  // @ts-ignore
  console.log(r[0].reason === r[1].reason); // This is `true`

  // The same exception object is thrown to all awaiters

}

void main();

There are some constraints:

  1. You cannot acquire a promise for a task that does not exist in the queue.
  2. Creation of the promise may involve hitting the disk if the promise doesn't already exist relative to a task ID.
  3. Creation of the promises has to be protected against race conditions with the object locking map pattern.
  4. If you get a promise, it is guaranteed that within some finite amount of time that this promise will eventually resolve.

How is point 4 guaranteed? It is only possible to get a task promise in 2 ways:

  1. During creation of a new task, the promise can be created afterwards.
  2. One can ask queue.getTask().

Now because it's a lazy promise, this could mean that the task is already executed by the time you ask for a promise for the task. This is only possible if the task is no longer in the queue (or is in some invalid state).

In this situation, when asking for the promise, the promise should be immediately rejected. Alternatively since the acquisition of this promise is lazy, one may throw an exception at this point. The point is, if you do get a promise, the promise must be settled eventually.

One of the initial ways to do this is to add an event listener for every task as soon as it is put into the queue. The problem with this is that now you get in-memory space complexity of O(n), where you have 1 listener for every task.

Listeners aren't always necessary, and maybe lots of tasks are put into the queue. In such a case, we can make the promise/listener itself lazy.

getTask(): Promise<Task> {
  // if we give you back a Task
  // you can be guaranteed to have it resolved or rejected
  // if we cannot give you a Task, because it's already executed, then we throw an exception at this point
}

Alternatively we do something like:

getTask(): Promise<Task | undefined>;

And undefined means it's not possible to give you a promise to the task.

CMCDragonkai commented 2 years ago

Another problem is that exactly is a Task? Is a data structure, or is it a promise? Maybe it's both?

The Task could be a class instance with enumerable properties, along with a method that allows one to acquire the promise?

I'm considering an API like this:

// false means that you are tracking the task immediately, (this is the default)
const task = await queue.pushTask(lazy: false);

// Waits for the promise
await task;

// true means you are not tracking the task
const task = await queue.pushTask(lazy: true);

// Now it may result in an exception if the task is already executed
// This has to distinguish from the task itself being rejected
// ErrorTaskReference
// ErrorTaskRejected
await task;

Something like this means Task is in fact an extension of the Promise. Or at least a class that has the then method. It doesn't actually have to satisfy the entire promise interface. Which would include .catch and .finally.

Either way, the lazy boolean allows one to switch from a lazy promise to an eager promise.

In this sense, lazy simply means whether the task itself is being tracked or not. If it is being tracked, then await task is always guaranteed to either result in ErrorTaskRejected or the task's result. If it is not being tracked, then it only starts tracking when await task is called. Which calls the then method. At this point it may throw ErrorTaskMissing or ErrorTaskRejected... or anything else.

I may have something like:

ErrorTaskMissing - task itself is no longer around, it may already been fulfilled
ErrorTaskReference - task handler was not found
ErrorTaskRejected - task was rejected, see the cause chain
CMCDragonkai commented 2 years ago

Is there a utility in having ErrorTaskRejected? Maybe only to create a set of possible exceptions, as the cause chain can have anything that the task handler itself throws. Otherwise we are just rethrowing the exceptions. https://gist.github.com/CMCDragonkai/08266b1463158f4156f66d4bf077add6

CMCDragonkai commented 2 years ago

So within Queue, we will have 2 methods that that are used as part of start and stop.

protected async startProcessing(): Promise<void>;
protected async stopProcessing(): Promise<void>;

Their job is to peek into the job schedule. (I've started to realise that this is more a "schedule" not a queue, since the priority doesn't apply until the tasks are due for execution).

In the job schedule, they find:

The startProcessing will also be called by the scheduleTask method. This is because there be no tasks in the schedule, and upon scheduling a task, we trigger the start processing again.

Calling startProcess should be idempotent, as in, if the processing is already started, then nothing happens. It would only matter if the setTimeout delay should be made smaller because a more recent task has be scheduled.

CMCDragonkai commented 2 years ago

The queue now would have 2 "queues".

  1. Schedule - this is purely time based, by using IdSortable as the TaskId, this means all tasks have unique scheduling time (up to the maximum amount of ticks the IdSortable is capable of). Which means there will always be 1 task that is in front to be executed. Here priority does not matter, it is simply a matter of scheduling time
  2. Execution Queue - this is the queue of jobs that are actually pending execution right now, here is where priority actually matters. This can make use of dynamic priority assuming the number of tasks sitting here is not too much. One can just round robin here, or select the task that has sat here the longest. Perhaps a double index sort between initial priority and time of insertion.

If the task is never fulfilled (resolve/rejected), it should stay in the execution queue (which should still be persistent).

CMCDragonkai commented 2 years ago

Originally in order to "connect" a lazy promise to a task execution, this was done with a callback that I called a "listener".

Now I realised that the deconstructed promise is itself already a set of callbacks to be executed on the task execution.

This means during actual task dispatch, we could just do something like:

taskHandlerExecution(...taskParameters).then(
  resolveTaskP,
  rejectTaskP
).finally(() => {
  this.promises.delete(taskId.toString() as TaskIdString);
});

That is, the promise that comes from executing the task handler gets connected to the deconstructed promise of the task abstraction.

And at the end, the task promise is deleted once the task is done.

This only occurs IF the task promise was first created. If it was lazy promise, it may never get created in which, and if so, nothing is there to observe/await the task execution. That's fine as the task's side effects continues to be done.

CMCDragonkai commented 2 years ago

So now I have:

During the PolykeyAgent start process, we expect that the Scheduler is going to be a required dependency of other relevant domains.

const scheduler = new Scheduler();
await Discovery.createDiscovery({ scheduler });
await NodeGraph.createNodeGraph({ scheduler });
await scheduler.start();

Why not use await Scheduler.createScheduler();? This is because, this would require us to inject handlers from the very beginning, and these handlers are only known by the other domains. Which results in a circular dependency.

Here we are directly constructing the Scheduler and using it like a StartStop system.

However it is actually CDSS, as there is a destroy method too that removes all the persisted state.

CMCDragonkai commented 2 years ago

One of the issues with this is that certain methods must be possible by the time it is constructed, but not necessarily asynchronously started...

But at the same time asynchronous start is necessary to do any async setup such as creating the database levels... etc.

So now I'm thinking that start and stop is still used, and thus createScheduler is used, but when Scheduler.start just doesn't actually start the processing of tasks, necessitating one to call scheduler.startProcessing().

But then it's not going to be symmetric if the Scheduler.stop does call stopProcessing but start doesn't call startProcessing.

CMCDragonkai commented 2 years ago

The alternative is that we use a callback/hook abstraction similar to the EventBus. So now instead function hooks is registered in the other domains first. These end up calling the scheduler system. However I'm not sure if these work if during their start, it will end up calling these callbacks which won't even have the handlers registered.

Another alternative is that Scheduler is only StartStop instead, but again this isn't nice, if there needs to be asynchronous creation routines.

CMCDragonkai commented 2 years ago

I've added a delay boolean to Scheduler.start in order to not start the processing. That way users can start scheduling with Scheduler.startProcessing() manually afterwards.

By default the delay is false, so that by default the processing does already start.

This means in the PolykeyAgent.createPolykeyAgent, we should instead see something like:

const scheduler = await Scheduler.createScheduler({ delay: true });
await Discovery.createDiscovery({ scheduler });
await NodeGraph.createNodeGraph({ scheduler });
await scheduler.startProcessing();

When stopping the processing this doesn't actually stop the execution of any tasks, it just stops the processing of the scheduler.

CMCDragonkai commented 2 years ago

The scheduler doesn't execute the tasks. It dispatches to the queue. The queue assigns tasks to workers, workers is what executes the task. At the same time, the workers may also pull tasks from the queue when they are idle.

CMCDragonkai commented 2 years ago

The Queue will need to work with threadsjs queue too: https://threads.js.org/usage-pool. I'm not sure yet if this means our WorkerManager will need to be changed to work with the Queue, since I don't really want there to be 2 queues. Maybe Queue is for managing the queue persistence, while embedding the WorkerManager in-memory queue that is one to one for each task that is persisted.

Alternatively we actually don't use WorkerManager pool, and instead manage our own "pool" directly. This means either extending Pool from threadsjs if possible. However ideally the Queue can also work without the WorkerManager being available, but I'm not sure if this is possible without very different behaviour. Without there being worker threads, you don't really have the task pulling behaviour, instead one just assigns tasks to a concurrency limit.

Note that threadsjs has 2 concurrency limits:

  1. The parallel number of workers to launch
  2. The number of concurrent async tasks to run

The only real reason to use workers is to run CPU-intensive tasks, not IO-intensive tasks. (Hard to know precisely until we do benchmarking). So the number of concurrent async tasks should be 1. Therefore we ignore 2. in our design of the Queue.

If the worker manager was not available, the parallel number of workers to launch should be the same concurrency limit of the number of tasks to run concurrently in our Queue. In the former, we would use the os.cpu() count, the latter, this can be specified with some number, with it defaulting to 1...

Actually we can always default it to 1, and then override it with the os.cpu() count if we expect to supply it with workers.


One issue with this is that the WorkerManager is also used for other things where extra CPU intensive tasks is just offloaded. Perhaps instead WorkerManager stays the same, and the injection of worker manager, means a concurrent number of tasks are dispatched to the worker manager but awaited for normally. We would need some way of checking the capacity of the workers before pushing a task into it.

CMCDragonkai commented 2 years ago

I haven't completed the full design of Task class. But I suspect it needs to be similar to lazy promise here: https://github.com/sindresorhus/p-lazy/blob/main/index.js, and even threadsjs representation uses a then method to allow await to work on their objects. Their type is:

/**
 * Task that has been `pool.queued()`-ed.
 */
export interface QueuedTask<ThreadType extends Thread, Return> {
    /** @private */
    id: number;
    /** @private */
    run: TaskRunFunction<ThreadType, Return>;
    /**
     * Queued tasks can be cancelled until the pool starts running them on a worker thread.
     */
    cancel(): void;
    /**
     * `QueuedTask` is thenable, so you can `await` it.
     * Resolves when the task has successfully been executed. Rejects if the task fails.
     */
    then: Promise<Return>["then"];
}
CMCDragonkai commented 2 years ago

If Task is in fact a class Task extends Promise, it would have properties that would be enumerable, and properties that are not. We may need to specify this explicitly: https://debugmode.net/2020/06/18/how-to-make-a-property-non-enumerable-in-javascript/

Alternative is to form an a plain object like threadsjs does instead of using classes.

CMCDragonkai commented 2 years ago

There are some interesting timer APIs: https://nodejs.org/api/timers.html#timeoutrefresh

CMCDragonkai commented 2 years ago

Note that since TaskId is a IdSortable, it's strictly monotonic due to our storing of the last task ID...

But this assumes the last Task ID is always stored, and we are intending on deleting tasks off the schedule once completed. I wasn't thinking keeping historical tasks are useful (except for maybe debugging? Although it seems like it would be dropped in production, and logging/tracing systems should be maintaining the audit log).

This means the last task ID may be undefined. So we would store the last Task ID regardless of whether there are any tasks left in the scheduler.

Furthermore, when the clock is shifted backwards, the time will be incremented by 1 until it is greater than the last time. The 1 is the smallest unit of precision, in which case this would be 1 millisecond.

Afterwards, it will be strictly monotonic ID but have a weakly monotonic timestamp up to 4096 IDs per millisecond. After 4096 it would roll over.

The expectation is that it's not possible to generate more than 4096 IDs in a millisecond, so by that time, the time must have increased by at least 1 millisecond.

Anyway this means we need to store Scheduler/lastTaskId separate from the Scheduler/tasks level.

CMCDragonkai commented 2 years ago

Benchmark in js-workers shows that the overhead to call the workers takes about 1.16 to 1.5ms.

https://github.com/MatrixAI/js-workers/blob/27fb8c3051f0880b81a14ea7daee47b15a94dd89/benches/results/worker_manager_metrics.txt#L2

Worker Threads Intersection.xlsx

A CPU intensive task should be greater than that time to be worth sending to the worker.

However most scheduling work seems it might not actually be CPU intensive. Like NodeGraph and Discovery is mostly IO. I suppose discovery may have have CPU work to pattern match the data to find the right data on the pages it loads, but this should be dominated by the time spent on IO.

Furthermore sending it to a worker can introduce locking problems. The async locks do not work across the worker threads, they only work within the same event loop context. They are not thread-safe nor process-safe.

This should mean that we should not directly integrate WorkerManager into the Queue, instead individual domains may have their handlers directly pass work to the WorkerManager. The Queue does not decide this since it does not know the nature of the task. The domain that registers the handlers can decide the nature of the task. So they can execute within the main thread, or send it off to a web worker and await for it.

CMCDragonkai commented 2 years ago

This means naturally the Queue can have either 1 as a concurrency limit or 0 to indicate unbound concurrent limit. With an unbound concurrency limit, it just immediately proceeds to execute everything that is due for execution.

Priority only comes into play with a concurrency limit so that things get put into priority order. Otherwise all tasks will be asynchronous and immediately executed.

The worker's concurrency/parallel limit is not a concern of the Queue then.

CMCDragonkai commented 2 years ago

We decided not to bother with preventing resource starvation, however an idea is like this.

  1. Take advantage of DB's natural key ordering.
  2. Create a bimap index of Priority/Timestamp -> Task Id AND Timestamp/Priority -> Task Id
  3. Now we can iterate task ids based on 2 compound indexes: highest priority + earliest timestamp AND earliest timestamp + highest priority
  4. Use dynamic programming/kinetic priority function that iterates through both sublevels (indexes) simultaneously to fill up a fixed concurrency pool (if unlimited, this policy is unnecessary, just iterate through as fast as posssible)

Simultaneous iteration that uses the timestamp to weight the priority, where the timestamp delta starts from 0 and goes towards infinity. Once could say that this multiples the priority based on a "rate". A delta of 0 multiplies by 1. A delta of infinity multiplies by infinity. Therefore the rate produces a multiplier between 1 to infinity.

Here is an example of the 3 policies:

desmos-graph (2)

https://www.desmos.com/calculator/wnezpfgxqc

CMCDragonkai commented 2 years ago

Once we have the tasks system, all other domains should not have any kind of background processing implemented, they should delegate ALL of that functionality into the tasks system.

CMCDragonkai commented 2 years ago

The task management is ready. However integration into discovery and nodes domains is being done in #445.

Priority management is static, we won't bother with dynamic priority in https://github.com/MatrixAI/Polykey/issues/329#issuecomment-1223590432 before we see it be a problem.

Issue description here is still relevant to #445, since it contains notes on how best to refactor the discovery system.