taichino / croniter

croniter is a python module to provide iteration for datetime object.
http://github.com/taichino/croniter

Last weekday per month #159

Closed lowell80 closed 3 years ago

lowell80 commented 3 years ago

Does croniter support any way to reference the day of the week within a month? For example, say I want to match the last Friday of every month?

(I'm pretty sure I know the answer to this, since I've spent some time staring at the code, but want to make sure I didn't miss anything.)

The closest approximation I can find right now is 0 0 25-31 * fri (with day_or=False), which fails for shorter months. Changing the expression to 0 0 24-30 * fri, for example, instead matches a Friday that is not the last one in 31-day months that end on a Friday.
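A quick way to see where a fixed day-of-month range breaks down is to compute the last Friday of each month with the standard library and check whether it lands inside the range. This is only an illustrative sketch: `last_friday` and `missed_by_range` are hypothetical helper names, not part of croniter, and the check does not involve croniter at all.

```python
import calendar
from datetime import date

FRIDAY = calendar.FRIDAY  # 4 in Python's Monday=0 convention


def last_friday(year, month):
    """Return the date of the last Friday in the given month."""
    days_in_month = calendar.monthrange(year, month)[1]
    last = date(year, month, days_in_month)
    # Step back from the last day of the month to the nearest Friday.
    offset = (last.weekday() - FRIDAY) % 7
    return last.replace(day=days_in_month - offset)


def missed_by_range(year, first_day=25, last_day=31):
    """Months whose last Friday falls outside the day-of-month range."""
    return [
        month for month in range(1, 13)
        if not first_day <= last_friday(year, month).day <= last_day
    ]
```

For 1987, `missed_by_range(1987)` reports April (last Friday on the 24th), while `missed_by_range(1987, 24, 30)` reports July (last Friday on the 31st), so neither range is right for every month.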

I've also tried using the # week-of-month syntax, but that really doesn't do the trick either: with 0 0 * * fri#4,fri#5 I'm guaranteed to have too many matches.
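The over-matching is easy to confirm: any month with five Fridays has both a 4th and a 5th one, so both would fire. A minimal standard-library sketch (the helper names `fridays_in_month` and `months_with_five_fridays` are made up for this example):

```python
import calendar

# Explicit Monday-first calendar so the Friday column index is unambiguous.
_MONDAY_FIRST = calendar.Calendar(firstweekday=0)


def fridays_in_month(year, month):
    """Return the day-of-month numbers of every Friday in the month."""
    weeks = _MONDAY_FIRST.monthdayscalendar(year, month)
    # Slots that belong to a neighbouring month are reported as 0 and skipped.
    return [week[calendar.FRIDAY] for week in weeks if week[calendar.FRIDAY]]


def months_with_five_fridays(year):
    """Months in which both the 4th and the 5th Friday exist."""
    return [m for m in range(1, 13) if len(fridays_in_month(year, m)) == 5]
```

In 1987, for example, January, May, July and October each have five Fridays, so a `#4,#5` expression would fire twice in those months.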

It seems like what is necessary is L support in the wday column, but that doesn't seem to be supported. Am I missing anything?

kiorky commented 3 years ago

Or an aggregate of two separate expressions, one with * 1,3,4,5,6,7,8,9,10,11,12 and one with * 2 🤡

lowell80 commented 3 years ago

Hmm. But the issue isn't just Feb with 28 days, it's months with 30 vs 31 as well, right?

So something like this?

kiorky commented 3 years ago

Maybe you can use this trick: https://techsk.blogspot.com/2008/06/how-to-run-cronjob-on-last-friday-of.html

to run a cron job each Friday, then check directly in a script wrapper whether it is the last Friday of the month?
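The check inside such a wrapper can be small: a date is the last occurrence of its weekday in the month exactly when adding seven days lands in a different month. A minimal sketch, assuming the job itself is scheduled weekly (e.g. 0 0 * * fri) and the script simply exits early when the check fails; `is_last_weekday_of_month` is a hypothetical helper name:

```python
from datetime import date, timedelta


def is_last_weekday_of_month(day):
    """True if no later day in the same month shares this weekday."""
    # One week later is either the next occurrence of this weekday in the
    # same month, or already in the following month.
    return (day + timedelta(days=7)).month != day.month


def should_run(today=None):
    """Decide whether a weekly job should proceed on the given date."""
    return is_last_weekday_of_month(today or date.today())
```

This keeps the cron expression plain, at the cost of pushing the "last of the month" logic out of the schedule and into the job.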

lowell80 commented 3 years ago

Hmm, I may have to go the post-processing route. The tricky part is that I have to accept CRON syntax on the input, and behave the same way croniter does currently.... So wrapper class it is! 🤦

If I come up with something functional I'll post a code snippet here and maybe someone else can run with it and turn it into a real feature. I can't do a proper PR at the moment because (1) I'm short on hours, and (2) I'm sure the implementation will be hacky and potentially very inefficient in certain scenarios beyond my targeted use case.

lowell80 commented 3 years ago

Here's a first pass at an implementation in case anyone else needs something similar. I'm sure this could be turned into something reusable (ideally merged back into the official library), but the current approach is rather brute-force.


import calendar
from croniter import croniter as croniter_base, CroniterBadCronError
import re

class croniter(croniter_base):
    """ Wrapper around croniter that adds support for "L" in the day of week

    Syntax:  m, h, d, mon, dow, [s]

    This class supports dow like:    L5   (for the last Friday of the month)

    Internally this uses the '#' syntax because the last Friday of the month will
    always be either the 4th or 5th Friday, so the cron expression is updated
    to match both of those.  Then as a pre-output filter, a check is done
    against a calendar to determine if the proposed output is in fact the last
    occurrence of that day in the month, and if so, it's returned.
    """

    def __init__(self, expr_format, *args, **kwargs):
        self.last_weekday_of_month = set()
        expr_format = self.prep_expr(expr_format)
        croniter_base.__init__(self, expr_format, *args, **kwargs)

    def prep_expr(self, expr_format):
        """ Intercept "<dow>L" entries in day-of-week and handle them specially.
        Everything else is passed along as-is.  Unpack / repack. """
        expressions = expr_format.split()

        # day of week field manipulations
        found_types = set()
        items = []
        for dow in expressions[4].split(","):
            type_, dow = self.handle_dow(dow)
            found_types.add(type_)
            items.append(dow)
        if len(found_types) > 1:
            raise CroniterBadCronError(
                "Mixing 'L' and non-'L' syntax in day of week field is not "
                "supported in the cron expression:  {}".format(expr_format))

        expressions[4] = ",".join(items)
        expr_format = " ".join(expressions)
        return expr_format

    def handle_dow(self, day_of_week):
        mo = re.match(r"^L([0-7])$", day_of_week, re.IGNORECASE)
        if mo:
            dow = int(mo.group(1)) % 7
            self.last_weekday_of_month.add(dow)
            # Last dow should always be either the 4th or 5th occurrence of that dow
            return "L", "{dow}#4,{dow}#5".format(dow=dow)
        return "other", day_of_week

    @staticmethod
    def find_day_of_last_dow(timestamp, day_of_week):
        """ Given the year/month of timestamp, determine the last day of the
        month which is the day of the week.  Calendar week starts on Sunday, to
        match cron day_of_week convention.
        """
        # How expensive is this?  Easily cache by (year, month, dow)
        day_of_week = int(day_of_week)
        cal = calendar.Calendar(6).monthdayscalendar(timestamp.year, timestamp.month)
        week = -1
        while True:
            day = cal[week][day_of_week]
            if day == 0:    # 0 means absent / different month
                week -= 1
            else:
                return day

    def _filter_output(self, timestamp):
        # type: (datetime) -> bool
        # Currently assumes ret_type=datetime; all I need
        if self.last_weekday_of_month:
            ts_dow = timestamp.isoweekday() % 7
            if ts_dow in self.last_weekday_of_month:
                last_dow = self.find_day_of_last_dow(timestamp, ts_dow)
                return timestamp.day == last_dow
            else:
                # Have LDOM, but not for current day, return anyways???
                # Q:  Can this still happen after blocking L/non-L mixing in dow....
                return True
        else:
            # For all the other "normal" cron expression, no extra filter needed
            return True

    def _get_next(self, ret_type=None, is_prev=None):
        """ Basically hijack the next() mechanism, and keep going until
        the output filter accepts.

        This could be expensive, for example, if the cron expression fires every
        second, then this could try and fail 86400 times before the next match.
        """
        while True:
            result = croniter_base._get_next(self, ret_type, is_prev)
            if self._filter_output(result):
                return result

    # WHOOPS!  don't forget to alias our new function!  (Well there went 45 mins I'll never get back!)
    __next__ = next = _get_next

Some initial unit tests:

import unittest
from datetime import datetime

from croniter_wrapper import croniter

sun, mon, tue, wed, thu, fri, sat = range(7)

class TestCronIterWrapper(unittest.TestCase):

    def test_last_wdom_simple(self):
        f = croniter.find_day_of_last_dow
        self.assertEqual(f(datetime(2021, 3, 6), sun), 28)
        self.assertEqual(f(datetime(2035, 12, 31), sat), 29)
        self.assertEqual(f(datetime(2000, 1, 1), fri), 28)
        self.assertEqual(f(datetime(2014, 8, 15), mon), 25)
        self.assertEqual(f(datetime(2022, 2, 19), tue), 22)
        self.assertEqual(f(datetime(1999, 10, 10), wed), 27)
        self.assertEqual(f(datetime(2005, 7, 19), thu), 28)

    def test_last_wdom_leap_year(self):
        f = croniter.find_day_of_last_dow
        self.assertEqual(f(datetime(2000, 2, 1), tue), 29)
        self.assertEqual(f(datetime(2000, 2, 10), tue), 29) # day doesn't matter
        self.assertEqual(f(datetime(2000, 2, 1), sun), 27)
        self.assertEqual(f(datetime(2000, 2, 1), mon), 28)
        self.assertEqual(f(datetime(2000, 2, 1), wed), 23)
        self.assertEqual(f(datetime(2000, 2, 1), thu), 24)
        self.assertEqual(f(datetime(2000, 2, 1), fri), 25)
        self.assertEqual(f(datetime(2000, 2, 1), sat), 26)

    def test_wdom_issue1(self):
        f = croniter.find_day_of_last_dow
        self.assertEqual(f(datetime(1987, 1, 1), fri), 30)

    def test_croniter_last_friday(self):
        it = croniter("0 0 * * L5", datetime(1987, 1, 15), ret_type=datetime)
        items = [next(it) for i in range(12)]
        self.maxDiff = 100000
        self.assertListEqual(items, [
            datetime(1987, 1, 30),
            datetime(1987, 2, 27),
            datetime(1987, 3, 27),
            datetime(1987, 4, 24),
            datetime(1987, 5, 29),
            datetime(1987, 6, 26),
            datetime(1987, 7, 31),
            datetime(1987, 8, 28),
            datetime(1987, 9, 25),
            datetime(1987, 10, 30),
            datetime(1987, 11, 27),
            datetime(1987, 12, 25),
        ])

kiorky commented 3 years ago

As it is non-invasive, you can even put it in the croniter module, naming it something like croniter_extra. We can add the class without any danger of regression and it will be community maintained (maybe only by you ^^)...

lowell80 commented 3 years ago

I took a stab at integrating it. Will send a PR over for review.

lowell80 commented 3 years ago

Merged into master. Pending release, but I'll go ahead and close this. For anyone following along, see release v1.0.11.