taskforcesh / bullmq

BullMQ - Message Queue and Batch processing for NodeJS and Python based on Redis
https://bullmq.io

Feature: List of jobs for worker #405

Open · darki73 opened this issue 3 years ago

darki73 commented 3 years ago

The WHAT part

It would be nice to be able to see which jobs a worker is currently working on.

The WHY part

When you are building a distributed service that runs on multiple machines, it is useful to know how many workers each machine is running and which jobs they are processing.

Second, when the application receives one of the SIG* signals (e.g. SIGTERM), it would be nice to be able to do something about the jobs that are currently being processed (a minimal shutdown-handler sketch follows the behavior lists below).

Desired behavior:

  1. The user runs docker-compose stop && docker-compose pull && docker-compose up -d
  2. The application shuts down, releasing all jobs currently being processed by its workers back to the WAITING state (it could also signal the sandboxed processors to exit via a method parameter or something similar)
  3. If necessary (user-specific cases), some other code runs during that process
  4. The application is restarted and new workers pick up jobs to process

Current behavior:

  1. The user runs docker-compose stop && docker-compose pull && docker-compose up -d
  2. The application shuts down and all active jobs are simply left intact (in the ACTIVE state)
  3. The application is restarted and new workers pick up other jobs to process; however, those ACTIVE jobs will never get processed (even after the lock timer expires)
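
For illustration, a minimal sketch of such a handler with the existing API. Worker.close() only waits for (or, with force = true, abandons) active jobs; it does not move them back to WAITING, which is exactly the gap described above:

import { Worker } from 'bullmq';

const worker = new Worker('generic_queue', async (job) => {
    // ... process job ...
});

process.on('SIGTERM', async () => {
    // close() waits for active jobs to finish; close(true) force-closes,
    // leaving active jobs to be recovered only when their locks expire.
    await worker.close();
    process.exit(0);
});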

The HOW part

There are a few areas for improvement (in my opinion).

1. Better worker creation process. First of all, it would be nice to have the ability to set a name for the worker (the name option below is the proposed addition):

const worker: Worker = new Worker('generic_queue', 'sandboxed.processor.js', {
    name: 'machine_hostname_or_whatever',
    concurrency: 1,
    connection: {
        host: '127.0.0.1',
        port: 6379
    }
});

The name could be optional, since the queue.getWorkers() method already returns IDs for workers.

2. Actually allow the user to access the worker ID. The worker ID is already present in the output of queue.getWorkers(); however, it is not exposed to the user.

3. Something like this would be nice to have in the per-worker output of queue.getWorkers()

interface WorkerMetadata {
    id: string;
    addr: string;
    fd: string;
    jobs: string[]; // tokens/IDs of the jobs currently running on this worker
    name: string;   // currently this is the queue name, not the worker name
    // ...and so on
}
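
For what it's worth, most of these fields (addr, fd, age, idle, ...) appear to come straight from Redis's CLIENT LIST command, so a worker name could presumably be surfaced by naming the underlying connection. A hypothetical sketch with ioredis:

import IORedis from 'ioredis';

const asyncFunction = async () => {
    const connection = new IORedis({ host: '127.0.0.1', port: 6379 });
    // Hypothetical: name the connection so it is identifiable in CLIENT LIST
    // (and therefore in whatever getWorkers() returns).
    await connection.client('SETNAME', 'machine_hostname_or_whatever');
    console.log(await connection.client('LIST'));
    connection.disconnect();
};

void asyncFunction();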

4. Mark queues as 'master' and 'slave'

new Queue('generic_queue', {
    master: true,
    connection: { 
        host: '127.0.0.1',
        port: 6379
    },
})

This is needed because multiple applications might need to connect to the same queue (for example, some sort of governor for the number of workers to create: if there are only 5 jobs and 5 servers, each server should only create 1 worker).

const serversCount: number = 5; // in practice: query an API or discovery service for the number of instances
const jobsCount: number = await this.queue.getActiveCount() + await this.queue.getWaitingCount();
const workersCount: number = (await this.queue.getWorkers()).length;
const scaleTo: number = Math.ceil((jobsCount - workersCount) / serversCount);
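
For example, with 5 servers, 12 jobs (active + waiting) and 2 workers already running, scaleTo = Math.ceil((12 - 2) / 5) = 2, so each server would spin up 2 more workers.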

Right now, as I see it, every application that creates a queue with the same name effectively becomes a manager of this queue. The reason I think so is that whenever I start a second application that creates a Queue instance with the same name, both of them eventually stop processing jobs.

So, master/slave is needed basically to disable the "manager" functionality of a particular queue instance.
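
One way to approximate this today, without a new Queue option, might be plain Redis-based leader election, so that only one instance elects itself as the manager. A hypothetical sketch (the key name and TTL are made up):

import IORedis from 'ioredis';

const redis = new IORedis({ host: '127.0.0.1', port: 6379 });

// SET ... NX succeeds for only one caller at a time, and the PX expiry lets
// another instance take over if the current master dies without releasing it.
async function tryBecomeMaster(instanceId: string): Promise<boolean> {
    const result = await redis.set('generic_queue:master', instanceId, 'PX', 30000, 'NX');
    return result === 'OK';
}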

manast commented 3 years ago

Thanks for the issue.

In general I agree that better "workers" handling is desirable. Something to consider: once a worker is instantiated, it generates a UUID that it uses for locking jobs, and only a worker with the correct UUID can unlock a job, so this UUID could be used to represent a unique worker in the queue.

Please see my answers below:

3) The application is restarted and new workers pick up other jobs to process; however, those ACTIVE jobs will never get processed (even after the lock timer expires)

New workers will process old jobs that were active but lost their locks; however, you need to have at least one QueueScheduler instance running for that mechanism to work.
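
A minimal sketch of that setup (QueueScheduler exists in the BullMQ versions current at the time of this issue; later versions removed it and moved stalled-job handling into the workers themselves):

import { QueueScheduler } from 'bullmq';

// At least one scheduler per queue is needed so that jobs whose locks have
// expired are moved back to the wait list for other workers to pick up.
const scheduler = new QueueScheduler('generic_queue', {
    connection: { host: '127.0.0.1', port: 6379 }
});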

  1. Better worker creation process. First of all, it would be nice to have the ability to set a name for the worker

I agree. That would be a nice and useful feature.

Right now, as I see it, every application that creates a queue with the same name effectively becomes a manager of this queue. The reason I think so is that whenever I start a second application that creates a Queue instance with the same name, both of them eventually stop processing jobs.

This should not be the behaviour; you can have as many instances of the same Queue as you want and it should not matter. If you have found a case where this does not hold, please submit it as a bug and I will look into it.

darki73 commented 3 years ago

And where can I get this worker UUID? Here is some example code:

const { Queue, Worker } = require('bullmq');

const asyncFunction = async() => {
    const queue = new Queue('test-queue');

    const worker = new Worker('test-queue', async (job) => {

    }, { concurrency: 1 });

    worker.waitUntilReady().then(async () => {
        console.log(worker, await queue.getWorkers());
    });
}

void asyncFunction();

And this is the output:

Worker {
  _events: [Object: null prototype] { error: [Function (anonymous)] },
  _eventsCount: 1,
  _maxListeners: undefined,
  name: 'test-queue',
  opts: {
    drainDelay: 5,
    concurrency: 1,
    lockDuration: 30000,
    prefix: 'bull',
    lockRenewTime: 15000
  },
  connection: RedisConnection {
    _events: [Object: null prototype] { error: [Function: bound emit] },
    _eventsCount: 1,
    _maxListeners: undefined,
    opts: {
      port: 6379,
      host: '127.0.0.1',
      retryStrategy: [Function: retryStrategy]
    },
    _client: Redis {
      options: [Object],
      _events: [Object: null prototype],
      _eventsCount: 1,
      _maxListeners: undefined,
      scriptsSet: [Object],
      commandQueue: [Denque],
      offlineQueue: [Denque],
      connectionEpoch: 1,
      connector: [StandaloneConnector],
      retryAttempts: 0,
      _addedScriptHashes: {},
      _autoPipelines: Map(0) {},
      _runningAutoPipelines: Set(0) {},
      _addedScriptHashesCleanInterval: Timeout {
        _idleTimeout: 60000,
        _idlePrev: [Timeout],
        _idleNext: [Timeout],
        _idleStart: 147,
        _onTimeout: [Function (anonymous)],
        _timerArgs: undefined,
        _repeat: 60000,
        _destroyed: false,
        [Symbol(refed)]: true,
        [Symbol(kHasPrimitive)]: false,
        [Symbol(asyncId)]: 8,
        [Symbol(triggerId)]: 1
      },
      status: 'ready',
      condition: [Object],
      stream: [Socket],
      // methods here
    },
    initializing: Promise { [Redis] },
    [Symbol(kCapture)]: false
  },
  keys: {
    // bull keys here
  },
  waiting: false,
  blockingConnection: RedisConnection {
    _events: [Object: null prototype] { error: [Function: bound emit] },
    _eventsCount: 1,
    _maxListeners: undefined,
    opts: {
      port: 6379,
      host: '127.0.0.1',
      retryStrategy: [Function: retryStrategy]
    },
    _client: Redis {
      options: [Object],
      _events: [Object: null prototype],
      _eventsCount: 1,
      _maxListeners: undefined,
      scriptsSet: [Object],
      commandQueue: [Denque],
      offlineQueue: [Denque],
      connectionEpoch: 1,
      connector: [StandaloneConnector],
      retryAttempts: 0,
      _addedScriptHashes: {},
      _autoPipelines: Map(0) {},
      _runningAutoPipelines: Set(0) {},
      _addedScriptHashesCleanInterval: Timeout {
        _idleTimeout: 60000,
        _idlePrev: [TimersList],
        _idleNext: [Timeout],
        _idleStart: 147,
        _onTimeout: [Function (anonymous)],
        _timerArgs: undefined,
        _repeat: 60000,
        _destroyed: false,
        [Symbol(refed)]: true,
        [Symbol(kHasPrimitive)]: false,
        [Symbol(asyncId)]: 11,
        [Symbol(triggerId)]: 1
      },
      status: 'ready',
      condition: [Object],
      stream: [Socket],
      serverInfo: [Object],
      // script command methods here (addJob, extendLock, moveToActive,
      // moveToFinished, releaseLock, takeLock, ...and so on)
      [Symbol(kCapture)]: false
    },
    initializing: Promise { [Redis] },
    [Symbol(kCapture)]: false
  },
  processFn: [AsyncFunction (anonymous)],
  timerManager: TimerManager { timers: Map(0) {} },
  processing: Map(1) {
    Promise { <pending> } => 'ba99c7bc-27e9-4c42-ab88-974fd358fbbc'
  },
  [Symbol(kCapture)]: false
} [
  {
    id: '87',
    addr: '192.168.65.3:37048',
    fd: '10',
    name: 'test-queue',
    age: '0',
    idle: '0',
    flags: 'N',
    db: '0',
    sub: '0',
    psub: '0',
    multi: '-1',
    qbuf: '0',
    'qbuf-free': '32768',
    'argv-mem': '0',
    obl: '0',
    oll: '0',
    omem: '0',
    'tot-mem': '61464',
    events: 'r',
    cmd: 'client',
    user: 'default'
  }
]

Is it the token in the processing map entry, i.e. ba99c7bc-27e9-4c42-ab88-974fd358fbbc?

And if so, how can I access it?

worker.waitUntilReady().then(async (connectionInformation) => {
    // Probably it is inside `connectionInformation`; however, the IDE gives no hint on how to access it
});

darki73 commented 3 years ago

Scratch that.

processing: Map(1) {
    Promise { <pending> } => 'ba99c7bc-27e9-4c42-ab88-974fd358fbbc'
},

This map holds the jobs the worker is currently working on. It is accessible in JavaScript; however, there is no way to access it in TypeScript without resorting to // @ts-ignore, because the property is private.

So all we need right now is access to the private processing property (src/classes/worker.ts#L39), with something like this:

async workingOn(): Promise<string[]> {
    if (!this.processing) {
        return [];
    }
    // `processing` is a Map of job promises to tokens, as seen in the dump above
    return Array.from(this.processing.values());
}

async isIdle(): Promise<boolean> {
    return (await this.workingOn()).length === 0;
}

async isActive(): Promise<boolean> {
    return !(await this.isIdle());
}

This code is not tested.
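
If these accessors landed on Worker, usage might look like this (hypothetical; the cast is needed precisely because processing and these helpers are not exposed today):

import { Worker } from 'bullmq';

const asyncFunction = async () => {
    const worker = new Worker('test-queue', async () => { /* ... */ }, { concurrency: 1 });
    await worker.waitUntilReady();
    // Hypothetical calls, assuming the methods above were added to Worker:
    const tokens: string[] = await (worker as any).workingOn();
    console.log(tokens.length === 0 ? 'worker is idle' : `working on: ${tokens.join(', ')}`);
};

void asyncFunction();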

manast commented 3 years ago

@darki73 that would only give you the active jobs of a given worker, but we also need to be able to list all the workers in the system... I think this can only be achieved by having a new data structure where we keep the currently active workers; this structure needs to automatically expire workers that go offline...
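
For illustration, one shape such a structure could take is a sorted set scored by heartbeat timestamp; everything below (key name, TTL, helpers) is hypothetical, not an actual BullMQ API:

import IORedis from 'ioredis';

const redis = new IORedis({ host: '127.0.0.1', port: 6379 });
const KEY = 'bull:generic_queue:workers'; // hypothetical key
const TTL_MS = 30000;

// Each worker periodically re-registers itself with the current timestamp.
async function heartbeat(workerId: string): Promise<void> {
    await redis.zadd(KEY, Date.now(), workerId);
}

// Listing prunes entries older than the TTL, so offline workers expire.
async function listActiveWorkers(): Promise<string[]> {
    await redis.zremrangebyscore(KEY, '-inf', Date.now() - TTL_MS);
    return redis.zrange(KEY, 0, -1);
}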

andrisi commented 3 months ago

FYI: https://github.com/taskforcesh/bullmq/discussions/2681