nestjs / bull

Bull module for Nest framework (node.js) :cow:
https://nestjs.com
MIT License

Processor cleaning on `app.close` #2332

Closed courtem closed 17 hours ago

courtem commented 1 week ago

Is there an existing issue for this?

Current behavior

A request-scoped processor stays active even after the application has been closed with enableShutdownHooks enabled. This behavior is mainly observed when running e2e tests without threads.

Could this be caused by an event emitter that is not cleared?

This could also explain why the soft exit doesn't work for my worker in my Kubernetes cluster.

Minimum reproduction code

https://github.com/courtem/request-scope-processor-cleaning

Steps to reproduce

  1. docker run -d --name redis-container -p 6379:6379 redis
  2. npm i
  3. npm run test:e2e

Expected behavior

The worker instantiated by the test must process the test tasks.

Package version

10.2.2

Bull version

5.28.0

NestJS version

9.0.0

Node.js version

16.13.2

In which operating systems have you tested?

Other

No response

courtem commented 1 week ago

Following a suggestion from a discussion, I used the following temporary workaround to fix my tests (for the tests only):

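// Import paths are assumed from the identifiers used below.
import { Injectable, OnApplicationShutdown } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import IORedis, { RedisOptions } from 'ioredis';

@Injectable()
export class QueueClientFactory implements OnApplicationShutdown {
  // Every client created by build(), tracked so it can be closed on shutdown.
  clients: IORedis[];

  constructor(private configService: ConfigService) {
    this.clients = [];
  }

  build(options?: RedisOptions): IORedis {
    const client = new IORedis({
      ...options,
      host: this.configService.get('queue.host'),
      port: this.configService.get('queue.port'),
    });

    this.clients.push(client);

    return client;
  }

  async onApplicationShutdown() {
    // Wait for clients that are still connecting before closing them.
    await Promise.all(
      this.clients
        .filter((client) => client.status === 'connecting')
        .map((client) => {
          return new Promise<void>((resolve) => {
            // `once` removes the listener after it fires.
            client.once('ready', () => resolve());
          });
        }),
    );

    // Close every client that has not already ended.
    await Promise.all(this.clients.filter((client) => client.status !== 'end').map((client) => client.quit()));
  }
}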

...
      BullModule.forRootAsync({
        imports: [QueueClientModule],
        useFactory: (queueClientFactory: QueueClientFactory) => ({
          connection: queueClientFactory.build({ maxRetriesPerRequest: null }),
        }),
        inject: [QueueClientFactory],
      }),
...
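
To make the shutdown ordering in this workaround easier to follow, here is a minimal sketch that runs without Redis. FakeClient, markReady and shutdownAll are hypothetical names introduced only for this illustration. FakeClient models just the `status` field, a one-shot 'ready' listener, and quit(); it is not the ioredis API.

```typescript
// Hypothetical stand-in for a Redis client (assumption: only these states matter here).
type Status = 'connecting' | 'ready' | 'end';

class FakeClient {
  status: Status = 'connecting';
  private readyListeners: Array<() => void> = [];

  // Register a one-shot listener; only the 'ready' event is modeled.
  once(_event: 'ready', listener: () => void): void {
    this.readyListeners.push(listener);
  }

  // Simulate the connection becoming usable and notify waiting listeners once.
  markReady(): void {
    this.status = 'ready';
    const listeners = this.readyListeners;
    this.readyListeners = [];
    listeners.forEach((listener) => listener());
  }

  // Simulate a graceful close.
  async quit(): Promise<void> {
    this.status = 'end';
  }
}

// Same two phases as onApplicationShutdown above:
// 1) wait for clients that are still connecting, 2) quit every client that has not ended.
async function shutdownAll(clients: FakeClient[]): Promise<void> {
  await Promise.all(
    clients
      .filter((client) => client.status === 'connecting')
      .map((client) => new Promise<void>((resolve) => client.once('ready', () => resolve()))),
  );

  await Promise.all(
    clients.filter((client) => client.status !== 'end').map((client) => client.quit()),
  );
}
```

Note that, as in the workaround, a client that never becomes ready would keep the first phase waiting forever, so a timeout may be worth adding outside of tests.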

Here are the error logs:

Error: Connection is closed.
    at EventEmitter.sendCommand (node_modules/ioredis/built/Redis.js:332:28)
    at Script.execute (node_modules/ioredis/built/Script.js:59:26)
    at EventEmitter.moveToActive:5.28.0 (node_modules/ioredis/built/utils/Commander.js:111:27)
    at Scripts.execCommand (node_modules/bullmq/src/classes/scripts.ts:74:49)
    at Scripts.moveToActive (node_modules/bullmq/src/classes/scripts.ts:1237:31)
    at Worker.moveToActive (node_modules/bullmq/src/classes/worker.ts:670:26)
    at Worker._getNextJob (node_modules/bullmq/src/classes/worker.ts:600:23)
    at processTicksAndRejections (node:internal/process/task_queues:95:5)
    at Worker.retryIfFailed (node_modules/bullmq/src/classes/worker.ts:1175:16)

kamilmysliwiec commented 17 hours ago

Fixed in https://github.com/nestjs/bull/commit/0f6171ee5b96158d89f4ffe1badd29679407110a

Published as 10.2.3