OptimalBits / bull

Premium Queue package for handling distributed jobs and messages in NodeJS.
Other
15.44k stars 1.43k forks source link

a job is successfully completed, but the job in active state #951

Closed hbakhtiyor closed 6 years ago

hbakhtiyor commented 6 years ago

Description

Test code to reproduce

/**
 * Queue processor: passes the job's URL to `demoFunc` and republishes every
 * value `demoFunc` hands to its callback as job progress.
 *
 * @param {object} job - job being processed; `job.data.url` is read and
 *   `job.progress` is called with `{data}` for each reported value.
 * @returns {Promise<*>} the value `demoFunc` resolves with.
 */
async function queueProcess(job) {
  // Wrap each reported value as `{data}` before publishing it as progress.
  const reportProgress = async (data) => {
    await job.progress({data});
  };

  const outcome = await demoFunc(job.data.url, reportProgress);
  return outcome;
}

var Queue = require('bull');

// Queue named 'demo'; queueProcess is registered as its job processor.
const queue = new Queue('demo');
queue.process(queueProcess);

// Reuse the job already stored under this id, otherwise enqueue a new one.
// NOTE: `uuid` and `data` are not defined anywhere in this snippet.
let job = await queue.getJob(uuid);
if (!job) {
  const jobOptions = {jobId: uuid, removeOnComplete: true, removeOnFail: true};
  job = await queue.add({data}, jobOptions);
}

// Progress events are emitted for every job on the queue, so events that
// belong to other jobs are skipped.
queue.on('progress', async (_job, {data}) => {
  if (_job.id !== job.id) {
    return;
  }
  // todo something
});

// Resolves once the job has finished.
const result = await job.finished();

Bull version

3.4.2

Additional information

manast commented 6 years ago

The code above does not work, but supposing it did, I am not sure how it would reproduce the title of the issue :/

hbakhtiyor commented 6 years ago

I did get a result from the `const result = await job.finished();` line, but the job was not removed; checking via the Arena UI, the job is still in the active state.

manast commented 6 years ago

The code does not run so I cannot reproduce it...

hbakhtiyor commented 6 years ago

Here is a working sample, tested on Node 10.1.0 and Ubuntu 18.04 against a Docker-based Redis (4.0.9) started without any additional configuration: docker run -d -p 6379:6379 redis

const express = require('express')
const Queue = require('bull');
const uuidV5 = require('uuid/v5');

const app = express();

// pre-defined uuid json namespace
const UUID_JSON = '18e8eebd-77b5-4f97-acac-2c0e2cca482b';

/**
 * Pauses for `delay` seconds.
 *
 * @param {number} delay - time to wait, in seconds (converted to milliseconds
 *   for setTimeout).
 * @returns {Promise<undefined>} resolves, with no value, once the timer fires.
 */
async function sleep(delay) {
  const milliseconds = delay * 1000;
  await new Promise((resolve) => {
    setTimeout(resolve, milliseconds);
  });
}

/**
 * Adds `a` and `b`. When a callback is supplied, the work is stretched out
 * with pauses and the callback is invoked three times: with two status
 * strings and finally with the sum itself.
 *
 * @param {number} a - first operand.
 * @param {number} b - second operand.
 * @param {Function} [cb] - optional (possibly async) progress callback; when
 *   falsy the sum is returned without any pause.
 * @returns {Promise<number>} the sum `a + b`.
 */
async function addFunc(a, b, cb) {
  const sum = a + b;
  if (!cb) {
    return sum;
  }

  // [seconds to pause, value reported afterwards]
  const steps = [
    [1, 'processing #1'],
    [2, 'processing #2'],
    [2, sum],
  ];
  for (const [pause, payload] of steps) {
    await sleep(pause);
    await cb(payload);
  }
  return sum;
}

/**
 * Queue processor for the 'add' queue: computes the sum through `addFunc`,
 * publishing intermediate values as job progress when `job.data.stream` is
 * truthy, then bumps the sum: +2 when it is even, +1 otherwise.
 *
 * @param {object} job - job being processed; reads `job.data.a`,
 *   `job.data.b`, `job.data.stream` and calls `job.progress({data})`.
 * @returns {Promise<number>} the adjusted sum.
 */
async function queueAddProcess(job) {
  const {a, b, stream} = job.data;

  // Progress is only forwarded for streaming jobs; addFunc receives null otherwise.
  const onProgress = stream
    ? async (data) => {
        await job.progress({data});
      }
    : null;

  const result = await addFunc(a, b, onProgress);
  return result % 2 === 0 ? result + 2 : result + 1;
}

// One queue (and one registered processor) shared by every request. Creating
// a Queue and calling process() inside the handler opens new Redis
// connections on each request and never releases them.
const addQueue = new Queue('add');
addQueue.process(queueAddProcess);

/**
 * GET /add/:a/:b/:stream?
 *
 * Enqueues (or reuses) an 'add' job identified by a UUID derived from the
 * operands and responds with its result. When `stream` is the literal string
 * 'true', progress values are written to the response as they arrive and the
 * response is ended once the job has finished; otherwise the final result is
 * sent in one piece.
 *
 * Responds 400 when `a` or `b` is not an integer, 500 when the job fails.
 */
app.get('/add/:a/:b/:stream?', async (req, res) => {
  console.log('request params', req.params);
  // Explicit radix so the path segments are always read as decimal.
  const a = parseInt(req.params.a, 10);
  const b = parseInt(req.params.b, 10);
  const stream = req.params.stream === 'true';

  if (Number.isNaN(a) || Number.isNaN(b)) {
    res.status(400).send('a and b must be integers');
    return;
  }

  if (stream) {
    console.log('using streamming');
  }

  let job;
  // Delayed progress writes still in flight; they must settle before the
  // response is ended, otherwise they would write after end().
  const pendingWrites = [];
  const onProgress = (_job, {data}) => {
    if (!job || _job.id !== job.id) {
      return;
    }
    const write = (async () => {
      // progressing data
      // simulating delay
      await sleep(3);
      console.log(data);
      // The last progress value is a number; res.write() only accepts a
      // string or Buffer.
      res.write(String(data));
    })().catch((err) => console.error('progress write failed', err));
    pendingWrites.push(write);
  };

  try {
    const uuid = uuidV5(JSON.stringify({a, b}), UUID_JSON);
    job = await addQueue.getJob(uuid);
    if (!job) {
      job = await addQueue.add({a, b, stream}, {jobId: uuid, removeOnComplete: true, removeOnFail: true});
      console.log('added job');
    }

    // Only streaming responses consume progress events.
    if (stream) {
      addQueue.on('progress', onProgress);
    }

    console.log('processing');
    const result = await job.finished();
    console.log('finished processing');
    if (stream) {
      await Promise.all(pendingWrites);
      res.end();
    } else {
      res.send(result.toString());
      console.log('sent request');
    }
  } catch (err) {
    // Without this, a rejected await would leave the request hanging.
    console.error('add request failed', err);
    if (res.headersSent) {
      res.end();
    } else {
      res.status(500).send('job failed');
    }
  } finally {
    // Drop the per-request listener so listeners do not pile up on the queue.
    addQueue.removeListener('progress', onProgress);
  }
});

app.listen(3000, () => console.log('Example app listening on port 3000!'));
manast commented 6 years ago

I am sorry, but we need a minimal code snippet that reproduces the issue; the code above includes many things that I suspect are not necessary to reproduce it. Ideally it should look like a standard test case; unfortunately we do not have time to debug user-specific code: https://github.com/OptimalBits/bull/blob/master/test/test_queue.js

hbakhtiyor commented 6 years ago

Hmm, I spent my time reporting it with real, reproducible steps, and you just closed it without any investigation!!!

manast commented 6 years ago

I asked for a minimal code that reproduces the issue, I do not think things like

let final;
  if (result % 2 === 0) {
    final = result + 2;
  } else {
    final = result + 1;
  }

or an express server are required to reproduce the issue. Unfortunately we do not have resources to debug user specific code. I can reopen if the appropriate code is provided.

hbakhtiyor commented 6 years ago

The example hangs (it does not exit after finishing):

const Queue = require('bull');

const uuid = '18e8eebd-77b5-4f97-acac-2c0e2cca482b';

/**
 * Combines `a` and `b` with the `+` operator.
 *
 * @param {number} a - first operand.
 * @param {number} b - second operand.
 * @returns {number} `a + b`.
 */
function addFunc(a, b) {
  const sum = a + b;
  return sum;
}

/**
 * Queue processor: adds the job's two operands via `addFunc`.
 *
 * @param {object} job - job being processed; reads `job.data.a` and `job.data.b`.
 * @returns {Promise<number>} promise already resolved with the sum.
 */
function queueAddProcess(job) {
  const {a, b} = job.data;
  return Promise.resolve(addFunc(a, b));
}

// Reproduction script: enqueue (or reuse) one 'add' job under a fixed id and
// log its result.
// NOTE: the queue is deliberately left open, as in the original report; per
// the discussion in this thread, that is why the process does not exit after
// the last log line.
(async function run() {
  const payload = {a: 1, b: 2};

  const queue = new Queue('add');
  queue.process(queueAddProcess);

  // Reuse the job stored under `uuid` if there is one, otherwise add it.
  let job = await queue.getJob(uuid);
  if (!job) {
    const jobOptions = {jobId: uuid, removeOnComplete: true, removeOnFail: true};
    job = await queue.add(payload, jobOptions);
    console.log('added job');
  }

  console.log('processing');
  const result = await job.finished();
  console.log('finished processing', result);
})();
manast commented 6 years ago

just add queue.close() after console.log('finished processing', result);

hbakhtiyor commented 6 years ago

Do we always need to close it?

Another issue: when the progress handler is delayed, the queue does not wait for the progress event handler to finish, so job.finished() fires first.

and the output will be:

added job
processing
processing #1
finished processing 3
processing #2

But I need this output sequence:

added job
processing
processing #1
processing #2
finished processing 3
const Queue = require('bull');

const uuid = '18e8eebd-77b5-4f97-acac-2c0e2cca482b';

/**
 * Waits `delay` seconds before resolving.
 *
 * @param {number} delay - time to wait, in seconds.
 * @returns {Promise<undefined>} settles (with no value) when the timer fires.
 */
async function sleep(delay) {
  return new Promise((wake) => setTimeout(wake, 1000 * delay));
}

/**
 * Adds `a` and `b` slowly: pauses, reports two status strings through `cb`
 * along the way, pauses once more and then returns the sum.
 *
 * @param {number} a - first operand.
 * @param {number} b - second operand.
 * @param {Function} cb - (possibly async) progress callback; called with
 *   'processing #1' and then 'processing #2'.
 * @returns {Promise<number>} the sum `a + b`.
 */
async function addFunc(a, b, cb) {
  // [seconds to pause, status reported afterwards]
  const stages = [
    [1, 'processing #1'],
    [2, 'processing #2'],
  ];
  for (const [pause, status] of stages) {
    await sleep(pause);
    await cb(status);
  }

  // Final pause before the result becomes available.
  await sleep(2);
  return a + b;
}

/**
 * Queue processor: runs `addFunc` on the job's operands and republishes every
 * status it reports as job progress (wrapped as `{data}`).
 *
 * @param {object} job - job being processed; reads `job.data.a`,
 *   `job.data.b` and calls `job.progress`.
 * @returns {Promise<number>} settles with whatever `addFunc` settles with.
 */
async function queueAddProcess(job) {
  const {a, b} = job.data;
  const publish = async (data) => {
    await job.progress({data});
  };

  // Returning the pending promise makes this function settle with it.
  return addFunc(a, b, publish);
}

// Reproduction script: enqueue (or reuse) one 'add' job, log its progress
// values after an artificial delay, log the result and close the queue.
(async function run() {
  const payload = {a: 1, b: 2};

  const queue = new Queue('add');
  queue.process(queueAddProcess);

  // Reuse the job stored under `uuid` if there is one, otherwise add it.
  let job = await queue.getJob(uuid);
  if (!job) {
    const jobOptions = {jobId: uuid, removeOnComplete: true, removeOnFail: true};
    job = await queue.add(payload, jobOptions);
    console.log('added job');
  }

  queue.on('progress', async (_job, {data}) => {
    // Ignore progress that belongs to other jobs on the queue.
    if (_job.id !== job.id) {
      return;
    }
    // progressing data
    // simulating delay
    await sleep(3);
    console.log(data);
  });

  console.log('processing');
  const result = await job.finished();
  console.log('finished processing', result);
  // Nothing here waits for progress handlers that are still sleeping.
  await queue.close();
})();
manast commented 6 years ago

A queue has to be closed in order to exit nodejs event loop, this is how node works, not specific to bull.

manast commented 6 years ago

Regarding your code, a few things I saw:

hbakhtiyor commented 6 years ago

Even if we use numbers, we get the same sequence:

added job
processing
1
finished processing 3
2
manast commented 6 years ago

You are sleeping 3 seconds before console-logging the progress, aren't you? Remove it and check again:

  queue.on('progress', async (_job, {data}) => {
    if(_job.id === job.id) {
      // progressing data
      // simulating delay
      await sleep(3);
      console.log(data);
    }
  });
hbakhtiyor commented 6 years ago

I know; it is just simulating a delay, which is my use case.

manast commented 6 years ago

honestly... check your code, if you sleep 3 seconds before printing the progress and you sleep 2 seconds before finishing the job, don't you agree that you get the correct result?

added job
processing
processing #1
finished processing 3
processing #2

why would you expect something else?

hbakhtiyor commented 6 years ago

Why does it not wait for the progress handler to finish?

There is also another issue: if we run the code a second time (while the first run is still processing), the progress event doesn't work.

manast commented 6 years ago

progress is an event, the queue does not care what you do inside it.

hbakhtiyor commented 6 years ago

There is also another issue: if we run the code a second time (while the first run is still processing), the progress event doesn't work.

how about this issue?

manast commented 6 years ago

how can I know... add a minimal test case and we can take a look at it.

hbakhtiyor commented 6 years ago

the last snippet of code