PolicyStat / jobtastic

Make your user-responsive long-running Celery jobs totally awesomer.
http://policystat.github.com/jobtastic/
MIT License
644 stars 61 forks

Decorator all the things! #11

Open winhamwr opened 12 years ago

winhamwr commented 12 years ago

Instead of requiring a subclass of JobtasticTask, provide a jobtastictask decorator that uses the decorated function as calculate_result. This wouldn't allow things like overriding progress calculation, but the large majority of tasks probably don't need that anyway. The configuration member variables can then be set as keyword arguments on the decorator.
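
A minimal sketch of what that decorator could look like from the caller's side. Assumptions: StandInTask is a hypothetical stand-in for JobtasticTask so the snippet runs without Celery, and task_from_function is an illustrative name, not an existing jobtastic API. significant_kwargs and herd_avoidance_timeout are the kind of configuration member variables the decorator would accept as keyword arguments.

```python
class StandInTask(object):
    """Hypothetical stand-in for JobtasticTask (no Celery required)."""
    significant_kwargs = []
    herd_avoidance_timeout = 0

    def calculate_result(self, *args, **kwargs):
        raise NotImplementedError


def task_from_function(**config):
    """Build a task instance whose calculate_result is the decorated function."""
    def decorate(fun):
        class Generated(StandInTask):
            # staticmethod: the function is called without `self`.
            calculate_result = staticmethod(fun)

        Generated.__name__ = fun.__name__
        # Configuration keyword arguments become class attributes.
        for key, value in config.items():
            setattr(Generated, key, value)
        return Generated()
    return decorate


@task_from_function(
    significant_kwargs=[('numerators', str), ('denominators', str)],
    herd_avoidance_timeout=60,
)
def divide(numerators, denominators):
    return [n / d for n, d in zip(numerators, denominators)]
```

Because the function is stored as a staticmethod, it has no access to the task instance, which is why hooks such as custom progress calculation would still need a real subclass.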

HT: zphds

winhamwr commented 11 years ago

So some possible places to look for implementing this:

@jobtastic_task via shared_task

celery.app.shared_task shows a way to register a task that will be available to all celery app instances. This gives us a way to avoid doing something nasty like monkeypatching the Celery object to add a celery.jobtastic_task or some such.
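
As a rough model of the idea (not Celery's actual code), a shared registry can be a module-level list of callbacks that every app runs when it finalizes. All names below (shared, MiniApp, the task name) are hypothetical and only illustrate the pattern.

```python
# Simplified model of a shared-task registry; not Celery's implementation.
_on_finalize_callbacks = []


def shared(register):
    """Remember a callback that every app should run when it finalizes."""
    _on_finalize_callbacks.append(register)
    return register


class MiniApp(object):
    """Hypothetical stand-in for a Celery app with its own task registry."""

    def __init__(self, label):
        self.label = label
        self.tasks = {}
        self.finalized = False

    def finalize(self):
        if not self.finalized:
            self.finalized = True
            # Each app builds its own copy of every shared task.
            for register in _on_finalize_callbacks:
                register(self)


# Register once at import time; no app object needs to exist yet.
shared(lambda app: app.tasks.setdefault(
    'reports.build', 'task bound to ' + app.label))

first, second = MiniApp('first'), MiniApp('second')
first.finalize()
second.finalize()
```

Apps that were already finalized before the callback was registered never run it, so they have to be handled separately.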

Steal relevant stuff from _task_from_fun

celery.app.base:Celery._task_from_fun has all of the magic needed to create your task object from a function.
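
The core of that magic is the three-argument form of type(), which builds a class at runtime from a name, a tuple of bases, and a dict of attributes. A minimal sketch, assuming a hypothetical BaseTask in place of Celery's task base class and an illustrative helper name class_from_function:

```python
class BaseTask(object):
    """Hypothetical stand-in base class; not Celery's Task."""

    def run(self, *args, **kwargs):
        raise NotImplementedError


def class_from_function(fun, base=BaseTask, **options):
    """Create a subclass of `base` whose run() is `fun`."""
    attrs = {
        'run': staticmethod(fun),      # called without `self`
        '__doc__': fun.__doc__,        # keep the function's docstring
        '__module__': fun.__module__,  # and the module it was defined in
    }
    attrs.update(options)              # extra options become class attributes
    return type(fun.__name__, (base,), attrs)


def add(x, y):
    """Add two numbers."""
    return x + y


AddTask = class_from_function(add, max_retries=3)
task = AddTask()
```

The generated class takes the function's name, so anything that derives a task name from the class name and module sees the same values it would for a hand-written subclass.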

winhamwr commented 11 years ago

Back of the envelope possible solution:

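# Imports as they appeared in celery/app/__init__.py around Celery 3.0. Most of
# these are private Celery internals and may have moved in other versions.
from celery._state import _get_active_apps, get_current_app as current_app
from celery.app.builtins import shared_task as _shared_task
from celery.local import Proxy
from celery.utils import gen_task_name

from jobtastic import JobtasticTask


def _jobtastic_task_from_fun(app, fun, **options):
    # The base class is always JobtasticTask, so refuse an explicit one.
    if 'base' in options:
        raise Exception("'base' can't be used on a JobtasticTask")

    # Build a JobtasticTask subclass named after the function and instantiate it.
    T = type(fun.__name__, (JobtasticTask, ), dict({
        'app': app,
        'accept_magic_kwargs': False,
        'calculate_result': staticmethod(fun),
        '__doc__': fun.__doc__,
        '__module__': fun.__module__}, **options))()
    task = app._tasks[T.name]  # return global instance.
    task.bind(app)  # was bind(self); there is no self in a module-level function
    return task


def jobtastic_task(*args, **kwargs):

    def create_jobtastic_task(**options):

        def __inner(fun):
            name = options.get('name')
            # Register as a shared task so that unfinalized and future apps
            # will load it.
            _shared_task(lambda app: _jobtastic_task_from_fun(app, fun, **options))

            # Force all already-finalized apps to take this task as well.
            for app in _get_active_apps():
                if app.finalized:
                    with app._finalize_mutex:
                        _jobtastic_task_from_fun(app, fun, **options)

            # Return a proxy that always looks the task up in the current
            # app's task registry.
            def task_by_cons():
                app = current_app()
                return app.tasks[
                    name or gen_task_name(app, fun.__name__, fun.__module__)
                ]
            return Proxy(task_by_cons)
        return __inner

    # Support both @jobtastic_task and @jobtastic_task(**options).
    if len(args) == 1 and callable(args[0]):
        return create_jobtastic_task(**kwargs)(args[0])
    return create_jobtastic_task(*args, **kwargs)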
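
The last three lines are what let the decorator be used both bare and with keyword arguments. A standalone sketch of just that dispatch, with no Celery involved (flexible and the options attribute are hypothetical names for illustration):

```python
def flexible(*args, **kwargs):
    """Decorator usable as @flexible or as @flexible(key=value)."""

    def create(**options):
        def inner(fun):
            fun.options = options  # record the options for inspection
            return fun
        return inner

    # Bare use: the only positional argument is the decorated function.
    if len(args) == 1 and callable(args[0]):
        return create(**kwargs)(args[0])
    # Called with options: return the real decorator.
    return create(*args, **kwargs)


@flexible
def plain():
    return 'plain'


@flexible(name='reports.build')
def named():
    return 'named'
```

Because create only accepts keyword arguments, passing a non-callable positional argument raises a TypeError instead of being silently ignored.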

brk0v commented 11 years ago

Hi, I'm investigating how to do the same thing. Could you please give a full example of using this kind of pattern? Thanks!

winhamwr commented 11 years ago

Hi Viacheslav,

Unfortunately, that back-of-the-envelope example is the closest I've come. If I had a full example, it would already be merged in :P If you give this a try and run into specific problems, I'd be happy to help with some debugging, but I haven't yet been able to prioritize building this out completely. Decorators would be awesome, though.

-Wes