quirrel-dev / owl

A high-performance, Redis-backed job queueing library originally built for Quirrel. Has an in-memory mode for development use cases.
MIT License
93 stars, 10 forks

Execution is unreliable when using schedules #220

Open · ethndotsh opened this issue 3 weeks ago

ethndotsh commented 3 weeks ago

Hey there,

I am building a system on Owl, inspired by how Quirrel uses Owl and its scheduled executions on the backend, but I am running into an issue where sometimes only some of my cron jobs are executed, or none are executed at all. Sometimes the missed cron jobs are executed several schedules late, and sometimes they are not executed at all. Overall it is very odd, and I am at a loss as to what could be causing it. Here is a simplified version of my cron system code, which is heavily inspired by Quirrel's:

```typescript
import Owl from "@quirrel/owl";
import cronParser from "cron-parser";

import { redisFactory } from "./utils/redis";
import Sentry from "./utils/sentry";

export function parseTimezonedCron(
  cronExpression: string,
): [cron: string, tz: string] {
  return [cronExpression, "Etc/UTC"] as [cron: string, tz: string];
}

export function cronSchedule(last: Date, cron: string): Date {
  return cronParser
    .parseExpression(cron, {
      currentDate: last,
      tz: "Etc/UTC",
    })
    .next()
    .toDate();
}

export const owl = new Owl({
  redisFactory,
  scheduleMap: {
    cron: cronSchedule,
  },
  onError: (error) => {
    console.log(error);
  },
});

type CronJob = () => Promise<void>;

type CronJobs = {
  name: string;
  cron: string;
  handler: CronJob;
};

export const createCronJobs = async (cronJobs: CronJobs[]) => {
  const producer = owl.createProducer();
  const redis = redisFactory();

  const worker = await owl.createWorker(async (job, ackDescriptor) => {
    if (job.id !== "@cron") {
      return;
    }

    try {
      const cronJob = cronJobs.find(
        (cronJob) => cronJob.name === job.queue.split("/")[1],
      );

      if (!cronJob) {
        throw new Error(`No cron job found for queue ${job.queue}`);
      }

      console.log(`Running cron job ${cronJob.name}`);

      await producer.acknowledger.acknowledge(ackDescriptor);

      await Sentry.withMonitor(
        cronJob.name,
        async () => {
          await cronJob.handler();
        },
        {
          schedule: {
            type: "crontab",
            value: cronJob.cron,
          },
          checkinMargin: 2,
          maxRuntime: 7,
        },
      );

      console.log(ackDescriptor);
    } catch (error) {
      await producer.acknowledger.reportFailure(ackDescriptor, job, error);
      console.log(error, "ERROR");
    }

    return;
  });

  const existingQueues = await redis.smembers(`queues:cron`);
  const queuesThatShouldPersist = cronJobs.map((cronJob) => cronJob.name);

  const deleted = new Set<string>();

  await Promise.all(
    existingQueues.map(async (queue) => {
      if (!queuesThatShouldPersist.includes(queue)) {
        await producer.delete(`cron/${queue}`, "@cron");
        deleted.add(queue);
        // await redis.srem(`queues:cron`, queue);
      }
    }),
  ).then(() => {
    console.log(`Deleted ${deleted.size} cron jobs`);
  });

  try {
    await Promise.all(
      Object.entries(cronJobs).map(async ([_, cronJob]) => {
        await redis.sadd(`queues:cron`, cronJob.name);
        return producer.enqueue({
          id: "@cron",
          queue: `cron/${cronJob.name}`,
          override: true,
          payload: "null",
          schedule: {
            type: "cron",
            meta: cronJob.cron,
          },
          runAt: cronParser.parseExpression(cronJob.cron).next().toDate(),
        });
      }),
    );

    console.log("Cron jobs started");
    console.table(cronJobs.map(({ name, cron }) => ({ name, cron })));
  } catch (error) {
    await worker.close();
    await producer.close();
    throw error;
  }

  return {
    close: async () => {
      await worker.close();
      await producer.close();
      redis.disconnect();
    },
  };
};
```

Any clues as to why this might be happening? As far as I can tell, this is no different from how Quirrel does it. Thank you in advance.
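For context on what the scheduleMap entry is for: as I understand Owl, after a scheduled job runs, it calls the function registered under the job's schedule.type with the last execution date and the schedule.meta string to compute the next runAt. The sketch below models that loop in plain TypeScript, without Redis or cron-parser. `ScheduleFn`, `everySeconds` and `plannedRuns` are hypothetical names, and the fixed-interval schedule is a stand-in for cronSchedule, not a reproduction of Owl's internals.

```typescript
// Same shape as cronSchedule above: (last execution, schedule meta) => next execution.
type ScheduleFn = (last: Date, meta: string) => Date;

// Hypothetical stand-in for a cron schedule: "every N seconds", with N passed as meta.
const everySeconds: ScheduleFn = (last, meta) => {
  const seconds = Number(meta);
  if (!Number.isInteger(seconds) || seconds <= 0) {
    throw new Error(`Invalid interval: ${meta}`);
  }
  return new Date(last.getTime() + seconds * 1000);
};

// Simplified model of rescheduling: each run's date is computed from the
// previous one via the schedule function. This is not Owl's actual code.
function plannedRuns(
  firstRunAt: Date,
  schedule: ScheduleFn,
  meta: string,
  count: number,
): Date[] {
  const runs: Date[] = [];
  let next = firstRunAt;
  for (let i = 0; i < count; i++) {
    runs.push(next);
    next = schedule(next, meta);
  }
  return runs;
}

const runs = plannedRuns(new Date("2024-01-01T00:00:00Z"), everySeconds, "30", 3);
// 2024-01-01T00:00:00.000Z, 2024-01-01T00:00:30.000Z, 2024-01-01T00:01:00.000Z
console.log(runs.map((d) => d.toISOString()));
```

In this model each date is derived from the previous one, so an error in one computed date carries into every later run.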

ethndotsh commented 1 week ago

I am using Bun as the runtime, in case that changes anything.

Skn0tt commented 1 week ago

Hmm, it's been a while since I looked at this codebase. The only thing that immediately comes to mind is that your worker function may be taking too long to run cronJob.handler, and that's blocking Owl from picking up the next job. Try removing the await there to see if that's what's causing it. If that doesn't help, I'd recommend stepping through the problem with a debugger 🤷‍♂️
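To make this suggestion concrete: if the worker handles one job at a time (an assumption in this sketch, not something confirmed about Owl), awaiting a slow handler delays when the next due job is picked up. The simulation uses a virtual clock instead of real timers so the result is deterministic; `DueJob` and `simulatePickups` are hypothetical names.

```typescript
// A job that becomes due at `dueAt` and whose handler takes `durationMs` to finish.
interface DueJob {
  name: string;
  dueAt: number; // virtual milliseconds
  durationMs: number;
}

// Simulates a worker that handles one job at a time (an assumption).
// awaitHandler = true: the worker stays busy until the handler finishes.
// awaitHandler = false: the handler is started and the worker is free immediately.
function simulatePickups(jobs: DueJob[], awaitHandler: boolean): Map<string, number> {
  const pickedUpAt = new Map<string, number>();
  let workerFreeAt = 0;
  const ordered = [...jobs].sort((a, b) => a.dueAt - b.dueAt);
  for (const job of ordered) {
    // A job starts when it is due or when the worker becomes free, whichever is later.
    const start = Math.max(job.dueAt, workerFreeAt);
    pickedUpAt.set(job.name, start);
    workerFreeAt = awaitHandler ? start + job.durationMs : start;
  }
  return pickedUpAt;
}

const jobs: DueJob[] = [
  { name: "slow", dueAt: 0, durationMs: 90000 },
  { name: "fast", dueAt: 60000, durationMs: 100 },
];

console.log(simulatePickups(jobs, true).get("fast")); // 90000: delayed by the slow handler
console.log(simulatePickups(jobs, false).get("fast")); // 60000: picked up on time
```

One trade-off to keep in mind: in the worker above, not awaiting the handler (or Sentry.withMonitor) means a rejected promise would no longer reach the surrounding try/catch, so reportFailure would not be called for it.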

ethndotsh commented 1 week ago


It can't be this, because I log before running anything and nothing gets logged at all. I will try a debugger, thank you 👍

ethndotsh commented 1 week ago

I've done some digging, and it seems that the issue is that the request.lua script responds with 'wait' for x seconds, but once those seconds are up it doesn't trigger the job; it just skips over it to the next 'wait' item. Any idea why this might be?
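A simplified, in-memory model of the request/wait cycle described here may help reason about it: a request either hands out the earliest job if it is due, or reports how long to wait until it becomes due. This is an assumption about the general shape of the protocol, not a transcription of Owl's request.lua; `ScheduledJob`, `RequestResult` and `request` are hypothetical names. In this model, once the wait has elapsed, the next request returns the same job rather than another 'wait'.

```typescript
interface ScheduledJob {
  id: string;
  runAt: number; // epoch milliseconds
}

type RequestResult =
  | { kind: "job"; job: ScheduledJob }
  | { kind: "wait"; ms: number }
  | { kind: "empty" };

// Hypothetical request step: hand out the earliest job if it is due,
// otherwise report how long until it becomes due. Mutates `schedule`.
function request(schedule: ScheduledJob[], now: number): RequestResult {
  if (schedule.length === 0) return { kind: "empty" };
  schedule.sort((a, b) => a.runAt - b.runAt);
  const earliest = schedule[0];
  if (earliest.runAt <= now) {
    schedule.shift(); // remove the job so it is handed out only once
    return { kind: "job", job: earliest };
  }
  return { kind: "wait", ms: earliest.runAt - now };
}

const schedule: ScheduledJob[] = [
  { id: "cron/b", runAt: 2000 },
  { id: "cron/a", runAt: 1000 },
];

let now = 0;
const first = request(schedule, now); // { kind: "wait", ms: 1000 }
if (first.kind === "wait") now += first.ms;
const second = request(schedule, now); // job "cron/a" is now due
console.log(first, second);
```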

ethndotsh commented 6 days ago

I have done some more light digging (I have been busy this weekend). Creating an activity that logs Owl's events shows a "requested" message correctly every time, but never an "acknowledged" message or any of the events that should follow it. For some reason, the worker is never being triggered.
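One way to make this observation systematic is to collect the activity events and flag jobs that were requested but never acknowledged. The event shape below (`{ type, queue, id }`) is a simplified assumption rather than Owl's exact activity event type, and `ActivityEvent` and `unacknowledged` are hypothetical names; adapt the fields to whatever the activity callback actually receives.

```typescript
// Simplified, hypothetical activity event; Owl's real event objects may differ.
interface ActivityEvent {
  type: string; // e.g. "requested", "acknowledged"
  queue: string;
  id: string;
}

// Returns "queue:id" keys for jobs whose latest "requested" event
// was not followed by an "acknowledged" event.
function unacknowledged(events: ActivityEvent[]): string[] {
  const pending = new Set<string>();
  for (const event of events) {
    const key = `${event.queue}:${event.id}`;
    if (event.type === "requested") pending.add(key);
    if (event.type === "acknowledged") pending.delete(key);
  }
  return Array.from(pending);
}

const log: ActivityEvent[] = [
  { type: "requested", queue: "cron/cleanup", id: "@cron" },
  { type: "acknowledged", queue: "cron/cleanup", id: "@cron" },
  { type: "requested", queue: "cron/report", id: "@cron" },
];

const stuck = unacknowledged(log); // ["cron/report:@cron"]
console.log(stuck);
```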