breejs / bree

Bree is a Node.js and JavaScript job task scheduler with worker threads, cron, Date, and human syntax. Built for @ladjs, @forwardemail, @spamscanner, @cabinjs.
https://jobscheduler.net
MIT License
3.06k stars, 79 forks

Error: Job "email" is already running #25

anjopater closed 4 years ago

anjopater commented 4 years ago

Hey,
After some days of using and running the job, when I try to run my dev environment with node bree.js I get Error: Job "email" is already running. The strange thing is that I killed all node processes and restarted my machine, and it looks like the worker starts running when my PC starts. Any idea how to fix that?

Error: Job "email" is already running at Bree.run (C:\...\node_modules\bree\index.js:571:11)

niftylettuce commented 4 years ago

Can you share a snippet of the configuration you pass to Bree? That output appears when the job has already been started. It looks like you have a low interval or something similar, and Bree tries to spawn the job again before the previous run finishes.

Ref: https://github.com/breejs/bree/blob/1b9a6ed47930df10881cba494226b872cb074f58/index.js#L568-L572
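
To illustrate the kind of guard described above, here is a minimal sketch of a scheduler that refuses to start a job while a previous run with the same name is still in flight. The TinyScheduler class and its running map are hypothetical names made up for this example; this is not Bree's actual implementation, only the general pattern.

```javascript
// Hypothetical mini-scheduler, NOT Bree's real code: it only shows the
// "already running" guard pattern.
class TinyScheduler {
  constructor() {
    // job name -> promise of the run that is currently in flight
    this.running = new Map();
  }

  // Start a job unless a previous run with the same name has not finished.
  run(name, task) {
    if (this.running.has(name)) {
      throw new Error(`Job "${name}" is already running`);
    }

    // The task starts right away; the entry is removed once it settles.
    const promise = (async () => task())().finally(() => {
      this.running.delete(name);
    });
    this.running.set(name, promise);
    return promise;
  }
}
```

With a guard like this, a task that never settles keeps its entry forever, so every later run of the same name fails, no matter how long the interval is.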

anjopater commented 4 years ago

const bree = new Bree({
  //logger: cabin,
  jobs: [
    {
      name: 'email',
      interval: '8s'
    }
  ]
});

However, it also happens if I try 30s or a higher interval.

niftylettuce commented 4 years ago

It's probably because your email job isn't exiting. See the examples for more insight.
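
One way to make sure a job always exits is to send the completion signal from a finally block, so it fires even if the work throws. This is only a sketch: runJob is a hypothetical helper name, and the port parameter exists so the function can be exercised outside a worker. The postMessage('done') and process.exit(0) calls mirror the job shown later in this thread.

```javascript
const { parentPort } = require('worker_threads');

// Hypothetical helper: runs `work` and always signals completion afterwards.
// `port` defaults to the worker's parentPort, which is null on the main thread.
async function runJob(work, port = parentPort) {
  try {
    await work();
  } catch (err) {
    // log the failure, then fall through to the completion signal
    console.error(err);
  } finally {
    // signal to parent that the job is done
    if (port) port.postMessage('done');
    else process.exit(0);
  }
}
```

A job file would then end with something like runJob(async () => { /* send emails */ }); instead of an unguarded async function.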

anjopater commented 4 years ago

My friend, I don't think that's the case. I do have a job with that name. I think it's related to the worker_threads management in the library. I will keep reviewing.

niftylettuce commented 4 years ago

What are the contents of the email job?

anjopater commented 4 years ago

const fs = require('fs');
const path = require('path');
const { parentPort } = require('worker_threads');
const nodemailer = require('nodemailer');
const Cabin = require('cabin');
const Email = require('email-templates');
const { Signale } = require('signale');

// initialize cabin
const cabin = new Cabin({
  axe: {
    logger: new Signale()
  }
});

// store boolean if the job is cancelled
let isCancelled = false;

// handle cancellation (this is a very simple example)
if (parentPort)
  parentPort.once('message', message => {
    if (message === 'cancel') isCancelled = true;
  });

// load the queue
const queueFile = path.join(__dirname, '..', 'queue.json');
if (!fs.existsSync(queueFile)) {
  cabin.info(`queue file does not exist yet: ${queueFile}`);
  // signal to parent that the job is done
  if (parentPort) parentPort.postMessage('done');
  else process.exit(0);
}

const queue = require(queueFile);

(async () => {
  // send emails
  await Promise.all(
    queue.map(async result => {
      // if we've already cancelled this job then return early
      if (isCancelled) return;
      // if it's before the time we need to send the message then return early
      if (Date.now() < new Date(result.send_at).getTime()) {
        cabin.info('It it not time yet to send message', { result });
        return;
      }
      console.log('Sending email..');
        /* await email.send({
          message: {
            to: result.email,
            subject: 'Movie starts in less than 10 minutes!',
            html:
              '<p>Your movie will start in less than 10 minutes. Hurry up and grab your snacks.</p>'
          }
        });*/
        // flush the queue of this message
        try {
          const currentQueue = require(queueFile);
          const index = currentQueue.findIndex(r => r.id === result.id);
          if (index === -1) return;
          delete currentQueue[index];
          await fs.promises.writeFile(
            queueFile,
            JSON.stringify(currentQueue.filter(Boolean))
          );
        } catch (err) {
          cabin.error(err);
        }
      } catch (err) {
        cabin.error(err);
      }
    })
  );

  // signal to parent that the job is done
  if (parentPort) parentPort.postMessage('done');
  else process.exit(0);
})();

niftylettuce commented 4 years ago

There is a syntax error in the snippet you shared.

You have an extra catch block, which should be removed:

          await fs.promises.writeFile(
            queueFile,
            JSON.stringify(currentQueue.filter(Boolean))
          );
        } catch (err) {
          cabin.error(err);
        }
-       } catch (err) {
-         cabin.error(err);
-       }
    })
  );

niftylettuce commented 4 years ago

You can reference https://github.com/breejs/express-example if you need to. There is also a usage example in the README.

alvinlys commented 3 years ago

Just to point out: if (parentPort) parentPort.once('message', message => { if (message === 'cancel') isCancelled = true; }); is not supposed to appear in Promise-based jobs, going by what is shown in "Writing jobs with Promises and async-await".

shadowgate15 commented 3 years ago

That specific line can be in a Promise-based job. All it does is allow for graceful worker termination.

The line in "Writing jobs with Promises and async-await" is meant to come after the await, because it indicates that the worker is done and exits.
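
To show how the two pieces fit together, here is a minimal sketch of a Promise-based job that both listens for 'cancel' and posts 'done' after the awaited work. The function name processUntilCancelled and its handle and port parameters are hypothetical, introduced only so the example can run outside a real worker. In an actual job, port would be parentPort from worker_threads.

```javascript
// Hypothetical sketch: process items one by one, stop early on 'cancel',
// and signal 'done' only after the awaited work has finished.
async function processUntilCancelled(items, handle, port) {
  // store boolean if the job is cancelled
  let isCancelled = false;

  // handle cancellation: the listener only flips a flag
  if (port) {
    port.once('message', (message) => {
      if (message === 'cancel') isCancelled = true;
    });
  }

  const processed = [];
  for (const item of items) {
    // if the job was cancelled, skip the remaining items
    if (isCancelled) break;
    await handle(item);
    processed.push(item);
  }

  // signal to parent that the job is done (this comes after the awaits)
  if (port) port.postMessage('done');
  return processed;
}
```

The cancellation listener does not end the job by itself. It lets the loop stop cleanly at the next item, and the 'done' message is still sent once at the end.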