Kopachris / seshet

Modular, dynamic Python IRC bot
BSD 3-Clause "New" or "Revised" License
1 stars 1 forks source link

Task scheduler #4

Open Kopachris opened 9 years ago

Kopachris commented 9 years ago

Introduction

Seshet will be capable of running more than one bot at a time by reading configuration files for each bot and spawning them in separate subprocess controlled by a single daemon, seshetd.py. This method of running more than one bot (required for running bots on different IRC networks) will allow more flexibility than using ircutils3.start_all(), which runs all initialized ircutils3.client.SimpleClient instances under the same asyncore loop.

Aside: It is technically possible to run the same configuration on multiple networks in the same bot instance, but would require initializing the bot with multiple connection objects and event dispatchers and re-engineering several other aspects of the bot and its database model. It's far simpler to just copy a configuration file and change the server address.

Originally, the plan was to have the daemon spawn a scheduler process in addition to the bot instances. The scheduler would communicate through a pipe or queue to the daemon when it was time to fire an event, and the daemon would communicate it to the bot. However, this can be streamlined by allowing the bots themselves to handle their own task scheduling.

asyncore.loop()

When an ircutils3.client.Client instance is "started," it calls this function. The code for this function in Python 3.4 is

def loop(timeout=30.0, use_poll=False, map=None, count=None):
    if map is None:
        map = socket_map

    if use_poll and hasattr(select, 'poll'):
        poll_fun = poll2
    else:
        poll_fun = poll

    if count is None:
        while map:
            poll_fun(timeout, map)

    else:
        while map and count > 0:
            poll_fun(timeout, map)
            count = count - 1

The original plan involved coding a replacement for this function, adding some sort of after_poll() function into the while loop to check the pipe or queue for messages from the daemon. However, it may be better to simply have the bot's loop() replacement fire scheduled tasks itself.

Because ircutils always invokes asyncore.loop() with only one argument, map, our replacement function can be whittled down to something like:

def loop(self, map):
    try:
        from asyncore import poll
    except ImportError:
        raise Exception("Couldn't find poll function. Cannot start bot.")

    while map:
        poll(timeout=30.0, map=map)
        self.after_poll()

The poll() function in asyncore iterates through a map of file descriptors and asynchat.async_chat objects and calls either asyncore.read() or asyncore.write() on each of them depending on whether they are readable, writable, or both. Those functions call the appropriate methods on the asynchat.async_chat objects to handle reading and writing and poll() returns to loop() when finished with all mapped objects. If none of the objects are ready for reading or writing before the specified timeout (default: 30 seconds), poll() returns to loop() after the timeout.

If you're not familiar with select, asyncore, and asynchat, note that this doesn't mean there must be data ready for reading or writing, just that the file descriptor is not prevented from reading or writing. In normal use, the bot should never have to wait for the duration of the timeout, as it should always be ready for reading or writing. Whether or not there is actually data ready to read or write is handled by read() and write(), not poll().

Tick event

The SeshetBot.after_poll() method should generate a "tick" event and pass it on to the event dispatcher just like any other IRC event. By default, there will be two handlers for this event: one to fire off any scheduled tasks that are due and one to read a pipe/queue for messages from the daemon (for example instructing the bot to read from the task store and update the task queue).

The tick event should be a fairly simple subclass of ircutils3.events.Event:

class TickEvent(Event):
    def __init__(self):
        self.command = 'BOT_TICK'
        self.source = None
        self.target = None
        self.params = []
        self.time = datetime.today()

As the v. 0.1 milestone specifies only a basic daemon, the second tick event handler (reading messages from the daemon) will be implemented later and a command in the core.py module will manually update the task queue for now.

The task queue

The task queue is a list of task objects sorted by the time at which they're set to fire. On each tick, the bot will iterate through the task queue, execute any tasks which are due, and create duplicates with the next fire time for any repeating tasks.

Recurrence of a task will be specified by storing the keyword arguments to dateutil.rrule.rrule() in a dictionary in the task object. The keyword arguments will be passed to rrule() when generating the next instance of a recurring task, except replacing cache with True, count with 1, and if dtstart is prior to datetime.today(), it will be replaced with None.

The task to execute itself is defined by a function reference, its positional arguments, and its keyword arguments. The function reference is a string of the form "module:[class.[class.[...]]]function". If the function can't be found using the given function reference, an error will be logged and the task will be removed from the queue (but not the task store). There is a limitation on the arguments that can be given to a task: they must be pickleable.

class Task(object):
    def __init__(self, name, func, func_args=(), func_kwargs={},
                 start=datetime.today(), count=1, rrule=None):
        self.name = name

        f = self.get_func_from_ref(func_ref)
        if callable(f):
            self.func_ref = func_ref
            self.func = f
            self.func_args = func_args
            self.func_kwargs = func_kwargs
        else:
            raise ValueError("func_ref for new Task must point to callable object")

        self.start = start

        if count == 1:
            self.next = None
        elif not rrule:
            raise ValueError("rrule must be specified for new recurring Task")
        elif not isinstance(rrule, dict):
            raise ValueError("rrule for new recurring Task must be dict of kwargs for dateutil.rrule.rrule()")
        else:
            self.count = count
            self.rrule = rrule
            self.next = self.get_next_time()

    def get_func_from_ref(self, func_ref):
        """Parse a function reference and return the object it points to."""
        ...

    def get_next_time(self):
        """For recurring tasks, return a datetime of the next time the task should run."""
        ...

    def get_next_task(self):
        """For recurring tasks, build and return a Task object for the next time the task should run."""
        ...

    def store(self, db):
        """Update or insert this task in the database."""
        ...

    def remove(self, db):
        """Remove this task from the database (does not remove from task queue)."""
        ...

    def execute(self):
        """Execute this task. Does not check datetime, remove from queue,
        or build next instance of a recurring task.
        """
        ...

The task store

All tasks created through normal means should be stored in a database table with fields for task name, function reference, pickles of function args and kwargs, task repeat count, pickle of rrule args, datetime of the task's start time, and datetime of when the task was completed. When the bot starts, it will query the database for all tasks that haven't run yet, build task objects for them, and add them to the task queue. When each task executes, the time it was completed will be updated and it will be removed from the task queue (but not the task store). If it's a recurring task, a new task object will be created as mentioned above and a new row will be added to the database with the start time for the new task.

Tasks don't necessarily need to be added to the store. The task store is available for persistence. Any command module may still manually build a task object and add it to the queue without adding it to the database. If the database returns an empty set for the query, the task scheduler will simply skip updating the database for that task, although it will still create the next task object if the task is recurring. Naturally, if a task is not stored in the database, it will not survive a restart of the bot.

Kopachris commented 9 years ago

Added SeshetBot._loop(), SeshetBot.before_poll(), and SeshetBot.after_poll() in commit 1737e31.