taskforcesh / bullmq

BullMQ - Message Queue and Batch processing for NodeJS and Python based on Redis
https://bullmq.io
MIT License

[Bug]: Connection is closed error #2686

Open bobgubko opened 1 month ago

bobgubko commented 1 month ago

Version

v5.12

Platform

NodeJS

What happened?

I'm getting random Connection is closed. errors from my repeating jobs. A simplified example is below. If I comment out the 2 ms delay OR increase it to around 6-8 ms, the error goes away. It probably has something to do with the 5 ms delay in queue-base.ts?

As a fix, I moved the queue creation (const baz = new Queue('baz'...) in foo.ts out of the default function's scope, so the queue is not created and closed on every worker run. Is it good practice to create a queue on each run, or is it better to keep a single instance?
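
For reference, the workaround described above looks roughly like this: a sketch of the foo.ts processor from the repro below, with the queue hoisted to module scope (same queue name and connection settings).

import { Queue, SandboxedJob } from 'bullmq';

// Created once per sandboxed child process instead of once per job.
const baz = new Queue('baz', {
  connection: {
    host: '127.0.0.1',
    port: 6379,
  },
});

export default async (job: SandboxedJob) => {
  // ... job logic that uses the long-lived `baz` queue; no per-job connect/close
};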

How to reproduce.

producer.ts

import { Queue, QueueOptions, RepeatOptions } from 'bullmq';

const repeatOptions: RepeatOptions = {
  every: 3_000,
  immediately: false,
};

export const baseQueueOptions: QueueOptions = {
  connection: {
    host: '127.0.0.1',
    port: 6379,
  },
};

(async () => {
  // Connection options belong on the Queue itself, not in the per-job options.
  const queue1 = new Queue('foo', baseQueueOptions);

  await queue1.obliterate();

  // Add five repeatable jobs that run every 3 seconds.
  for (let i = 1; i <= 5; i++) {
    await queue1.add(
      `project-${i}`,
      { projectId: 1 },
      {
        repeat: repeatOptions,
        jobId: `project-${i}-Foo`,
      },
    );
  }

  await queue1.close();
  process.exit();
})();

workers.ts

import { Worker, WorkerOptions } from 'bullmq';
import path from 'path';

export const baseWorkerOptions: WorkerOptions = {
  connection: {
    host: '127.0.0.1',
    port: 6379,
  },
};

// Sandboxed worker: jobs for the 'foo' queue are processed in a separate
// child process running foo.js.
const foo = new Worker(
  'foo',
  path.join(__dirname, 'foo.js'),
  baseWorkerOptions,
);

[foo].forEach((worker) => {
  worker.on('failed', (job, err) => {
    console.error(`Worker ${worker.name} error: ${err}`, err);
  });
});

foo.on('active', (job) => {
  console.log(`Worker foo active: ${job.id}`);
});

foo.ts

import { Queue, SandboxedJob } from 'bullmq';

// Sandboxed processor: a new Queue (and Redis connection) is created and
// closed on every single job, which is what triggers the race.
export default async (job: SandboxedJob) => {
  const baz = new Queue('baz', {
    connection: {
      host: '127.0.0.1',
      port: 6379,
    },
  });

  // Closing roughly 2 ms after construction reproduces the
  // "Connection is closed." error.
  await new Promise((resolve) => setTimeout(resolve, 2));

  await baz.close();
};

node dist/producer.js; node dist/workers.js

Relevant log output

DEBUG=ioredis:* node dist/workers.js:

  ioredis:redis status[127.0.0.1:6379]: wait -> connecting +0ms
  ioredis:redis status[127.0.0.1:6379 (bull:Zm9v)]: wait -> connecting +2ms
  ioredis:redis status[127.0.0.1:6379]: connecting -> connect +2ms
  ioredis:redis write command[127.0.0.1:6379]: 0 -> info([]) +1ms
  ioredis:redis status[127.0.0.1:6379 (bull:Zm9v)]: connecting -> connect +1ms
  ioredis:redis write command[127.0.0.1:6379 (bull:Zm9v)]: 0 -> info([]) +0ms
  ioredis:redis status[127.0.0.1:6379]: connect -> ready +1ms
  ioredis:redis write command[127.0.0.1:6379]: 0 -> info([]) +1ms
  ioredis:redis status[127.0.0.1:6379 (bull:Zm9v)]: connect -> ready +1ms
  ioredis:connection set the connection name [bull:Zm9v] +0ms
  ioredis:redis write command[127.0.0.1:6379 (bull:Zm9v)]: 0 -> client([ 'setname', 'bull:Zm9v' ]) +0ms
  ioredis:redis write command[127.0.0.1:6379 (bull:Zm9v)]: 0 -> info([]) +1ms
  ioredis:redis write command[127.0.0.1:6379]: 0 -> evalsha('7826d4e8fbae2a5aa9e9a9ba4646ece8d90e98a5,9,bull:foo:stalled,bull:foo:wait,bull:foo:active,bull:foo:failed,bull:foo:stalled-check,bull:foo:meta,bull:foo:paused,bull:foo:marker,bull:foo:events,1,bull:fo ... <REDACTED full-length="222">') +1ms
  ioredis:redis write command[127.0.0.1:6379]: 0 -> evalsha('accb36ea979fc4cd20f4a56e3c4ad6df0ec0b14a,11,bull:foo:wait,bull:foo:active,bull:foo:prioritized,bull:foo:events,bull:foo:stalled,bull:foo:limiter,bull:foo:delayed,bull:foo:paused,bull:foo:meta,bull:foo ... <REDACTED full-length="324">') +1ms
  ioredis:redis write command[127.0.0.1:6379 (bull:Zm9v)]: 0 -> bzpopmin([ 'bull:foo:marker', '0.801' ]) +1ms
  ioredis:redis write command[127.0.0.1:6379 (bull:Zm9v)]: 0 -> bzpopmin([ 'bull:foo:marker', '0.801' ]) +0ms
  ioredis:redis write command[127.0.0.1:6379]: 0 -> evalsha('accb36ea979fc4cd20f4a56e3c4ad6df0ec0b14a,11,bull:foo:wait,bull:foo:active,bull:foo:prioritized,bull:foo:events,bull:foo:stalled,bull:foo:limiter,bull:foo:delayed,bull:foo:paused,bull:foo:meta,bull:foo ... <REDACTED full-length="324">') +888ms
  ioredis:redis write command[127.0.0.1:6379]: 0 -> info([]) +3ms
  ioredis:redis write command[127.0.0.1:6379]: 0 -> evalsha([ '5f2501125f452051b1e175d34985b3dc8a76d92a', '1', 'bull:foo:repeat', '1722699453000', <Buffer de 00 05 a4 6e 61 6d 65 a9 70 72 6f 6a 65 63 74 2d 31 a7 65 6e 64 44 61 74 65 c0 a2 74 7a c0 a7 70 61 74 74 65 72 6e c0 a5 65 76 65 72 79 cd 0b b8>, 'project-1:project-1-Foo:::3000', '1613cea15203408dc917a40a9c008a09', '0' ]) +2ms
  ioredis:redis write command[127.0.0.1:6379]: 0 -> evalsha('6e5ce8d7bbfc5aaa56e29abc9babda5cd2a0149d,6,bull:foo:marker,bull:foo:meta,bull:foo:id,bull:foo:delayed,bull:foo:completed,bull:foo:events,��bull:foo:�5repeat:1613cea15203408dc917a40a9c008a09:1722699453 ... <REDACTED full-length="508">') +3ms
Worker foo active: repeat:1613cea15203408dc917a40a9c008a09:1722699450000
2024-08-03T15:37:30.224Z ioredis:redis status[127.0.0.1:6379]: wait -> connecting
2024-08-03T15:37:30.228Z ioredis:redis status[127.0.0.1:6379]: connecting -> connect
2024-08-03T15:37:30.229Z ioredis:redis write command[127.0.0.1:6379]: 0 -> info([])
  ioredis:redis write command[127.0.0.1:6379]: 0 -> evalsha('696e8b473d10b338128a231f257ef90153a5111d,14,bull:foo:wait,bull:foo:active,bull:foo:prioritized,bull:foo:events,bull:foo:stalled,bull:foo:limiter,bull:foo:delayed,bull:foo:paused,bull:foo:meta,bull:foo ... <REDACTED full-length="571">') +135ms
  ioredis:redis write command[127.0.0.1:6379]: 0 -> evalsha([ '5f2501125f452051b1e175d34985b3dc8a76d92a', '1', 'bull:foo:repeat', '1722699453000', <Buffer de 00 05 a4 6e 61 6d 65 a9 70 72 6f 6a 65 63 74 2d 32 a7 65 6e 64 44 61 74 65 c0 a2 74 7a c0 a7 70 61 74 74 65 72 6e c0 a5 65 76 65 72 79 cd 0b b8>, 'project-2:project-2-Foo:::3000', '76f434a83d7b57a1df0c9f4714cd9148', '0' ]) +11ms
2024-08-03T15:37:30.241Z ioredis:redis status[127.0.0.1:6379]: end -> ready
  ioredis:redis write command[127.0.0.1:6379]: 0 -> evalsha('6e5ce8d7bbfc5aaa56e29abc9babda5cd2a0149d,6,bull:foo:marker,bull:foo:meta,bull:foo:id,bull:foo:delayed,bull:foo:completed,bull:foo:events,��bull:foo:�5repeat:76f434a83d7b57a1df0c9f4714cd9148:1722699453 ... <REDACTED full-length="507">') +2ms
2024-08-03T15:37:30.243Z ioredis:redis queue command[127.0.0.1:6379]: 0 -> info([])
2024-08-03T15:37:30.243Z ioredis:redis status[127.0.0.1:6379]: ready -> close
2024-08-03T15:37:30.243Z ioredis:connection skip reconnecting since the connection is manually closed.
2024-08-03T15:37:30.243Z ioredis:redis status[127.0.0.1:6379]: close -> end
Worker foo active: repeat:76f434a83d7b57a1df0c9f4714cd9148:1722699450000
  ioredis:redis write command[127.0.0.1:6379]: 0 -> multi([]) +6ms
  ioredis:redis write command[127.0.0.1:6379]: 0 -> evalsha('ab2818154a3f405adad5e01b769e79c50fb9e615,1,bull:foo:repeat:76f434a83d7b57a1df0c9f4714cd9148:1722699450000,["Error: Unexpected exit code: 0 signal: null\\n    at Child.exitHandler (/Users/bob/ ... <REDACTED full-length="639">') +0ms
  ioredis:redis write command[127.0.0.1:6379]: 0 -> evalsha('696e8b473d10b338128a231f257ef90153a5111d,14,bull:foo:wait,bull:foo:active,bull:foo:prioritized,bull:foo:events,bull:foo:stalled,bull:foo:limiter,bull:foo:delayed,bull:foo:paused,bull:foo:meta,bull:foo ... <REDACTED full-length="595">') +0ms
  ioredis:redis write command[127.0.0.1:6379]: 0 -> exec([]) +0ms
Worker foo error: Error: Unexpected exit code: 0 signal: null Error: Unexpected exit code: 0 signal: null
    at Child.exitHandler (/Users/bob/bullmq-connection-closed-repro/node_modules/bullmq/dist/cjs/classes/sandbox.js:43:24)
    at Child.emit (node:events:529:35)
    at ChildProcess.<anonymous> (/Users/bob/bullmq-connection-closed-repro/node_modules/bullmq/dist/cjs/classes/child.js:91:18)
    at ChildProcess.emit (node:events:517:28)
    at ChildProcess._handle.onexit (node:internal/child_process:292:12)
  ioredis:redis write command[127.0.0.1:6379]: 0 -> evalsha([ '5f2501125f452051b1e175d34985b3dc8a76d92a', '1', 'bull:foo:repeat', '1722699453000', <Buffer de 00 05 a4 6e 61 6d 65 a9 70 72 6f 6a 65 63 74 2d 33 a7 65 6e 64 44 61 74 65 c0 a2 74 7a c0 a7 70 61 74 74 65 72 6e c0 a5 65 76 65 72 79 cd 0b b8>, 'project-3:project-3-Foo:::3000', '20ed202732d0348d8fe68530181b96e2', '0' ]) +2ms
  ioredis:redis write command[127.0.0.1:6379]: 0 -> evalsha('6e5ce8d7bbfc5aaa56e29abc9babda5cd2a0149d,6,bull:foo:marker,bull:foo:meta,bull:foo:id,bull:foo:delayed,bull:foo:completed,bull:foo:events,��bull:foo:�5repeat:20ed202732d0348d8fe68530181b96e2:1722699453 ... <REDACTED full-length="508">') +0ms
Worker foo active: repeat:20ed202732d0348d8fe68530181b96e2:1722699450000
2024-08-03T15:37:30.352Z ioredis:redis status[127.0.0.1:6379]: wait -> connecting
2024-08-03T15:37:30.355Z ioredis:redis status[127.0.0.1:6379]: connecting -> connect
2024-08-03T15:37:30.356Z ioredis:redis write command[127.0.0.1:6379]: 0 -> info([])
  ioredis:redis write command[127.0.0.1:6379]: 0 -> evalsha('696e8b473d10b338128a231f257ef90153a5111d,14,bull:foo:wait,bull:foo:active,bull:foo:prioritized,bull:foo:events,bull:foo:stalled,bull:foo:limiter,bull:foo:delayed,bull:foo:paused,bull:foo:meta,bull:foo ... <REDACTED full-length="571">') +106ms
  ioredis:redis write command[127.0.0.1:6379]: 0 -> evalsha([ '5f2501125f452051b1e175d34985b3dc8a76d92a', '1', 'bull:foo:repeat', '1722699453000', <Buffer de 00 05 a4 6e 61 6d 65 a9 70 72 6f 6a 65 63 74 2d 34 a7 65 6e 64 44 61 74 65 c0 a2 74 7a c0 a7 70 61 74 74 65 72 6e c0 a5 65 76 65 72 79 cd 0b b8>, 'project-4:project-4-Foo:::3000', '82b4c00f52f8bb6abd3c79b8dff047f7', '0' ]) +1ms
2024-08-03T15:37:30.358Z ioredis:redis status[127.0.0.1:6379]: end -> ready
  ioredis:redis write command[127.0.0.1:6379]: 0 -> evalsha('6e5ce8d7bbfc5aaa56e29abc9babda5cd2a0149d,6,bull:foo:marker,bull:foo:meta,bull:foo:id,bull:foo:delayed,bull:foo:completed,bull:foo:events,��bull:foo:�5repeat:82b4c00f52f8bb6abd3c79b8dff047f7:1722699453 ... <REDACTED full-length="508">') +0ms
Worker foo active: repeat:82b4c00f52f8bb6abd3c79b8dff047f7:1722699450000
2024-08-03T15:37:30.359Z ioredis:redis queue command[127.0.0.1:6379]: 0 -> info([])
2024-08-03T15:37:30.359Z ioredis:redis status[127.0.0.1:6379]: wait -> connecting
2024-08-03T15:37:30.360Z ioredis:redis status[127.0.0.1:6379]: ready -> close
2024-08-03T15:37:30.360Z ioredis:connection skip reconnecting since the connection is manually closed.
2024-08-03T15:37:30.360Z ioredis:redis status[127.0.0.1:6379]: close -> end
  ioredis:redis write command[127.0.0.1:6379]: 0 -> multi([]) +3ms
  ioredis:redis write command[127.0.0.1:6379]: 0 -> evalsha('ab2818154a3f405adad5e01b769e79c50fb9e615,1,bull:foo:repeat:82b4c00f52f8bb6abd3c79b8dff047f7:1722699450000,["Error: Connection is closed.\\n    at close (/Users/bob/bullmq-connection-c ... <REDACTED full-length="570">') +0ms
  ioredis:redis write command[127.0.0.1:6379]: 0 -> evalsha('696e8b473d10b338128a231f257ef90153a5111d,14,bull:foo:wait,bull:foo:active,bull:foo:prioritized,bull:foo:events,bull:foo:stalled,bull:foo:limiter,bull:foo:delayed,bull:foo:paused,bull:foo:meta,bull:foo ... <REDACTED full-length="580">') +0ms
  ioredis:redis write command[127.0.0.1:6379]: 0 -> exec([]) +0ms
Worker foo error: Error: Connection is closed. Error: Connection is closed.
    at close (/Users/bob/bullmq-connection-closed-repro/node_modules/ioredis/built/redis/event_handler.js:189:25)
    at Socket.<anonymous> (/Users/bob/bullmq-connection-closed-repro/node_modules/ioredis/built/redis/event_handler.js:156:20)
    at Object.onceWrapper (node:events:632:26)
    at Socket.emit (node:events:529:35)
    at TCP.<anonymous> (node:net:350:12)
  ioredis:redis write command[127.0.0.1:6379]: 0 -> evalsha([ '5f2501125f452051b1e175d34985b3dc8a76d92a', '1', 'bull:foo:repeat', '1722699453000', <Buffer de 00 05 a4 6e 61 6d 65 a9 70 72 6f 6a 65 63 74 2d 35 a7 65 6e 64 44 61 74 65 c0 a2 74 7a c0 a7 70 61 74 74 65 72 6e c0 a5 65 76 65 72 79 cd 0b b8>, 'project-5:project-5-Foo:::3000', 'e9e86f8cbcee14b04d17f9d2812f8f80', '0' ]) +1ms
  ioredis:redis write command[127.0.0.1:6379]: 0 -> evalsha('6e5ce8d7bbfc5aaa56e29abc9babda5cd2a0149d,6,bull:foo:marker,bull:foo:meta,bull:foo:id,bull:foo:delayed,bull:foo:completed,bull:foo:events,��bull:foo:�5repeat:e9e86f8cbcee14b04d17f9d2812f8f80:1722699453 ... <REDACTED full-length="508">') +1ms
Worker foo active: repeat:e9e86f8cbcee14b04d17f9d2812f8f80:1722699450000
2024-08-03T15:37:30.449Z ioredis:redis status[127.0.0.1:6379]: wait -> connecting
2024-08-03T15:37:30.451Z ioredis:redis status[127.0.0.1:6379]: connecting -> connect
2024-08-03T15:37:30.452Z ioredis:redis write command[127.0.0.1:6379]: 0 -> info([])
  ioredis:redis write command[127.0.0.1:6379]: 0 -> evalsha('696e8b473d10b338128a231f257ef90153a5111d,14,bull:foo:wait,bull:foo:active,bull:foo:prioritized,bull:foo:events,bull:foo:stalled,bull:foo:limiter,bull:foo:delayed,bull:foo:paused,bull:foo:meta,bull:foo ... <REDACTED full-length="571">') +89ms
  ioredis:redis write command[127.0.0.1:6379 (bull:Zm9v)]: 0 -> bzpopmin([ 'bull:foo:marker', '2.547' ]) +1ms
  ioredis:redis write command[127.0.0.1:6379]: 0 -> evalsha('accb36ea979fc4cd20f4a56e3c4ad6df0ec0b14a,11,bull:foo:wait,bull:foo:active,bull:foo:prioritized,bull:foo:events,bull:foo:stalled,bull:foo:limiter,bull:foo:delayed,bull:foo:paused,bull:foo:meta,bull:foo ... <REDACTED full-length="324">') +0ms
  ioredis:redis write command[127.0.0.1:6379 (bull:Zm9v)]: 0 -> bzpopmin([ 'bull:foo:marker', '2.546' ]) +1ms
  ioredis:redis write command[127.0.0.1:6379 (bull:Zm9v)]: 0 -> bzpopmin([ 'bull:foo:marker', '2.546' ]) +0ms
2024-08-03T15:37:30.454Z ioredis:redis status[127.0.0.1:6379]: end -> ready
2024-08-03T15:37:30.455Z ioredis:redis queue command[127.0.0.1:6379]: 0 -> info([])
2024-08-03T15:37:30.456Z ioredis:redis status[127.0.0.1:6379]: ready -> close
2024-08-03T15:37:30.456Z ioredis:connection skip reconnecting since the connection is manually closed.
2024-08-03T15:37:30.456Z ioredis:redis status[127.0.0.1:6379]: close -> end


manast commented 1 month ago

It seems like there is some kind of edge case when closing a queue that may not have been completely established yet. In any case, I would recommend creating the queue outside of the processor function, as that is much more performant than making a new connection to Redis, loading all the BullMQ commands, and so on for every job.
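
A minimal sketch of that recommendation, assuming an in-process (non-sandboxed) processor with the same queue and connection settings as the repro; the 'bar' job name and the follow-up add() call are only illustrative.

import { Queue, Worker } from 'bullmq';

const connection = { host: '127.0.0.1', port: 6379 };

// Created once and reused by every job the worker processes, so the Redis
// connection is established and the BullMQ commands are loaded a single time.
const baz = new Queue('baz', { connection });

const foo = new Worker(
  'foo',
  async (job) => {
    // reuse the long-lived queue instead of creating/closing one per job
    await baz.add('bar', { projectId: job.data.projectId });
  },
  { connection },
);

With a sandboxed processor, the same idea applies by keeping the Queue at module scope in the processor file, as in the workaround shown earlier in the issue.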