When a task fails, as of today, we have a simple "retry" counter to allow it to be retried.
The first shortcoming in a cloud environment is that we can (relatively) often lose machines, so we'd like to have more retries when a "heartbeat timeout" happens. This could be dealt with by a second retry counter for heartbeat timeouts, separate from the first one. Note that this is not perfect at all, because heartbeat timeouts happen 1/ when we lose a process/machine, but also 2/ when an OOM occurs. In most cases we actually lose the process because the oom-killer (linux) kills the faulty process and python doesn't get a chance to raise an OSError we could catch. There's #239 for that fwiw.
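To make that concrete, here is a minimal, self-contained sketch of what two separate counters could look like; all the names below are made up for illustration, nothing here comes from the current code:

    from dataclasses import dataclass

    @dataclass
    class RetryCounters:
        # Hypothetical split of the single "retry" counter into two:
        # one for plain failures, one for heartbeat timeouts
        # (lost machine, OOM-killed process, ...).
        failures_left: int = 2
        heartbeat_timeouts_left: int = 5

        def should_retry(self, state):
            """Return True if a task ending in `state` should be retried."""
            if state == "timed_out" and self.heartbeat_timeouts_left > 0:
                self.heartbeat_timeouts_left -= 1
                return True
            if state == "failed" and self.failures_left > 0:
                self.failures_left -= 1
                return True
            return False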
A second shortcoming is that not all errors are the same:
- some errors are typically things we want to retry forever;
- some are things we want to retry after a certain amount of time (when a rate limit occurs, or when an activity task is not found on the worker machine) [that may be proposed in another issue, we don't have the primitives for doing that now];
- sometimes we don't want to retry because the original task already took 13 hours and we're reasonably sure that there's a problem in the code itself;
- sometimes we may want to change timeouts on the retried task (or even edit the arguments?);
- etc. etc. (see the sketch right after this list for how these behaviours could be classified)
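As an illustration only (the names and structure below are invented, they are not existing primitives in the codebase), these behaviours could be classified as something like:

    import enum
    from dataclasses import dataclass
    from typing import Optional

    class RetryKind(enum.Enum):
        NEVER = "never"              # e.g. the task already ran 13 hours, likely a bug in the code
        ALWAYS = "always"            # transient errors we're happy to retry forever
        AFTER_DELAY = "after_delay"  # rate limits, activity not yet deployed on the worker

    @dataclass
    class RetryDecision:
        kind: RetryKind
        delay_seconds: Optional[int] = None             # only meaningful for AFTER_DELAY
        # possibly override timeouts (or even arguments?) on the retried task
        start_to_close_timeout: Optional[int] = None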
In order to solve all this, I propose that either workflows or activity tasks provide a method/callable that is called when an error occurs, so that we get more control over the retry behaviour. I'm not sure yet whether it should live at the workflow or at the activity task level; @ybastide @ampelmann your input is welcome on that.
If I stick with the workflow for now, the API could typically be something like:
class MyWorkflow(Workflow):
    def run(self, *args, **kwargs):
        # interesting things here
        ...

    def should_retry_activity(self, task, previous_tasks=None):
        # we could also pass the future but then it would be nice to have
        # access to the event directly in the future (?)
        if task["state"] == "timed_out":
            # ...
            pass
        elif task["state"] == "failed":
            if task["exception_class"] == "SocketError" and "reset by peer" in task["exception_message"]:
                return True
            if task["exception_class"] == "OperationalError" and "could not connect to server" in task["exception_message"]:
                # find a way to delay the execution? for now retry
                return True
        return False
You see the idea. A few notes:
- for this to work it would be nice to have a structured field to retrieve the exception class, message, details, etc. => so this wakes up #102 / #103; probably my fault if we never merged it, let me know @ybastide?
- it may require changes in how we handle failures: in the current system we attach a "retry" counter to each event (=> task; bad bad bad name) when generating the history, and we consume it directly in the executor without much logic; the new way may require that we look up previous tasks with the same activity ID to take the decision, or that we manipulate a counter dynamically afterwards
- the name "should_retry_activity" is horrible, let's find a better one
- need to decide if this is attached to the workflow or to the with_attribute() decorator
- need to decide what to do about child workflows
- need to decide how it interacts with the normal "retry" counter; one possibility is to encode the logic inside a custom object that returns the new retry counter and how the retry should happen (with a delay? with extended timeouts? ...); a rough sketch of that follows below
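To make that last note concrete, here is a rough, purely hypothetical sketch of what such a return object and its use in the hook could look like. Nothing here is existing API: the Retry/NoRetry names, their fields, and the structured task payload are all assumptions; in practice the function would be a method on the workflow or attached via the decorator.

    from dataclasses import dataclass
    from typing import Optional

    @dataclass
    class Retry:
        # what the executor would read back to schedule the new attempt
        delay_seconds: int = 0                        # "retry after a certain amount of time"
        start_to_close_timeout: Optional[int] = None  # optionally extend timeouts on the retry
        retries_left: Optional[int] = None            # optionally reset/override the counter

    @dataclass
    class NoRetry:
        reason: str = ""

    def should_retry_activity(task, previous_tasks=None):
        # previous_tasks: earlier events sharing the same activity ID, so the hook
        # can count attempts itself instead of relying on a counter attached to the event
        attempts = len(previous_tasks or [])
        if task["state"] == "failed" and task["exception_class"] == "RateLimitError":
            if attempts < 10:
                return Retry(delay_seconds=60 * (attempts + 1))
            return NoRetry(reason="rate limited too many times")
        if task["state"] == "timed_out":
            # lost machine / OOM: retry with an extended timeout
            return Retry(start_to_close_timeout=2 * 3600)
        return NoRetry()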
After writing all this I realize there can be a looot of ideas and weird things we could think about. So maybe we should stay minimal as a first step.
What do you think @ybastide @ampelmann? Maybe discuss this in front of a whiteboard later?