pulsecron / pulse

The modern MongoDB-powered job scheduler library for Node.js
https://pulsecron.com
MIT License

Jobs after creation are not adding to the database #23

Closed: kostysh closed this issue 3 months ago

kostysh commented 3 months ago

Hi, please clarify the job workflow. I am adding jobs using pulse.create('<job_name>', '{job_data}'). The job is created without errors, but it does not appear in the jobs collection (I have checked via the MongoDB console), and the job's _id property is undefined. If the server goes down at this point, all the added jobs will be lost after the server restarts. How do you manage jobs so that all unfinished jobs can be completed after a server restart?

By the way, here is the configuration I use to start the queue:

const queue = new Pulse({
  name: 'MyQueue',
  db: {
    address: databaseUrl, // a valid connection URL
    collection: 'jobs',
  },
  defaultConcurrency: 3,
  maxConcurrency: 5,
  processEvery: '10 seconds',
});
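In Agenda-derived schedulers such as Pulse, processEvery is, as far as I can tell from the docs, the interval at which the queue polls MongoDB for due jobs, so a saved job may wait up to that long before a worker picks it up. The parseInterval helper below is hypothetical (it is not part of Pulse) and only shows what a value like '10 seconds' amounts to in milliseconds.

```typescript
// Hypothetical helper, not part of Pulse: turns "<number> <unit>" strings
// such as "10 seconds" or "1 minute" into milliseconds.
const UNIT_MS = new Map<string, number>([
  ['millisecond', 1],
  ['second', 1_000],
  ['minute', 60_000],
  ['hour', 3_600_000],
  ['day', 86_400_000],
]);

function parseInterval(text: string): number {
  // A number, optional whitespace, then a unit word with an optional plural "s".
  const match = /^\s*(\d+(?:\.\d+)?)\s*([a-z]+?)s?\s*$/i.exec(text);
  if (match === null) {
    throw new Error(`Unrecognized interval: "${text}"`);
  }
  const unitMs = UNIT_MS.get(match[2].toLowerCase());
  if (unitMs === undefined) {
    throw new Error(`Unknown unit: "${match[2]}"`);
  }
  return Number(match[1]) * unitMs;
}
```

For example, parseInterval('10 seconds') gives 10000, so with the configuration above a newly saved job could sit for up to about ten seconds before the next poll.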
code-xhyun commented 3 months ago

Thank you for the issue! I will look into this and get back to you as soon as possible.

kostysh commented 3 months ago

Maybe a job must be saved explicitly right after creation using the job's save() method? This is not obvious. Could you please clarify the intended behavior?

kostysh commented 3 months ago

Just an idea: maybe it is worth enabling the Discussions feature for the repository as a place for questions?

code-xhyun commented 3 months ago

I'm not using the Discussions feature, but I'm going to open a Discord server as soon as possible. @kostysh

code-xhyun commented 3 months ago

The create() method does NOT save the job to the database. When you create a job with create(), you must explicitly call save() if you want to persist it.

https://docs-pulse.pulsecron.com/docs/creating-jobs/create#example-usage
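The distinction between create() and save() can be illustrated with a small in-memory model. InMemoryJobStore and SketchJob are hypothetical stand-ins, not Pulse classes: create() returns a job object with no _id, and only save() writes it to the store and assigns an _id, which matches the behavior reported at the top of this issue.

```typescript
// Hypothetical in-memory model, not Pulse's implementation: create() only
// builds a job object, and save() is what actually persists it.
interface JobAttrs<T> {
  _id?: number;
  name: string;
  data: T;
}

class InMemoryJobStore {
  private nextId = 1;
  // Stand-in for the MongoDB "jobs" collection.
  readonly docs = new Map<number, JobAttrs<unknown>>();

  // Mirrors the reported behavior: nothing is written to the store here.
  create<T>(name: string, data: T): SketchJob<T> {
    return new SketchJob<T>(this, { name, data });
  }

  // Insert a new document, or overwrite the existing one if it has an _id.
  persist<T>(attrs: JobAttrs<T>): number {
    const id = attrs._id ?? this.nextId++;
    this.docs.set(id, { ...attrs, _id: id });
    return id;
  }
}

class SketchJob<T> {
  readonly attrs: JobAttrs<T>;
  private readonly store: InMemoryJobStore;

  constructor(store: InMemoryJobStore, attrs: JobAttrs<T>) {
    this.store = store;
    this.attrs = attrs;
  }

  // Writes the job to the store and records the assigned _id on the job.
  save(): this {
    this.attrs._id = this.store.persist(this.attrs);
    return this;
  }
}
```

Calling save() a second time updates the same entry rather than inserting a duplicate, because the job already carries its _id.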

kostysh commented 3 months ago

Thank you. Jobs are now being added to the database. But the job workflow is still not clear to me.

What is the use case for not storing jobs in the database? In my opinion, every new job should be saved to the database immediately and automatically, and the queue manager should then take jobs from there one by one (or in batches, according to configured rules) for processing. That way, if the queue instance goes offline right after a job is created (for example, because the server goes down), all saved jobs can be restored and processed when the queue comes back up.

I am not familiar with the code yet, but I could not find the place where saved jobs are loaded from the database into the queue at startup.

If I understand correctly, for jobs to be processed at startup they must be present in the _definitions property of the main class, but this property starts out as an empty array.
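The "persist first, then dispatch" workflow proposed above can be sketched as follows, assuming a Map stands in for the MongoDB collection. DurableQueue, enqueue, claimBatch and complete are hypothetical names, not Pulse APIs. Because every enqueued job is written to the shared store immediately, a second queue instance created after a crash sees the same pending jobs.

```typescript
// Hypothetical sketch of "persist first, then dispatch"; none of these names
// are Pulse APIs. A Map stands in for the MongoDB collection.
interface StoredJob {
  id: number;
  name: string;
  data: unknown;
  nextRunAt: number; // epoch milliseconds
  lockedAt: number | null;
}

class DurableQueue {
  readonly collection: Map<number, StoredJob>;

  // The collection is injected so that a "restarted" queue can reuse it.
  constructor(collection: Map<number, StoredJob>) {
    this.collection = collection;
  }

  // Every job is written to the store as soon as it is enqueued.
  enqueue(name: string, data: unknown, now: number): StoredJob {
    const id = Math.max(0, ...Array.from(this.collection.keys())) + 1;
    const job: StoredJob = { id, name, data, nextRunAt: now, lockedAt: null };
    this.collection.set(id, job);
    return job;
  }

  // Lock up to `limit` due, unlocked jobs, oldest first. A real implementation
  // would use atomic updates so two workers never claim the same job.
  claimBatch(now: number, limit: number): StoredJob[] {
    const due = Array.from(this.collection.values())
      .filter((job) => job.lockedAt === null && job.nextRunAt <= now)
      .sort((a, b) => a.nextRunAt - b.nextRunAt || a.id - b.id)
      .slice(0, limit);
    for (const job of due) {
      job.lockedAt = now;
    }
    return due;
  }

  // Drop a job from the store once its processor has finished.
  complete(id: number): void {
    this.collection.delete(id);
  }
}
```

The sketch is single-threaded; against MongoDB, the claim step would typically be a find-and-modify style update per job so that the lock is taken atomically.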

code-xhyun commented 3 months ago

If you are looking for a method that saves the job immediately, take a look at this: https://docs-pulse.pulsecron.com/docs/creating-jobs

Example:

    await pulse.start();
    await pulse.every('1 minutes', 'delete old users', { description: 'test' }); 
    // or   await pulse.schedule('in 1 minutes', 'delete old users', { to: 'admin@example.com' });
kostysh commented 3 months ago

In my case, I have to process a series of tasks, each of which takes roughly 30 seconds to 2 minutes. These tasks are not recurring and should not be scheduled. I just want to be sure that once a task has been enqueued, it is guaranteed to be processed and its result is logged. BullMQ handled this use case, but I do not want to add another database to my stack.
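"Guaranteed to be processed" usually means at-least-once delivery: a job stays in the store until its result is recorded, and a job whose lock is older than some lock lifetime is assumed to belong to a crashed worker and can be claimed again. The sketch below models that with plain objects; LOCK_LIFETIME_MS, TaskDoc, claim and finish are hypothetical, and the ten-minute lifetime is an assumed value chosen to exceed the two-minute tasks described above.

```typescript
// Hypothetical at-least-once sketch, not Pulse code. A task stays claimable
// until a result is recorded; a lock older than LOCK_LIFETIME_MS is treated as
// abandoned (for example, the worker crashed) and may be claimed again.
const LOCK_LIFETIME_MS = 10 * 60_000; // assumed: longer than the slowest (~2 min) task

interface TaskDoc {
  id: number;
  lockedAt: number | null;   // epoch ms when a worker claimed the task
  finishedAt: number | null; // epoch ms when a result was recorded
  failReason: string | null; // error message if the processor threw
}

function isClaimable(doc: TaskDoc, now: number): boolean {
  if (doc.finishedAt !== null) return false; // result already logged
  return doc.lockedAt === null || now - doc.lockedAt > LOCK_LIFETIME_MS;
}

// Claim the first claimable task by stamping it with the current time.
function claim(docs: TaskDoc[], now: number): TaskDoc | undefined {
  const doc = docs.find((d) => isClaimable(d, now));
  if (doc !== undefined) doc.lockedAt = now;
  return doc;
}

// Record the outcome (success or failure) and release the lock.
function finish(doc: TaskDoc, now: number, error?: Error): void {
  doc.finishedAt = now;
  doc.failReason = error === undefined ? null : error.message;
  doc.lockedAt = null;
}
```

One consequence of this design is that a task can run twice if a worker crashes after doing the work but before recording the result, so processors should be idempotent.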

kostysh commented 3 months ago

Also, keeping all the jobs in memory is not a good idea from a scalability perspective. I do not want to impose an approach to task management different from the one you intended; I just want to understand the idea behind the Pulse queue.

code-xhyun commented 3 months ago

This design helps manage the application's logic and data flow more clearly and lets users take a more active role in managing the state of their data, which leads to more stable and predictable applications.

I don't fully understand your situation, but could the following example solve your problem?

    await pulse.start();
    const job = pulse.create('delete old users', { to: 'pulsecron@gmail.com' });
    await job.save();
    // write your own logic...
    job.repeatEvery('10 minutes');
    job.unique({ 'data.type': 'email', 'data.userId': '12345' });
    await job.save();
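The unique() call in the example is meant to prevent duplicate jobs. As I understand the Agenda-style semantics Pulse builds on (worth checking against Pulse's own docs), a unique constraint makes save() upsert by the given field criteria together with the job name instead of inserting a new document. The hypothetical saveUnique function below models that idea on an array; it is not Pulse code.

```typescript
// Hypothetical model, not Pulse code: with unique criteria, save() behaves
// like an upsert keyed on the job name plus the given dotted-path fields.
interface Doc {
  name: string;
  data: Record<string, unknown>;
  repeatInterval?: string;
}

// Read a dotted path such as "data.userId" from a nested object.
function getPath(obj: unknown, path: string): unknown {
  let current: unknown = obj;
  for (const key of path.split('.')) {
    if (current === null || typeof current !== 'object') return undefined;
    current = (current as Record<string, unknown>)[key];
  }
  return current;
}

function saveUnique(collection: Doc[], doc: Doc, unique: Record<string, unknown>): Doc[] {
  const matches = (existing: Doc): boolean =>
    existing.name === doc.name &&
    Object.keys(unique).every((path) => getPath(existing, path) === unique[path]);
  const index = collection.findIndex(matches);
  if (index === -1) return [...collection, doc]; // no match: insert
  const next = collection.slice();
  next[index] = doc; // match: replace instead of duplicating
  return next;
}
```

In this model, saving the same job again after repeatEvery('10 minutes') replaces the earlier document instead of adding a second one.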
code-xhyun commented 3 months ago

You can also set the concurrency option to a small number so that only a limited number of jobs are held in memory at once.

https://docs-pulse.pulsecron.com/docs/defining-job-processors#parameters
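A concurrency limit keeps only a bounded number of jobs running at once, while the rest wait in the store. The tick-based simulate function below is a hypothetical model, not Pulse's scheduler: each job takes a given number of ticks, and free slots are refilled from the pending list in order.

```typescript
// Hypothetical tick-based model, not Pulse's scheduler: at most `concurrency`
// jobs are loaded and running at once; the rest wait in the store.
interface SimResult {
  finishOrder: number[]; // job indices in the order they completed
  maxInMemory: number;   // peak number of jobs running at the same time
}

function simulate(durations: number[], concurrency: number): SimResult {
  if (concurrency < 1) throw new RangeError('concurrency must be at least 1');
  const pending = durations.map((ticks, id) => ({ id, remaining: ticks }));
  let running: { id: number; remaining: number }[] = [];
  const finishOrder: number[] = [];
  let maxInMemory = 0;

  while (pending.length > 0 || running.length > 0) {
    // Fill free slots from the store, oldest first.
    while (running.length < concurrency && pending.length > 0) {
      running.push(pending.shift()!);
    }
    maxInMemory = Math.max(maxInMemory, running.length);

    // Advance each running job by one tick and retire the finished ones.
    for (const job of running) job.remaining -= 1;
    for (const job of running) {
      if (job.remaining <= 0) finishOrder.push(job.id);
    }
    running = running.filter((job) => job.remaining > 0);
  }
  return { finishOrder, maxInMemory };
}
```

For example, simulate([3, 1, 2, 1], 2) finishes the jobs in the order 1, 0, 2, 3 and never holds more than two at a time.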

kostysh commented 3 months ago

OK, let me try to make it clear with an example:

interface MyDataType {
  //...
}

// Job processor
pulse.define<MyDataType>('processData', async (job) => {
  // ... process the job here
});

// This helper can be called by an API method at any time
async function createJob<T extends MyDataType>(pulse: Pulse, data: T): Promise<void> {
  const job = pulse.create<T>('processData', data);
  await job.save(); // OK, as it is
}

await pulse.start();

// ^--- unprocessed jobs must be restored and processed here

I expect that after a server restart, any unprocessed jobs created before the restart will be restored from the database and processed. This is standard practice for queues with persisted jobs.
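The expected restart behavior can be modeled in a few lines. Assuming processors are registered with define() before start(), start() would scan the persisted jobs for ones that never finished and have a registered processor. The names below (Processor, PersistedJob, jobsToResume, start) are hypothetical; this is not Pulse's implementation. It also illustrates why an empty _definitions at construction time need not be a problem in this kind of design: definitions hold processors, while the jobs themselves come from the database.

```typescript
// Hypothetical sketch, not Pulse's start(): resume unfinished persisted jobs
// whose name has a registered processor.
type Processor = (data: unknown) => void;

interface PersistedJob {
  name: string;
  data: unknown;
  lastFinishedAt: number | null; // null means the job never completed
}

function jobsToResume(definitions: Map<string, Processor>, stored: PersistedJob[]): PersistedJob[] {
  return stored.filter((job) => job.lastFinishedAt === null && definitions.has(job.name));
}

// Run every resumable job once and record completion; returns how many ran.
function start(definitions: Map<string, Processor>, stored: PersistedJob[]): number {
  const resumable = jobsToResume(definitions, stored);
  for (const job of resumable) {
    const processor = definitions.get(job.name);
    if (processor === undefined) continue; // filtered above; kept for type safety
    processor(job.data);
    job.lastFinishedAt = Date.now();
  }
  return resumable.length;
}
```

Jobs whose name has no registered processor are left untouched, so they would be picked up once a matching define() call is added.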

code-xhyun commented 3 months ago

Currently, if the server goes down partway through and then restarts, nextRunAt is moved to the next scheduled time according to the job's schedule rule.

But do you want the job to run again as soon as the server restarts?
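The two restart behaviors under discussion can be contrasted with a small hypothetical function, nextRunAfterRestart (not Pulse code). For an overdue repeating job, one policy skips the missed occurrences and moves nextRunAt to the first slot after the restart, as described above for the current behavior; the other makes the job due immediately. The resumeOnRestart parameter only names that choice inside the sketch; the actual Pulse option mentioned at the end of this thread should be checked in the docs for its exact semantics.

```typescript
// Hypothetical sketch, not Pulse code. All times are epoch milliseconds.
interface RepeatingJob {
  nextRunAt: number;        // when the job was due before the restart
  repeatIntervalMs: number; // e.g. 600_000 for "10 minutes"
}

function nextRunAfterRestart(job: RepeatingJob, now: number, resumeOnRestart: boolean): number {
  if (job.nextRunAt >= now) return job.nextRunAt; // not overdue: keep the schedule
  if (resumeOnRestart) return now;                // make the missed run due immediately
  if (job.repeatIntervalMs <= 0) throw new RangeError('repeatIntervalMs must be positive');
  // Skip every missed occurrence and move to the first slot strictly after `now`.
  const missed = Math.floor((now - job.nextRunAt) / job.repeatIntervalMs) + 1;
  return job.nextRunAt + missed * job.repeatIntervalMs;
}
```

With a 10-minute interval, a job due at t = 0 and a restart at t = 25 minutes is rescheduled for t = 30 minutes under the skip policy, or run right away when resumeOnRestart is true.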

kostysh commented 3 months ago

If the job has not been processed, yes.

code-xhyun commented 3 months ago

I'll implement this as soon as possible. As BullMQ shows, I think this feature is essential, and it is currently missing. Thank you for your insight @kostysh

kostysh commented 3 months ago

Thank you! I guess we can close this issue. In the future, we can move long discussions like this to Discord.

code-xhyun commented 3 months ago

https://github.com/pulsecron/pulse/pull/25

code-xhyun commented 3 months ago

#25

I brought you what you wanted! @kostysh

I also enabled Discussions in response to your comments, and I will add Discord later.

kostysh commented 3 months ago

Nice! I see, resumeOnRestart.