PortBlueSky / thread-puddle

A library to pool Node.js worker threads, automatically exposing exported module methods using Proxy Objects. :rocket:

[Enhancement] Event Emitter Puddle #19

Closed Pandapip1 closed 2 years ago

Pandapip1 commented 2 years ago

It would be nice if thread-puddle had built-in support for event emitters. Specifically, worker.on, worker.once, and worker.all.once could pass the event emitter as the first argument of the callback.

It would also be nice if worker.once called the callback exactly once in total, while worker.all.once called it once for each event emitter.

Example:

emitter.on("event", (x: number) => {
    // do whatever
});

gets converted to

worker.on("event", (e: EventEmitter, x: number) => {
    // whatever
});

Edit: ideally, this would work with all objects that implement on and/or once
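
To make the requested semantics concrete, here is a minimal sketch of how such a wrapper might behave. Every name in it (OnLike, TinyEmitter, createEmitterPuddle) is hypothetical and not part of thread-puddle; TinyEmitter is only a stand-in so the sketch runs without imports, and the emitters live on one thread rather than in workers.

```typescript
// Hypothetical sketch: none of these names come from thread-puddle.
type Listener = (...args: any[]) => void;

// Anything that implements `on` qualifies, as the edit above asks for.
interface OnLike {
  on(event: string, listener: Listener): unknown;
}

// Tiny stand-in emitter so the sketch runs without imports.
class TinyEmitter implements OnLike {
  private listeners = new Map<string, Listener[]>();

  on(event: string, listener: Listener): this {
    const list = this.listeners.get(event) ?? [];
    list.push(listener);
    this.listeners.set(event, list);
    return this;
  }

  emit(event: string, ...args: unknown[]): void {
    for (const listener of this.listeners.get(event) ?? []) listener(...args);
  }
}

function createEmitterPuddle<E extends OnLike>(emitters: E[]) {
  type Callback = (emitter: E, ...args: any[]) => void;

  // Fires for every event from every emitter, with the emitter prepended.
  const on = (event: string, callback: Callback): void => {
    for (const emitter of emitters) {
      emitter.on(event, (...args) => callback(emitter, ...args));
    }
  };

  // Fires exactly once in total, for whichever emitter emits first.
  // A guard flag is used, so the underlying listeners stay attached.
  const once = (event: string, callback: Callback): void => {
    let fired = false;
    on(event, (emitter, ...args) => {
      if (fired) return;
      fired = true;
      callback(emitter, ...args);
    });
  };

  // Fires once per emitter.
  const allOnce = (event: string, callback: Callback): void => {
    const seen = new Set<E>();
    on(event, (emitter, ...args) => {
      if (seen.has(emitter)) return;
      seen.add(emitter);
      callback(emitter, ...args);
    });
  };

  return { on, once, all: { once: allOnce } };
}

// Usage: two emitters, three events in total.
const first = new TinyEmitter();
const second = new TinyEmitter();
const puddle = createEmitterPuddle([first, second]);
const counts = { on: 0, once: 0, allOnce: 0 };

puddle.on("event", () => { counts.on += 1; });
puddle.once("event", () => { counts.once += 1; });
puddle.all.once("event", () => { counts.allOnce += 1; });

first.emit("event", 1);
first.emit("event", 2);
second.emit("event", 3);
```

With these three events, the on callback runs three times, the once callback runs once, and the all.once callback runs twice (once per emitter).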

kommander commented 2 years ago

Hey, that sounds like something useful. I have thought about implementing callbacks: function references that can be called from the worker and the other way around. An event emitter could be built on top of that. So far I unfortunately have had no production need for them, and my time is scarce these days. Do you have a suggestion, and could you open a PR?

Pandapip1 commented 2 years ago

Perhaps the functions could be serialized/deserialized: https://stackoverflow.com/questions/7395686/how-can-i-serialize-a-function-in-javascript/51123745#51123745

kommander commented 2 years ago

I actually need that for an upcoming feature in a current project, so I will implement it soon. Serialization unfortunately does not allow closures and I want to be able to do something like:

const cloj = 'hello'
worker.on('someevent', (value) => log(cloj, value))

This will then work out of the box by giving the worker thread a reference to the function on the main thread. The worker gets a proxy function that it can call, which triggers the correct function on the main thread.

This will then also work with any worker method that takes a callback.
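
The idea described above can be sketched roughly as follows. This is an illustration only, not thread-puddle's actual implementation: CallbackRegistry, createCallbackProxy, and the message shape are hypothetical, and a direct function call stands in for the message channel between threads.

```typescript
// Hypothetical sketch of callbacks by reference; not thread-puddle's code.
type AnyFn = (...args: any[]) => void;
type CallbackMessage = { type: "callback"; id: number; args: unknown[] };

// Owning side: keeps the real functions (and their closures), hands out ids.
class CallbackRegistry {
  private nextId = 1;
  private callbacks = new Map<number, AnyFn>();

  register(fn: AnyFn): number {
    const id = this.nextId++;
    this.callbacks.set(id, fn);
    return id;
  }

  // Runs the referenced function on the owning side when a message arrives.
  handle(message: CallbackMessage): void {
    const fn = this.callbacks.get(message.id);
    if (!fn) throw new Error(`Unknown callback id ${message.id}`);
    fn(...message.args);
  }
}

// Worker side: a proxy function that only posts the id and the arguments.
// It returns nothing, so the callback's return value is not sent back.
function createCallbackProxy(
  id: number,
  post: (message: CallbackMessage) => void
): AnyFn {
  return (...args) => post({ type: "callback", id, args });
}

// Demo wiring. Between real threads, `post` would go through a message
// port, and the arguments would have to be structured-cloneable.
const registry = new CallbackRegistry();
const received: string[] = [];
const cloj = "hello";

const callbackId = registry.register((value: string) => {
  received.push(`${cloj} ${value}`); // the closure is read on the owning side
});

const proxy = createCallbackProxy(callbackId, (message) => registry.handle(message));
proxy("world");
```

Because only an id and the arguments cross the boundary, the function body and its closure never leave the thread that created them, which is what lets closures such as cloj keep working.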

Pandapip1 commented 2 years ago

> Serialization unfortunately does not allow closures

That's what the answer I linked attempted to solve.

kommander commented 2 years ago

> Obviously you shouldn't expect closures to work, it is serialisation after all

That is what the answer says in its fine print, and it is what I would expect from serialization.

Pandapip1 commented 2 years ago

This is the particular answer I was mentioning that claims to serialize arrow functions:

Function.deserialise = function(key, data) {
    return (data instanceof Array && data[0] == 'window.Function') ?
        new (Function.bind.apply(Function, [Function].concat(data[1], [data[2]]))) :
        data
    ;
};
Function.prototype.toJSON = function() {
    var whitespace = /\s/;
    var pair = /\(\)|\[\]|\{\}/;

    var args = new Array();
    var string = this.toString();

    var fat = (new RegExp(
        '^\\s*(' +
        ((this.name) ? this.name + '|' : '') +
        'function' +
        ')[^)]*\\('
    )).test(string);

    var state = 'start';
    var depth = new Array(); 
    var tmp;

    for (var index = 0; index < string.length; ++index) {
        var ch = string[index];

        switch (state) {
        case 'start':
            if (whitespace.test(ch) || (fat && ch != '('))
                continue;

            if (ch == '(') {
                state = 'arg';
                tmp = index + 1;
            }
            else {
                state = 'singleArg';
                tmp = index;
            }
            break;

        case 'arg':
        case 'singleArg':
            var escaped = depth.length > 0 && depth[depth.length - 1] == '\\';
            if (escaped) {
                depth.pop();
                continue;
            }
            if (whitespace.test(ch))
                continue;

            switch (ch) {
            case '\\':
                depth.push(ch);
                break;

            case ']':
            case '}':
            case ')':
                if (depth.length > 0) {
                    if (pair.test(depth[depth.length - 1] + ch))
                        depth.pop();
                    continue;
                }
                if (state == 'singleArg')
                    throw '';
                args.push(string.substring(tmp, index).trim());
                state = (fat) ? 'body' : 'arrow';
                break;

            case ',':
                if (depth.length > 0)
                    continue;
                if (state == 'singleArg')
                    throw '';
                args.push(string.substring(tmp, index).trim());
                tmp = index + 1;
                break;

            case '>':
                if (depth.length > 0)
                    continue;
                if (string[index - 1] != '=')
                    continue;
                if (state == 'arg')
                    throw '';
                args.push(string.substring(tmp, index - 1).trim());
                state = 'body';
                break;

            case '{':
            case '[':
            case '(':
                if (
                    depth.length < 1 ||
                    !(depth[depth.length - 1] == '"' || depth[depth.length - 1] == '\'')
                )
                    depth.push(ch);
                break;

            case '"':
                if (depth.length < 1)
                    depth.push(ch);
                else if (depth[depth.length - 1] == '"')
                    depth.pop();
                break;
            case '\'':
                if (depth.length < 1)
                    depth.push(ch);
                else if (depth[depth.length - 1] == '\'')
                    depth.pop();
                break;
            }
            break;

        case 'arrow':
            if (whitespace.test(ch))
                continue;
            if (ch != '=')
                throw '';
            if (string[++index] != '>')
                throw '';
            state = 'body';
            break;

        case 'body':
            if (whitespace.test(ch))
                continue;
            string = string.substring(index);

            if (ch == '{')
                string = string.replace(/^{\s*([\s\S]*)\s*}\s*$/, '$1');
            else
                string = 'return ' + string.trim();

            index = string.length;
            break;

        default:
            throw '';
        }
    }

    return ['window.Function', args, string];
};

I'll test it out now.

UPDATE: I tested it on arrow functions. IT WORKED!!!

kommander commented 2 years ago

Does it work with closures though? Can it serialise and deserialise this one:

const closure = 'world'
const arrow = () => console.log(`Hello ${closure}`)

Pandapip1 commented 2 years ago

Works for me, as long as closure is defined in both environments.
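
A small sketch of what "defined in both environments" means in practice. It assumes that string-based serialization boils down to rebuilding the function from its source text; roundTrip and makeGreeter are hypothetical helpers, not code from the linked answer. A function rebuilt with the Function constructor resolves free variables in the global scope, not in the scope where the original was created.

```typescript
// Hypothetical helper: round-trips a function through its source text,
// which is roughly what string-based serialization amounts to.
function roundTrip(fn: (...args: any[]) => unknown): (...args: any[]) => unknown {
  return new Function(`return (${fn.toString()});`)() as (...args: any[]) => unknown;
}

function makeGreeter(): () => string {
  const greetingTarget = "world"; // captured by the arrow function below
  return () => "Hello " + greetingTarget;
}

const original = makeGreeter();
const revived = roundTrip(original);

// The revived copy has lost the closure: the name no longer resolves.
let failure = "";
try {
  revived();
} catch (error) {
  failure = (error as Error).name;
}

// Defining the name in the receiving environment makes the call succeed,
// but it uses that environment's value, not the one originally captured.
(globalThis as any).greetingTarget = "receiver";
const afterDefine = revived();
```

Here the first call to revived fails with a ReferenceError, while the second returns "Hello receiver"; the original function still returns "Hello world". So the variable's value is not carried along with the function and has to be provided separately on the receiving side.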

kommander commented 2 years ago

Very interesting 🤔 This means the closure has to be transferred as well, though. Not sure how that would be managed. And what if the closure is a large dataset on main, like when map/reducing through workers and doing the reduce step on the main thread? I guess that could be handled by restructuring the code to avoid the closure, but it seems odd.

I will be done with the callbacks sometime in the next few days; then you can check whether it suits your use case.

kommander commented 2 years ago

A lot of other stuff going on here like types, cleanup, package upgrades, pipeline setup etc. to get this up to date, but the basic callback/functions feature is in there: #21.

kommander commented 2 years ago

@Pandapip1 I merged the PR. There is an EventEmitter example here. One caveat: Because the callbacks are executed on the main thread, or whatever thread they are created on in a nested worker case, the return value of a callback is not transferred back to the worker thread.

I will create a pre-release shortly.

kommander commented 2 years ago

@Pandapip1 The current implementation works for my use case and seems to behave well. I'll add transferables to those callbacks as well, and handle callback return values by passing them back to the thread. Then this is done for me. Does it cover your use case so far?

Pandapip1 commented 2 years ago

Not sure, it's been a while. Do you have a link to the docs?

kommander commented 2 years ago

Huh? It's all in the README https://github.com/PortBlueSky/thread-puddle#async-workermethodarguments and there is an example for using EventEmitters here https://github.com/PortBlueSky/thread-puddle/tree/master/examples/ts-eventemitter - which I thought was your original use case of this issue?

Pandapip1 commented 2 years ago

Oh I see. I'll review it. Thank you again!

Pandapip1 commented 2 years ago

Yes, this satisfies my use case. TYSM!!!