pulsecron / pulse

The modern MongoDB-powered job scheduler library for Node.js
https://pulsecron.com
MIT License
90 stars 4 forks source link

`JobAttributes` not typed correctly #13

Closed kostysh closed 3 months ago

kostysh commented 3 months ago

I just tried Pulse and faced an issue with a job processor definition type. Here is a short example:

import { JobAttributes, JobAttributesData } from '@pulsecron/pulse';

interface ProcessJobData extends JobAttributesData {
  myId: string;
}

pulse.define<JobAttributes<ProcessJobData>>(
    'processJob',
    async (job) => {
      const { myId } = job.attrs.data; // `data` here is `any` of type, but must be `ProcessJobData` of type
      // ...
    },
    {
      concurrency: 3,
      priority: 'normal',
    },
  );

The issue is, possibly, here:

export interface JobAttributes<T extends JobAttributesData = JobAttributesData> {
    _id: mongodb.ObjectId;
    pulse: Pulse;
    type: string;
    name: string;
    disabled?: boolean;
    nextRunAt?: Date | null;
    lockedAt?: Date | null;
    priority: number | string;
    data: any; // <-- should be T, not any
    uniqueQuery?: any;
    uniqueOpts?: {
        insertOnly: boolean;
    };
    repeatInterval?: string;
    repeatTimezone?: string | null;
    repeatAt?: string;
    lastRunAt?: Date;
    lastFinishedAt?: Date;
    startDate?: Date | number | null;
    endDate?: Date | number | null;
    skipDays?: string | null;
    failReason?: string;
    failCount?: number;
    failedAt?: Date;
    lastModifiedBy?: string;
    shouldSaveResult?: boolean;
    result?: unknown;
}
code-xhyun commented 3 months ago

Thank you for the issue! I will solve these and give you an answer as soon as possible.

code-xhyun commented 3 months ago

@kostysh This was fixed in v1.1.9 πŸš€

Use it like this!

interface ProcessJobData extends JobAttributesData {
  myId: string;
}

pulse.define<ProcessJobData>(
  'processJob',
  async (job) => {
    const { myId } = job.attrs.data;
    // ...
  },
  {
    concurrency: 3,
    priority: 'normal',
  }
);
cheeselemon commented 3 months ago

Cool! Thanks!

kostysh commented 3 months ago

Thank you, it works now. What is about a done callback argument? In Agenda it was possible to provide done with an error for saving the finished job state as failed with a specific error.

code-xhyun commented 3 months ago

Thank you, it works now. What is about a done callback argument? In Agenda it was possible to provide done with an error for saving the finished job state as failed with a specific error.

@kostysh The done() callback argument is just a void. Is there an example code for the case you're talking about?

Maybe this is what you want?

pulse.define(
  'send email report',
  async (job, done) => {
    const { to } = job.attrs.data;

    console.log(`Sending email report to ${to}`);
    // this!
    try {
      done();
    } catch (error) {
      console.log('error:', error);
    }
  },
  { lockLifetime: 5 * 1000, priority: 'high', concurrency: 10 }
);
tiberioalunni commented 3 months ago

Thank you, it works now. What is about a done callback argument? In Agenda it was possible to provide done with an error for saving the finished job state as failed with a specific error.

@kostysh The done() callback argument is just a void. Is there an example code for the case you're talking about?

As I can see done function should match the function const jobCallback = async (error?: Error, result?: unknown) of https://github.com/pulsecron/pulse/blob/main/src/job/run.ts

as it is called here:

if (definition.fn.length === 2) {
  debug('[%s:%s] process function being called', this.attrs.name, this.attrs._id);
  await definition.fn(this, jobCallback); // <---- definition function with done callback param
} else {
  debug('[%s:%s] process function being called', this.attrs.name, this.attrs._id);
  const result = await definition.fn(this);
  await jobCallback(undefined, result);
}

Processor definition should be something like:

export type Processor<T extends JobAttributesData> = (job: Job<T>, done?: (error?: Error, result?: unknown) => void) => unknown | Promise<unknown>;

Usage with done param:

pulse.define('sendEmail', async (job, done) => {
 //....
  if (error) {
    done(new Error('Error message'));
    return;
  }

  // if some result has to be returned:
  done(undefined, 'Result data');
  // else 
  done();
})

Usage without done param:

pulse.define('sendEmail', async (job) => {
 //....

  if (error) {
    throw new Error('Error message')
  }

  // if some result has to be returned:
  return 'Result data';
  // else just return void
})
code-xhyun commented 3 months ago

κ°μ‚¬ν•©λ‹ˆλ‹€. 이제 μž‘λ™ν•©λ‹ˆλ‹€. 콜백 인수 λŠ” λ¬΄μ—‡μž…λ‹ˆκΉŒ done? νŠΉμ • 였λ₯˜λ‘œ 인해 μ™„λ£Œλœ μž‘μ—… μƒνƒœλ₯Ό μ‹€νŒ¨λ‘œ μ €μž₯ν•˜λŠ” 데 였λ₯˜λ₯Ό Agendaμ œκ³΅ν•  수 μžˆμ—ˆμŠ΅λ‹ˆλ‹€ .done

@kostysh콜백 done()μΈμˆ˜λŠ” 단지 λ¬΄νš¨μž…λ‹ˆλ‹€. 당신이 λ§ν•˜λŠ” 사둀에 λŒ€ν•œ 예제 μ½”λ“œκ°€ μžˆμŠ΅λ‹ˆκΉŒ?

λ‚΄κ°€ λ³Ό 수 μžˆλ“―μ΄ ν•¨μˆ˜λŠ” https://github.com/pulsecron/pulse/blob/main/src/job/run.ts 의 doneν•¨μˆ˜μ™€ μΌμΉ˜ν•΄μ•Ό ν•©λ‹ˆλ‹€ .const jobCallback = async (error?: Error, result?: unknown)

μ—¬κΈ°μ—μ„œλŠ” λ‹€μŒκ³Ό 같이 λΆ€λ¦…λ‹ˆλ‹€.

if (definition.fn.length === 2) {
  debug('[%s:%s] process function being called', this.attrs.name, this.attrs._id);
  await definition.fn(this, jobCallback); // <---- definition function with done callback param
} else {
  debug('[%s:%s] process function being called', this.attrs.name, this.attrs._id);
  const result = await definition.fn(this);
  await jobCallback(undefined, result);
}

ν”„λ‘œμ„Έμ„œ μ •μ˜λŠ” λ‹€μŒκ³Ό κ°™μ•„μ•Ό ν•©λ‹ˆλ‹€.

export type Processor<T extends JobAttributesData> = (job: Job<T>, done?: (error?: Error, result?: unknown) => void) => unknown | Promise<unknown>;

done λ§€κ°œλ³€μˆ˜λ₯Ό μ‚¬μš©ν•œ μ‚¬μš©λ²•:

pulse.define('sendEmail', async (job, done) => {
 //....
  if (error) {
    done(new Error('Error message'));
    return;
  }

  // if some result has to be returned:
  done(undefined, 'Result data');
  // else 
  done();
})

done λ§€κ°œλ³€μˆ˜κ°€ μ—†λŠ” μ‚¬μš©λ²•:

pulse.define('sendEmail', async (job) => {
 //....

  if (error) {
    throw new Error('Error message')
  }

  // if some result has to be returned:
  return 'Result data';
  // else just return void
})

Thank you for your dedication. @tiberioalunni @kostysh We released a new version reflecting your opinion > v1.1.11 πŸš€

Also we updated docs