proto-kit / framework

Apache License 2.0

Task Framework #24

Closed rpanic closed 1 year ago

rpanic commented 1 year ago

We need a framework for executing tasks. Tasks include basically all expensive, async computations; in our case that is mostly (but not limited to) proving work.

I have already implemented a fairly generic worker framework for Njord and strongly suggest reusing it.

Specification

The worker framework uses a central RabbitMQ message queue, accessed over the AMQP protocol. This allows for a good degree of customisability while keeping the design simple and efficient.

We will have a master (coordinator) that pushes tasks to the message queue; workers then consume the tasks from that queue. The results are pushed back to the coordinator through a separate queue.

This example shows the API and implements a so-called ReducableTask, which can be used for things like tree-based merging. Other kinds of tasks, such as normal once-executed tasks, have also already been implemented in this model.


    // The implementation of the task, known by both master and worker
    class Task implements ReducableTask<number, number> {

        name(): string {
            // Tells the framework from which queues to consume
            return "sum";
        }

        // Worker-executed
        prepare(): Promise<void> {
            // we can call .compile() here for example
            return Promise.resolve(undefined);
        }

        // Master-executed
        reducible(r1: number, r2: number): boolean {
            // Checks if the tasks r1 and r2 fit together and can be reduced
            return true;
        }

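        // Converts task inputs and results to and from their queue representation
        serializer(): Serializer<number> {
            return {
                fromJSON(s: string): number {
                    // Explicit radix so the string is always parsed as decimal
                    return parseInt(s, 10);
                },
                toJSON(t: number): string {
                    return t.toString();
                }
            };
        }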

        // Worker-executed
        reduce(r1: number, r2: number): Promise<number> {
            // Does the actual reducing work
            return Promise.resolve(r1 + r2)
        }

    }
    const task = new Task();

    const inputs = [1,2,3,4,6,47,2,745,83,8,589,34,7,62,346,247,458748,47,48,37];

    const coord = new WorkerCoordinator();

    // Executes the task on the workers and reports back once the task has been fully reduced
    const res = await coord.executeReducingTask(task, inputs);
    // -> res == sum(inputs)
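
To make the tree-based merging more concrete, here is a minimal in-process sketch of how a reducing task could be driven. It is not the Njord implementation: `PairwiseReducer`, `reduceTree` and `sumTask` are hypothetical names, and the sketch assumes that results are paired with their direct neighbours and that `reduce` runs locally instead of being dispatched to a worker through the queue.

```typescript
// Hypothetical, trimmed-down interface: only the two methods this sketch needs.
interface PairwiseReducer<R> {
    reducible(r1: R, r2: R): boolean;
    reduce(r1: R, r2: R): Promise<R>;
}

// Reduces `inputs` level by level. Adjacent pairs of one level are reduced
// concurrently; an unpaired trailing element is carried over to the next level.
async function reduceTree<R>(task: PairwiseReducer<R>, inputs: R[]): Promise<R> {
    if (inputs.length === 0) {
        throw new Error("cannot reduce an empty input list");
    }
    let level: R[] = inputs.slice();
    while (level.length > 1) {
        const pending: Promise<R>[] = [];
        for (let i = 0; i < level.length; i += 2) {
            if (i + 1 === level.length) {
                // Odd element out: pass it through unchanged
                pending.push(Promise.resolve(level[i]));
            } else if (task.reducible(level[i], level[i + 1])) {
                // Assumption: in the real framework this would be a message to a worker
                pending.push(task.reduce(level[i], level[i + 1]));
            } else {
                throw new Error(`results at index ${i} and ${i + 1} cannot be reduced`);
            }
        }
        level = await Promise.all(pending);
    }
    return level[0];
}

// Same behaviour as the "sum" task above, without the queue-related methods
const sumTask: PairwiseReducer<number> = {
    reducible: () => true,
    reduce: (r1, r2) => Promise.resolve(r1 + r2),
};
```

With this sketch, `reduceTree(sumTask, [1, 2, 3, 4, 5])` resolves to `15` after three levels. Pairing neighbours keeps each level independent, so all reductions of one level can run on different workers at the same time.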

Using a message queue lets us scale workers up easily based on demand. As soon as workers are spun up, they connect to the queue and start consuming and processing tasks. Additionally, graceful shutdowns will be easy to implement.
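
The consume-and-shutdown behaviour described above can be sketched without a broker. Everything here is an assumption for illustration: `InMemoryQueue` stands in for the RabbitMQ queues, `QueueWorker` is a hypothetical name, the worker polls instead of receiving pushed messages, and "work" is simulated by doubling a number after a short delay.

```typescript
// In-memory stand-in for a message queue (assumption: the real setup uses RabbitMQ)
class InMemoryQueue<T> {
    private items: T[] = [];
    push(item: T): void { this.items.push(item); }
    pop(): T | undefined { return this.items.shift(); }
    get size(): number { return this.items.length; }
}

const sleep = (ms: number): Promise<void> =>
    new Promise((resolve) => setTimeout(resolve, ms));

class QueueWorker {
    private tasks: InMemoryQueue<number>;
    private results: InMemoryQueue<number>;
    private stopping = false;
    private loop: Promise<void> = Promise.resolve();

    constructor(tasks: InMemoryQueue<number>, results: InMemoryQueue<number>) {
        this.tasks = tasks;
        this.results = results;
    }

    // Starts consuming as soon as the worker is spun up
    start(): void {
        this.loop = this.run();
    }

    private async run(): Promise<void> {
        // The stop flag is only checked between tasks, never in the middle of one
        while (!this.stopping) {
            const payload = this.tasks.pop();
            if (payload === undefined) {
                await sleep(1); // queue is empty: idle briefly, then poll again
                continue;
            }
            await sleep(1); // simulated expensive computation
            this.results.push(payload * 2);
        }
    }

    // Graceful shutdown: stop taking new tasks, wait for the in-flight one
    async shutdown(): Promise<void> {
        this.stopping = true;
        await this.loop;
    }
}
```

If three tasks are queued and `shutdown()` is called right after `start()`, the worker still finishes the task it has already taken and publishes its result, while the two remaining tasks stay in the queue for other workers. Scaling up then amounts to constructing more `QueueWorker` instances against the same two queues.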