Open rhunwicks opened 10 years ago
Hello!
If I was solving this problem, I think I would do something like this:

- `viewRecalcWaitTask` (or some such) gets called with a 120 second countdown (as you suggested), and all it does when it runs is check the timestamp stored under that known cache key. If the timestamp is more than 120 seconds old, it calls the `viewRecalcTask` that does the recalculation and then immediately exits. If the timestamp is less than 120 seconds old, it just exits without calling the recalculation task.
- In `viewRecalcTask`, record a timestamp as soon as it starts running. Then check the known cache value at the very end of the run. If the cache timestamp is later than the task's starting timestamp, then it knows that it needs to run again because an update happened while it was running. Then you can just start another `viewRecalcWaitTask` with a 120 second countdown without updating the known cache value.

I think that will result in the behavior you're looking for. Thoughts?
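To make the two checks concrete, here is a minimal sketch of just the decision logic, with no Celery involved. Everything in it is an assumption for illustration: the cache is a plain dict, timestamps are plain seconds, and the names `record_update`, `should_recalculate`, `needs_rerun`, and `CACHE_KEY` are hypothetical.

```python
QUIET_SECONDS = 120
CACHE_KEY = "view:last_update"  # hypothetical key name


def record_update(cache, now):
    """Called on every user update: remember when it happened."""
    cache[CACHE_KEY] = now


def should_recalculate(cache, now, quiet_seconds=QUIET_SECONDS):
    """Body of the wait task: recalculate only after a quiet period."""
    last_update = cache.get(CACHE_KEY)
    if last_update is None:
        return False  # nothing has been updated, so nothing to refresh
    return now - last_update >= quiet_seconds


def needs_rerun(cache, started_at):
    """End of the recalculation task: did an update arrive while it ran?"""
    last_update = cache.get(CACHE_KEY)
    return last_update is not None and last_update > started_at


cache = {}
record_update(cache, now=1000)
print(should_recalculate(cache, now=1060))   # update is only 60s old
print(should_recalculate(cache, now=1120))   # 120s of quiet have passed
```

In the real tasks, `should_recalculate` returning true would trigger `viewRecalcTask`, and `needs_rerun` returning true would queue another `viewRecalcWaitTask` with the 120 second countdown.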
-Wes
And if you could think of a good way to include this kind of pattern in Jobtastic with an understandable API, that would be super-cool. Maybe a generic `groupedWaitTask` and a helper method to get/set the cache with the timestamp?
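This is my first cut...

- `apply_async` stores the time the Task should wait until in the cache and then calls super
- `.run` checks if the wait time has been reached:
  - if not, it resubmits itself for that time and returns `None` so the Task is treated as SUCCESSFUL by Celery
  - if so, it calculates the result, then resubmits itself if another submission arrived in the meantime

Does this look sensible to you?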

    # Imports below are assumed; the original snippet did not show them.
    import logging
    from datetime import datetime, timedelta

    import pytz
    from django.core.cache import cache  # assumed: Django's cache backend
    from jobtastic import JobtasticTask

    try:
        from celery.utils.log import get_task_logger
    except ImportError:
        get_task_logger = None  # Celery 2.X


    class GroupedWaitTask(JobtasticTask):
        """
        An extension of ``JobtasticTask`` that waits a specified number of seconds
        since the last time it was submitted before it runs.

        This supports tasks that must be run when a set of updates has been
        completed but you don't know when that will be.

        One example is refreshing a summary table (perhaps implemented as a
        materialized view): the refresh must happen in a timely manner after the
        user updates the underlying records, but will be out of date as soon as
        another update is made. Therefore, we want to wait for a short time after
        each update is made to see if there are any more updates, and then run the
        refresh task after the "wait period" expires.

        The following class members are required:

        * ``wait_seconds`` The number of seconds since the last time this task was
          submitted that must elapse before the task can run.
        """
        abstract = True
        ignore_result = True

        @classmethod
        def apply_async(self, args, kwargs, **options):
            """
            Store the timestamp of the most recent submission for this task
            """
            self._validate_required_class_vars()
            cache_key = self._get_cache_key(**kwargs)

            # Store expiry time for the wait period
            if 'eta' in options:
                wait = options['eta']
            elif 'countdown' in options:
                wait = datetime.now(pytz.utc) + timedelta(seconds=options['countdown'])
            else:
                wait = datetime.now(pytz.utc) + timedelta(seconds=self.wait_seconds)

            logging.info("Setting %s to wait until %s", self.name, wait)
            cache.set('wait:%s' % cache_key, wait)
            return super(GroupedWaitTask, self).apply_async(args, kwargs, **options)

        def run(self, *args, **kwargs):
            cache_key = self._get_cache_key(**kwargs)

            if get_task_logger:
                self.logger = get_task_logger(self.__class__.__name__)
            else:
                # Celery 2.X fallback
                self.logger = self.get_logger(**kwargs)

            # If we haven't reached the end of the waiting period then schedule a
            # new task for then and exit. The cache entry may be missing (expired
            # or evicted), in which case we run straight away.
            wait = cache.get('wait:%s' % cache_key)
            if (wait is not None and wait >= datetime.now(pytz.utc)
                    and not self.request.is_eager):
                self.logger.info("Deferring %s until %s", self.__class__.__name__, wait)
                # Remove the existing herd protection because we want to submit a new task
                cache.delete('herd:%s' % cache_key)
                self.apply_async(args, kwargs, eta=wait)
                return None

            # We have reached the end of the wait period, so calculate the result
            result = super(GroupedWaitTask, self).run(*args, **kwargs)

            # If a task was submitted after we started calculating the result, then
            # submit a new job now
            new_wait = cache.get('wait:%s' % cache_key)
            if (new_wait is not None and (wait is None or new_wait > wait)
                    and not self.request.is_eager):
                self.logger.info("Resubmitting %s for %s", self.__class__.__name__, new_wait)
                self.apply_async(args, kwargs, eta=new_wait)

            return result

        @classmethod
        def _validate_required_class_vars(self):
            """
            Ensure that this subclass has defined all of the required class
            variables.
            """
            required_members = (
                'wait_seconds',
            )
            for required_member in required_members:
                if not hasattr(self, required_member):
                    raise Exception(
                        "GroupedWaitTasks must define a %s" % required_member)
            super(GroupedWaitTask, self)._validate_required_class_vars()

This looks great! It's probably worth some tests with mocking to ensure we're hitting the right paths, but from reading through, I think this will do what we want. It's also better than my proposed API. This being effectively a wrapper around the task that actually does the work is quite slick.
The only thing missing here (besides being a pull request) is documentation for the README plus (ideally) at least some kind of tests for regression purposes. Supporting multiple versions of Celery without those is pretty tough.
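As a rough idea of what mock-based tests for the two paths could look like, here is a sketch using `unittest.mock`. It does not import Jobtastic or Celery: `run_if_due` is a hypothetical stand-in that copies only the branching from `run`, and the cache, `calculate`, and `resubmit` arguments are all mocks.

```python
from datetime import datetime, timedelta, timezone
from unittest import mock


def run_if_due(cache, key, calculate, resubmit, now):
    """Stand-in for the branching in GroupedWaitTask.run (not Jobtastic code)."""
    wait = cache.get("wait:%s" % key)
    if wait is not None and wait >= now:
        resubmit(eta=wait)       # still inside the wait period: defer
        return None
    result = calculate()         # wait period is over: do the work
    new_wait = cache.get("wait:%s" % key)
    if new_wait is not None and (wait is None or new_wait > wait):
        resubmit(eta=new_wait)   # a submission arrived while calculating
    return result


NOW = datetime(2015, 1, 1, 12, 0, tzinfo=timezone.utc)  # arbitrary test time


def test_defers_while_waiting():
    wait = NOW + timedelta(seconds=120)
    cache = mock.Mock()
    cache.get.return_value = wait
    calculate, resubmit = mock.Mock(), mock.Mock()

    assert run_if_due(cache, "k", calculate, resubmit, NOW) is None
    calculate.assert_not_called()
    resubmit.assert_called_once_with(eta=wait)


def test_runs_then_resubmits_after_late_submission():
    old_wait = NOW - timedelta(seconds=1)
    new_wait = NOW + timedelta(seconds=120)
    cache = mock.Mock()
    cache.get.side_effect = [old_wait, new_wait]  # before and after the work
    calculate = mock.Mock(return_value="refreshed")
    resubmit = mock.Mock()

    assert run_if_due(cache, "k", calculate, resubmit, NOW) == "refreshed"
    calculate.assert_called_once_with()
    resubmit.assert_called_once_with(eta=new_wait)
```

Tests against the real class would patch the cache and `apply_async` in the same way, but the patch targets depend on how the module imports them.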
This is very cool!
Thanks -Wes
I'm happy to turn it into a PR and do docs. I'd like to do tests too, but I'm not sure where to start. I tried writing tests using my normal (django-based) approach, but it doesn't work in the absence of a working Celery server, and my attempts to get one to run under the control of the Django test runner have been unsuccessful so far. If you could provide an example, I'll add other test cases to it.
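One common way to exercise tasks from the Django test runner without a worker is Celery's eager mode. The fragment below is a sketch of test settings, assuming the Celery 3.x setting names (newer Celery versions spell these `task_always_eager` and `task_eager_propagates`).

```python
# Test-only Django settings (sketch, Celery 3.x setting names assumed).

# Run tasks synchronously in the calling process instead of sending
# them to a broker, so no worker is needed during tests.
CELERY_ALWAYS_EAGER = True

# Re-raise exceptions from eagerly executed tasks so failures surface
# in the test instead of being stored on the result.
CELERY_EAGER_PROPAGATES_EXCEPTIONS = True
```

Note that `run` above skips both the deferral and the resubmission when `self.request.is_eager` is true, so eager mode only covers the calculation itself. The wait-period branches would still need mocks.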
I want to use Jobtastic to manage the refresh of a PostgreSQL materialized view when the user updates the underlying tables. The user performs many updates in succession and refreshing the view takes about 5 minutes. Therefore, I want to:

- submit the task with `countdown=120` so that I wait for 2 minutes after the last update to see if there are any more updates before executing the task

Does that use case fit with the intention of Jobtastic?
I can see that at the moment it doesn't distinguish between PROGRESS and PENDING for herd avoidance and I would need that - so that the herd avoidance is on PENDING only.
Part of that change would also be to reset the countdown time on the task when exiting through the herd avoidance path.
My understanding of the `acquire_lock` code at the moment is that it prevents duplicate Tasks being submitted - I think we might need a separate lock along the lines of http://celery.readthedocs.org/en/latest/tutorials/task-cookbook.html#ensuring-a-task-is-only-executed-one-at-a-time to make sure that the PENDING task isn't executed until the PROGRESS one finishes.

I'm happy to work on a PR for any required changes if you point me in the right direction.
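For reference, the kind of lock I mean relies on an atomic `cache.add`, which only stores a key if it is not already present. Here is a minimal sketch under stated assumptions: `DictCache` is a hypothetical in-memory stand-in for a Django-style cache (it ignores the timeout), and `execution_lock` and `refresh_view` are illustrative names, not Jobtastic API.

```python
from contextlib import contextmanager


class DictCache:
    """In-memory stand-in for the add/delete subset of a Django-style cache."""

    def __init__(self):
        self._data = {}

    def add(self, key, value, timeout=None):
        # Like Django's cache.add: store only if the key is absent.
        if key in self._data:
            return False
        self._data[key] = value
        return True

    def delete(self, key):
        self._data.pop(key, None)


@contextmanager
def execution_lock(cache, lock_id, timeout=600):
    """Yield True if we hold the lock, False if another run already does."""
    acquired = cache.add(lock_id, "locked", timeout)
    try:
        yield acquired
    finally:
        if acquired:
            cache.delete(lock_id)  # release only a lock we own


def refresh_view(cache, log):
    """Hypothetical task body: refresh unless another refresh is running."""
    with execution_lock(cache, "lock:refresh_view") as acquired:
        if not acquired:
            log.append("skipped")
            return False
        log.append("refreshed")
        return True
```

With a real shared cache the timeout matters: it should exceed the longest expected refresh, so that a crashed worker cannot hold the lock forever.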