andywer / threads.js

🧵 Make web workers & worker threads as simple as a function call.
https://threads.js.org/
MIT License

Tasks stuck and pool blocked #254

Open anna-rmrf opened 4 years ago

anna-rmrf commented 4 years ago

Hello, I'm seeing weird behaviour where tasks are queued but the function itself never runs, and runningTasks just end up piling up. Keep in mind that I have not changed anything in my code.

console.log('1')
this[pool].queue(async worker => {
    console.log('2')
    let m = worker[method]
    console.log(m)
    if (!m) return "Method doesn't exist";
    console.log('3')
    q = await m(item)
    console.log('4')
    return q;
})
console.log('5')
await this[pool].completed()
console.log('6')
return q;

and from the worker file:

const workerFunctions= {
    async gen(c) {
console.log('FUNCTION RUNNING')
..................
}
...........
}
expose(workerFunctions)

Behaviour: 1 2 [Function] 3 5

The thing is, the exposed function is not being executed at all (FUNCTION RUNNING is never logged), and the pool just ends up with more and more runningTasks piling up.

andywer commented 4 years ago

Hey @aya1337. Have you tried to enable debug logging?

https://threads.js.org/usage-advanced#debug-logging

andywer commented 4 years ago

Sorry, closed wrong issue 🙊

anna-rmrf commented 4 years ago

I didn't try debug logging; no idea how that went past my head. I had to separate the exposed functions into 3 types of pools, and it's now working just fine. I couldn't wait because the service was down for so many people haha

I might try to find the reason when I have time, and I'll let you know if it's anything serious. Thank you!

andywer commented 4 years ago

Let’s close this issue for now then. Would be great if you can share any news on this you might come up with 👍

anna-rmrf commented 4 years ago

Hello, it has happened again... I used debugging and this is what is happening (keep in mind that the same code as above is used and the same scenario is occurring):

Debugging just the pool:

  threads:pool:PoolOne Queueing task #1... +0ms
  threads:pool:PoolOne Attempt de-queueing a task in order to run it... +1ms
  threads:pool:PoolOne Running task #1 on worker #1... +1ms

and it's stuck after that.

If I forcefully terminate the entire pool, make a new one, and then queue a task, this is the output when debugging everything:

  threads:master:thread-utils Terminating worker +0ms
  threads:master:spawn Initializing new thread +17m
  threads:master:messages Message from worker before finishing initialization: { type: 'init', exposed: { type: 'module', methods: [ 'testfunc' ] } } +17m
  threads:pool:PoolOne Queueing task #1... +0ms
  threads:pool:PoolOne Attempt de-queueing a task in order to run it... +1ms
  threads:pool:PoolOne Running task #1 on worker #1... +1ms
  threads:master:messages Sending command to run function to worker: { type: 'run', uid: 3, method: 'testfunc', args: [ [ [Object] ] ] } +19m

This is the pool:

WorkerPool {
  eventSubject: MulticastSubject {
    _subscriber: [Function],
    _observers: Set { [SubscriptionObserver] }
  },
  initErrors: [],
  isClosing: false,
  nextTaskID: 2,
  taskQueue: [],
  debug: [Function: debug],
  options: { name: 'PoolOne', size: 1 },
  workers: [ { init: [Promise], runningTasks: [Array] } ], //runningTasks.length = 1
  eventObservable: Observable { _subscriber: [Function] }
}

andywer commented 4 years ago

Hey @aya1337. Sorry, I just don't have a lot of time right now, and I don't have a solid lead yet on why that happens.

Maybe someone else can help 🙋‍♂️

andywer commented 4 years ago

Hey @aya1337. Last weekend I just released a pool task completion bug fix (see #271). Maybe your issue was related to that one? Maybe you can try again with the latest version :)

andywer commented 4 years ago

Any updates, @aya1337?

anna-rmrf commented 4 years ago

@andywer I was actually typing it haha. Thanks for the update. Unfortunately, it did not solve the issue. Tasks keep piling up endlessly, and the workers all have runningTasks that never execute. No exposed function ever even gets called (the first thing each one does is console.log the pid).

I tried to find code that reproduces it 100% of the time, but I had some weird results. If I await anything before queueing (even if I'm not passing the result to the queue), the behaviour occurs. If there's no awaiting before queueing, it works as expected (unless I wrap the queueing into a function?). Perhaps there's a certain concept I'm unaware of?


poolOne.terminate(true)

const { spawn, Pool, Worker } = require("threads");
poolOne = Pool(() => spawn(new Worker("./poolOne"), { timeout: 120000 }),{name: "poolOne",size: 1});

const asyncStuff = [...(await asyncFunction())] //Unused
const suppliedArray = [] //Or even: const suppliedArray = asyncStuff

const res = [];

poolOne.queue(async worker => {
    try {
        console.log('1')
        const result = await worker.exposedFunction(suppliedArray)
        console.log('2')
        return res.push(result);
    } catch (e) {
        return false;
    }
});

await poolOne.completed();
return res;

I tried wrapping the whole queueing and waiting for completion in an async function. Surprisingly, it worked when testing (not too sure how it would behave in production):

poolOne.terminate(true)
const { spawn, Pool, Worker } = require("threads");
poolOne = Pool(() => spawn(new Worker("./poolOne"), { timeout: 120000 }),{name: "poolOne",size: 1});
const asyncStuff = [...(await asyncFunction())]
const wrapped = async (suppliedArray) => {
    const res = [];
    poolOne.queue(async worker => {
        try {
            console.log('1')
            const result = await worker.exposedFunction(suppliedArray)
            console.log('2')
            return res.push(result);
        } catch (e) {
            return false;
        }
    });
    await poolOne.completed();
    return res;
}

return wrapped(asyncStuff) //or even: return wrapped([])

Super weird.

andywer commented 4 years ago

Damn race conditions. But I might just have had a valuable train of thought:

Maybe the message to run the function in the worker is sent before the worker has set up its message handler. Just had a quick look at the code and we don't make sure that the spawn() calls have actually finished before dispatching tasks.

Without a pool that's not an issue as you need the async result of spawn() to do anything with the worker, so you have to wait. The pool constructor, however, needs to be synchronous and thus it initializes the workers in the background.

We need to make sure that's finished before we start dispatching tasks. I will prepare a PR later!
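The gating idea can be sketched roughly as follows. This is a minimal stand-in and not threads.js code: `fakeSpawn` and `TinyPool` are hypothetical names, and the real pool schedules work differently. It only shows a synchronous constructor that starts initialization in the background, plus a `queue()` that refuses to dispatch until the chosen worker's init promise has resolved.

```javascript
// Hypothetical stand-in for an asynchronous spawn(): resolves to a "worker"
// object after delayMs. Not part of threads.js.
function fakeSpawn(delayMs) {
  return new Promise(resolve =>
    setTimeout(() => resolve({ echo: async value => value }), delayMs)
  );
}

// Hypothetical pool: the constructor is synchronous and only *starts* init.
class TinyPool {
  constructor(spawnWorker, size) {
    // Store the pending init promises; nothing is awaited here.
    this.inits = Array.from({ length: size }, () => spawnWorker());
    this.next = 0;
  }

  // Dispatch a task, but only after the chosen worker has finished init.
  async queue(task) {
    const init = this.inits[this.next];
    this.next = (this.next + 1) % this.inits.length; // simple round-robin
    const worker = await init; // the gate: no dispatch before init resolves
    return task(worker);
  }
}
```

For example, `new TinyPool(() => fakeSpawn(20), 2).queue(w => w.echo("ready"))` returns a promise that resolves only once the stand-in worker has finished its delayed initialization.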

andywer commented 4 years ago

On second thought… We do wait for every worker to finish initialization before dispatching tasks, though:

https://github.com/andywer/threads.js/blob/9fa13af4a99d11a844899f022590139d6fb101ac/src/master/pool.ts#L173

Will need to keep thinking…

Edit: It is unlikely, but maybe it is the way that the workers initialize. We send the init message first and then we actually subscribe to incoming messages. I mean, those two steps happen only microseconds apart from each other in the same function – I would assume message passing between threads takes longer than that, but maybe I am wrong and this is the problem.
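To make the suspected ordering problem concrete, here is a small simulation. It uses Node's synchronous `EventEmitter` as a stand-in for the message channel, which is an assumption: real worker channels are asynchronous and may buffer messages, so this only shows the worst case. `simulate`, `toMaster` and `toWorker` are hypothetical names.

```javascript
const { EventEmitter } = require("node:events");

// Simulate one worker start-up. Returns the list of methods the worker ran.
function simulate(workerSubscribesFirst) {
  const toMaster = new EventEmitter(); // worker -> master messages
  const toWorker = new EventEmitter(); // master -> worker messages
  const ran = [];

  // Master: as soon as the worker announces itself, send a "run" command.
  toMaster.on("message", msg => {
    if (msg.type === "init") {
      toWorker.emit("message", { type: "run", method: "testfunc" });
    }
  });

  // Worker side: two steps whose order is the thing under test.
  const subscribe = () => toWorker.on("message", msg => ran.push(msg.method));
  const announce = () => toMaster.emit("message", { type: "init" });

  if (workerSubscribesFirst) {
    subscribe();
    announce();
  } else {
    announce(); // the "run" command arrives while nobody is listening
    subscribe();
  }
  return ran;
}
```

In this model, announcing before subscribing loses the run command entirely, which would match a task that is marked as running but never executes. Whether a real worker transport can behave this way is exactly the open question.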

Let me prepare a PR later 😉

anna-rmrf commented 4 years ago

I believe you are right. I just got confused about how the same behaviour happened even when I didn't use the variable where I awaited const asyncStuff = [...(await asyncFunction())]. At one point, I thought it was unrelated to this package (because why would a variable that's never used when queueing cause such a thing?).

As for spawning, I actually wait about 2 seconds, using ms => new Promise(res => setTimeout(res, ms)), after initializing the pool (I just didn't include it in the code), but you might be correct. I'll wait for the PR and will let you know.

Thanks for giving me a bit of your time to answer. Really appreciate it

andywer commented 4 years ago

Wanted to write the potential fix when I realized that we do set up the critical message handlers in the worker before we send the init message. So that can't be it then.

I have some more questions about the test scenario you posted in the beginning, though:

  1. Are you using the latest version of threads.js?
  2. Did you run it using the tiny-worker fallback?
  3. How many workers did the pool spawn – the default (= number of CPU threads)? That would help in reading the debug log.

anna-rmrf commented 4 years ago

@andywer

  1. Latest version https://github.com/andywer/threads.js/commit/9fa13af4a99d11a844899f022590139d6fb101ac
  2. Yes, I'm using tiny-worker fallback (sorry for failing to mention that)
  3. I tried the default (4), tried 8, and even tried just 1 (for the debugging above, I used only 1, i.e. size: 1)

ashishchandr70 commented 3 years ago

@andywer is there a fix for this? I am having the same (or perhaps a similar) issue as @anna-rmrf.

Here is my code (from a mocha test):

it('RUN - Can create multiple concurrent players', async () => {
        console.log(`\x1b[34m Creating player pool \x1b[0m`);
        playersPool = Pool(() => spawn(new Worker(`../scripts/player-setup`)), MAXPLAYERS);
        console.log(`\x1b[34m Player pool created \x1b[0m`);

        //const playerThread = await spawn(new Worker(`../scripts/player-setup`));

        let battleMessage = await new Promise(async (resolve, reject) => {
            const ws = new WSClient(WebSocket, socketUrl, ['GiveUp']);

            ws.on('msg', async (message) => {
                // const message = JSON.parse(e.data);
                console.log(`WS Message:`, message);
                if (message.subject === 'GiveUp') {
                    ws.close();
                    await playersPool.completed();
                    resolve(message);
                }
            });            
            console.log(`\x1b[34m Submitting worker method \x1b[0m`);
            playersPool.queue(async worker => {
                await worker.executeRandomTest({
                    playerName: 'TommyBoy',
                    environment: environment
                });    
                console.log(`\x1b[34m executeRandomTest invoked \x1b[0m`);       
            });

            console.log(`\x1b[34m Worker method submitted \x1b[0m`);
            /* await playerThread.executeRandomTest({
                playerName: 'TommyBoy',
                environment: environment
            }); */

        });
        console.log(`${battleMessage}`);
        await playersPool.terminate();

        expect(1).to.equal(1);
    });

The part I have commented out (spawning a single thread and then running executeRandomTest) is what works when a pool is not used. So I know that executeRandomTest works, just not with the thread pool.

Here is the output (DEBUG was on). It got stuck after printing the final message:

...
Player setup completed successfully
    ✓ RUN - Can be deployed (Mock Only)
  threads:master:spawn Initializing new thread +0ms
  threads:master:messages Message from worker before finishing initialization: {
  type: 'init',
  exposed: {
    type: 'module',
    methods: [ 'executePvPBattle', 'executeBotBattle', 'executeRandomTest' ]
  }
} +0ms
  threads:master:spawn Initializing new thread +2s
  threads:master:messages Message from worker before finishing initialization: {
  type: 'init',
  exposed: {
    type: 'module',
    methods: [ 'executePvPBattle', 'executeBotBattle', 'executeRandomTest' ]
  }
} +2s
    ✓ RUN - Can set up players for battle (3655ms)
 Creating player pool 
  threads:master:spawn Initializing new thread +2s
  threads:master:spawn Initializing new thread +4ms
  threads:master:spawn Initializing new thread +4ms
  threads:master:spawn Initializing new thread +7ms
  threads:master:spawn Initializing new thread +5ms
  threads:master:spawn Initializing new thread +12ms
  threads:master:spawn Initializing new thread +9ms
  threads:master:spawn Initializing new thread +9ms
  threads:master:spawn Initializing new thread +4ms
  threads:master:spawn Initializing new thread +14ms
 Player pool created 
 Submitting worker method 
  threads:pool:1 Queueing task #1... +0ms
  threads:pool:1 Attempt de-queueing a task in order to run it... +1ms
 Worker method submitted 
  threads:pool:1 Running task #1 on worker #1... +4ms
  threads:pool:1 Task #1 failed +10s
  threads:pool:1 Error while initializing pool worker: Error: Timeout: Did not receive an init message from worker after 10000ms. Make sure the worker calls expose().
    at Timeout._onTimeout (/usr/local/src/backend/node_modules/threads/dist/master/spawn.js:35:53)
    at listOnTimeout (internal/timers.js:554:17)
    at processTimers (internal/timers.js:497:7) +1ms
  threads:pool:1 Task #1 errored: Error: Timeout: Did not receive an init message from worker after 10000ms. Make sure the worker calls expose().
    at Timeout._onTimeout (/usr/local/src/backend/node_modules/threads/dist/master/spawn.js:35:53)
    at listOnTimeout (internal/timers.js:554:17)
    at processTimers (internal/timers.js:497:7) +14ms
  threads:pool:1 Attempt de-queueing a task in order to run it... +25ms
  threads:pool:1 Task queue is empty +0ms
  threads:master:messages Message from worker before finishing initialization: {
  type: 'init',
  exposed: {
    type: 'module',
    methods: [ 'executePvPBattle', 'executeBotBattle', 'executeRandomTest' ]
  }
} +12s
  threads:master:messages Message from worker before finishing initialization: {
  type: 'init',
  exposed: {
    type: 'module',
    methods: [ 'executePvPBattle', 'executeBotBattle', 'executeRandomTest' ]
  }
} +24ms
  threads:master:messages Message from worker before finishing initialization: {
  type: 'init',
  exposed: {
    type: 'module',
    methods: [ 'executePvPBattle', 'executeBotBattle', 'executeRandomTest' ]
  }
} +62ms
  threads:master:messages Message from worker before finishing initialization: {
  type: 'init',
  exposed: {
    type: 'module',
    methods: [ 'executePvPBattle', 'executeBotBattle', 'executeRandomTest' ]
  }
} +81ms
  threads:master:messages Message from worker before finishing initialization: {
  type: 'init',
  exposed: {
    type: 'module',
    methods: [ 'executePvPBattle', 'executeBotBattle', 'executeRandomTest' ]
  }
} +83ms
  threads:master:messages Message from worker before finishing initialization: {
  type: 'init',
  exposed: {
    type: 'module',
    methods: [ 'executePvPBattle', 'executeBotBattle', 'executeRandomTest' ]
  }
} +51ms
  threads:master:messages Message from worker before finishing initialization: {
  type: 'init',
  exposed: {
    type: 'module',
    methods: [ 'executePvPBattle', 'executeBotBattle', 'executeRandomTest' ]
  }
} +54ms
  threads:master:messages Message from worker before finishing initialization: {
  type: 'init',
  exposed: {
    type: 'module',
    methods: [ 'executePvPBattle', 'executeBotBattle', 'executeRandomTest' ]
  }
} +33ms
  threads:master:messages Message from worker before finishing initialization: {
  type: 'init',
  exposed: {
    type: 'module',
    methods: [ 'executePvPBattle', 'executeBotBattle', 'executeRandomTest' ]
  }
} +7ms
  threads:master:messages Message from worker before finishing initialization: {
  type: 'init',
  exposed: {
    type: 'module',
    methods: [ 'executePvPBattle', 'executeBotBattle', 'executeRandomTest' ]
  }
} +44ms

It appears to be timing out before the pool's worker threads finish initializing.

ashishchandr70 commented 3 years ago

Actually, while looking through the error and where it comes from (spawn.js), I found a nice workaround:

Set THREADS_WORKER_INIT_TIMEOUT to something higher, e.g. 20 seconds, as the default of 10 seconds is what is tripping up the initialization process.

So, this worked:

THREADS_WORKER_INIT_TIMEOUT=20000 npm test -- -g "RUN"
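For context, an init timeout of this kind can be sketched as a race between the worker's init promise and a timer. This is a rough illustration and not the actual code in spawn.js: `initTimeoutMs` and `withInitTimeout` are hypothetical helpers, and how the library parses the variable is an assumption. Only the variable name and the 10-second default come from the error message and workaround above.

```javascript
// Read the timeout from the environment, falling back to the 10-second
// default mentioned above. The parsing rules here are an assumption.
function initTimeoutMs(env = process.env) {
  const raw = env.THREADS_WORKER_INIT_TIMEOUT;
  const parsed = raw ? Number.parseInt(raw, 10) : NaN;
  return Number.isFinite(parsed) ? parsed : 10000;
}

// Hypothetical helper: reject if initPromise has not settled within ms.
function withInitTimeout(initPromise, ms) {
  let timer;
  const timeout = new Promise((_, reject) => {
    timer = setTimeout(
      () => reject(new Error(`Timeout: no init message after ${ms}ms`)),
      ms
    );
  });
  // Whichever settles first wins; always clear the timer afterwards.
  return Promise.race([initPromise, timeout]).finally(() => clearTimeout(timer));
}
```

With a structure like this, raising the limit does not make workers start faster; it only gives slow start-ups (such as ten workers spawned at once) enough time to report in before the task is failed.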

andywer commented 3 years ago

@ashishchandr70 Great that you found a workaround! Yeah, we should document that somewhere…

Say, do you by any chance run tests concurrently? This timeout tends to happen more often when trying to spawn a lot of workers at once.

ashishchandr70 commented 3 years ago

Hi @andywer I don't run them concurrently. I was just creating the worker pool and then the next command was to use one of the workers to execute an exposed method.