ahawker / crython

Lightweight task scheduler using cron expressions

Need to run all jobs from a mapping dictionary #35

Open tommyyz opened 7 years ago

tommyyz commented 7 years ago

First of all, this module is just what I wanted (it supports second-level granularity)! Thanks to the author! My problem is that all of my task configuration lives in a DB, and I want to run all of the tasks one by one. My code:

map_from_db = {
    '*/1 * * * * * *': 'every 1 sec',
    '*/2 * * * * * *': 'every 2 sec',
}

for ex, speech in map_from_db.items():
    @crython.job(expr=ex)
    def func():
        print(speech)

crython.start()

It seems crython picks only one of the tasks, apparently at random, and runs just that one. What should I do?

tommyyz commented 7 years ago

Fixed. The jobs were conflicting because crython uses the function name as the unique identity. Here's the code that works:

for ex, speech in simple_map.items():
    def wrapped_func(expr):  # bind the current expression to avoid late-binding closures
        def _f():
            print(simple_map[expr])
        return _f
    func = wrapped_func(ex)
    func.__name__ = ex  # crython identifies jobs by function name, so make it unique
    crython.job(expr=ex)(func)

Or the simple version:

for ex, speech in simple_map.items():
    func = (lambda _: lambda: print(simple_map[_]))(ex)
    func.__name__ = ex
    crython.job(expr=ex)(func)
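
For reference, the two problems at play here can be reproduced without crython at all. The sketch below is plain Python, and `make_job` is a hypothetical helper name, not part of crython. It shows that closures created in a loop look up the loop variable when they are called, so they all see the final value, and that binding the value per iteration and assigning a distinct `__name__` avoids both issues.

```python
messages = {
    '*/1 * * * * * *': 'every 1 sec',
    '*/2 * * * * * *': 'every 2 sec',
}

# Pitfall: each closure looks up `speech` when it is called, not when it is defined.
broken = []
for ex, speech in messages.items():
    def func():
        return speech
    broken.append(func)

# Every function shares one name, and every call returns the last value.
print([f.__name__ for f in broken])   # ['func', 'func']
print([f() for f in broken])          # ['every 2 sec', 'every 2 sec']


def make_job(name, message):
    """Hypothetical helper: bind `message` now and give the job a unique name."""
    def job():
        return message
    job.__name__ = name
    return job


fixed = [make_job(ex, speech) for ex, speech in messages.items()]
print([f() for f in fixed])           # ['every 1 sec', 'every 2 sec']
```

Each function in `fixed` could then be registered the same way as above, with `crython.job(expr=ex)(func)`.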

ahawker commented 7 years ago

@tomzhu6066

Yeah, I thought about this one a bit. I would like to add a name parameter to the @job decorator. However, the corner I've painted myself into is how the @job decorator will pass args/kwargs to the decorated function. If @job takes a name parameter, it means that this will collide with any decorated functions that also take a name keyword argument. I have considered _ prefixing but find it ugly.

I'll think about this some more and try to come up with a clean way to do it. Ideally, the situation would be something like:

jobs = [
    dict(name='check_thing_every_second', expr='*/1 * * * * * *'),
    dict(name='check_thing_every_two_seconds', expr='*/2 * * * * * *')
]

for job in jobs:
    @crython.job(name=job['name'], expr=job['expr'])
    def func():
        print('This function is run as part of two jobs; scheduled for one and two second intervals')

micimize commented 5 years ago

Why not expose a separate, non-decorator-based api for scheduling predefined functions?
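
To make the idea concrete, here is a minimal sketch of what such an API could look like. Everything in it is an assumption for illustration: `schedule` and `REGISTRY` are hypothetical names and are not part of crython. The point is that scheduler options (`expr`, `name`) are ordinary parameters of `schedule`, while the function's own arguments travel separately in `args`/`kwargs`, so a job function that itself accepts a `name` keyword does not collide with the job name.

```python
from typing import Any, Callable, Dict, Optional, Tuple

# Hypothetical registry: job name -> (cron expression, zero-argument callable).
REGISTRY: Dict[str, Tuple[str, Callable[[], Any]]] = {}


def schedule(
    func: Callable[..., Any],
    expr: str,
    name: Optional[str] = None,
    args: Tuple[Any, ...] = (),
    kwargs: Optional[Dict[str, Any]] = None,
) -> Callable[[], Any]:
    """Register `func` under a unique job name (hypothetical API, not crython's)."""
    job_name = name or func.__name__
    if job_name in REGISTRY:
        raise ValueError(f"duplicate job name: {job_name!r}")
    call_kwargs = dict(kwargs or {})

    def runner() -> Any:
        # Function arguments are kept apart from the scheduler options above.
        return func(*args, **call_kwargs)

    REGISTRY[job_name] = (expr, runner)
    return runner


def greet(name: str) -> str:
    return f"hello {name}"


# The same function is registered twice under two different job names.
schedule(greet, '*/1 * * * * * *', name='greet_every_second', kwargs={'name': 'a'})
schedule(greet, '*/2 * * * * * *', name='greet_every_two_seconds', kwargs={'name': 'b'})

print(sorted(REGISTRY))
```

Rejecting duplicate names up front also turns the silent "only one job runs" behaviour described at the top of this issue into an explicit error.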

pariola commented 5 years ago

@ahawker what do you think about @micimize's suggestion?

micimize commented 5 years ago

@pariola Here's a wrapper util I'm using in Python 3.

import typing as t
from logging import getLogger as get_logger

import crython

log = get_logger(__name__)

def schedule_job(job: t.Callable[..., t.Any], schedule: t.List[str]):
    """Schedule a given ``job`` with a list of crython compatible crontab configs

    Useful for deploying periodic jobs
    """
    log.info(f"running {job.__name__} with the schedules {schedule}")
    for index, expr in enumerate(schedule):

        def wrapped_job(*args, **kwargs):
            """Wrap and rename the job to prevent name collisions."""
            try:
                return job(*args, **kwargs)
            except Exception:
                # Log the traceback and swallow the error.
                log.exception(f"Exception thrown during job {job.__name__}")

        wrapped_job.__name__ = "%s_%i" % (job.__name__, index)
        crython.job(expr=expr)(wrapped_job)

    crython.start()
    crython.join()

# usage
schedule_job(my_job, ['@reboot', '*/5 * * * * * *'])

Tangentially, as an open-source maintainer, I'd advise against phrasing requests the way you just did. To me, it reads like a command and comes off as entitled. Prefer phrasing like "What do you think of this suggestion?" or "Do you have time to implement this?".

pariola commented 5 years ago

@micimize thank you for that, I will make adjustments.

Shashank22082002 commented 3 months ago

Hi, I have a config file containing cron expressions and the command (mostly running a Python file or curl) that I need to execute at each given cron time.

I was using exec to dynamically create jobs inside the reboot job, like this:

import re
import subprocess
import time

import crython

def execute_command(command):
    process = subprocess.run(command, shell=True)
    return process.returncode

@crython.job(expr='@reboot')
def init():
    jobs = get_jobs()
    # Iterate through the array and create cron jobs dynamically
    print(f"[{time.ctime()}] Starting the job scheduler, Creating Jobs...")
    for job in jobs:
        function_name = re.sub(r'\W+', '_', job['name'].lower()) + "_job"
        print(f"[{time.ctime()}] Creating job: {job['name']} with cron expression: {job['cron_expr']}")
        # Create the function dynamically
        exec(
            f"@crython.job(expr='{job['cron_expr']}')\n"
            f"def {function_name}():\n"
            f"    status = execute_command('{job['command']}')\n"
            f"    if status == 0:\n"
            f"        print(f'[{time.ctime()}] Job {job['name']} executed successfully')\n"
            f"    else:\n"
            f"        print(f'[{time.ctime()}] Job {job['name']} failed with status: ' + str(status))\n"
        )

if __name__ == '__main__':
    crython.start()
    crython.join()

get_jobs just fetches all jobs from the config file. Can someone confirm whether this method will work? Is there a better way to achieve this (running jobs that are read from storage)?
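
One possible alternative to `exec`, in line with the closure-based workarounds earlier in this thread, is a small factory function that builds one uniquely named job per config entry. This is only a sketch under stated assumptions: `make_command_job` is a hypothetical helper, the `jobs` list stands in for `get_jobs()`, and the `run` parameter exists only so the command runner can be swapped out. Nothing is executed until a job function is called.

```python
import re
import subprocess
import time


def make_command_job(name, command, run=subprocess.run):
    """Build a zero-argument job for one config entry (hypothetical helper)."""
    def job():
        status = run(command, shell=True).returncode
        if status == 0:
            outcome = "executed successfully"
        else:
            outcome = f"failed with status: {status}"
        # The timestamp is computed when the job runs, not when it is created.
        print(f"[{time.ctime()}] Job {name} {outcome}")
        return status

    # Unique, identifier-safe name, mirroring the naming rule in the snippet above.
    job.__name__ = re.sub(r'\W+', '_', name.lower()) + "_job"
    return job


# Stand-in for get_jobs(); the entries are made up for illustration.
jobs = [
    {'name': 'Ok Task', 'cron_expr': '*/5 * * * * * *', 'command': 'echo ok'},
    {'name': 'Bad Task', 'cron_expr': '*/7 * * * * * *', 'command': 'exit 3'},
]

funcs = [make_command_job(j['name'], j['command']) for j in jobs]
print([f.__name__ for f in funcs])  # ['ok_task_job', 'bad_task_job']
```

Each function could then be registered with `crython.job(expr=j['cron_expr'])(func)`, as in the earlier comments. Compared with the `exec` version, commands and names are passed as values rather than spliced into generated source, so a quote character in a command cannot break the generated code. In the `exec` version, `{time.ctime()}` inside the generated `print` is also interpolated when the job is created, so every run would print the creation time.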