antirez / disque

Disque is a distributed message broker
BSD 3-Clause "New" or "Revised" License
8.01k stars 537 forks

WORKING command: a way to delay next deliveries of the same message. #7

Closed josiahcarlson closed 9 years ago

josiahcarlson commented 9 years ago

When submitting a job with ADDJOB, you can't necessarily predict the total amount of time that a job will take in order to provide a proper RETRY time. In the context of locks that time out, there is the concept of "refreshing" the lock to ensure it is still held by the owner. I think it would make sense to add a command whose purpose is to reset the RETRY timer, in effect allowing a worker to say "I'm not done, but I'm still working on this, I may still fail, but don't retry it yet."

Something like WORKING jobid1 ... or given that ASYNC is not a valid jobid, WORKING jobid1 ... [ASYNC], depending on whether async retry resets make sense.

Documentation-wise: Tells Disque that the provided job ids are still being worked on, and the retry timer should be reset as though those jobs had just been fetched by GETJOB.
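
For illustration, a minimal sketch of the intended call pattern for a multi-step job, in Python; the client object and its getjob/working/ackjob methods are hypothetical stand-ins for whatever client library is used, and WORKING itself is of course only a proposal at this point.

def handle_multi_step_job(client, queue, steps):
    # Hypothetical usage of the proposed WORKING command: between major
    # steps the worker tells Disque it is still alive, resetting the RETRY
    # timer as if the job had just been fetched with GETJOB.
    job_id, body = client.getjob(queue)   # delivery starts the RETRY timer
    for step in steps:
        step(body)                        # each step may take a while, may fail
        client.working(job_id)            # "not done, but don't retry it yet"
    client.ackjob(job_id)                 # acknowledge only when truly finished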

antirez commented 9 years ago

Hello Josiah, thanks for the suggestion. This feature looks a bit dangerous to me, since it is perfectly possible that a failure mode for the worker is to never finish processing while continuously acknowledging that it is still working on it for days, until the TTL is reached. This would basically break the at-most-once semantics, and I see this worker bug as very real-world. What do you think?

josiahcarlson commented 9 years ago

In the context of jobs to be executed, I'm going to say that there are 2 major classes of jobs (huge over-generalization, but it will serve for this example): 1) jobs that loop and process a single type of data, 2) jobs that perform a sequence of different operations, each of which could fail individually.

In code, they usually look something like this:


def cycle_processing(...):
    for thing in things_to_process:
        # process thing

def multi_step(...):
    # do step one, may take a while, may fail
    # do step two, may take a while, may fail
    # do step three, may take a while, may fail
    # do step ..., may take a while, may fail

When you are locking a specific resource (aka job) for the duration of the job execution, it is very common to acquire the lock at the start of the function (get the job), and either periodically within the loop or between major steps, refresh the lock (tell the queue system you are still working on the job). In these two examples of the most common job processing operations that are done, an infinite loop is only possible if you mess up your looping construct.

And if you mess up your looping construct to cause the job to never finish (by exceeding the TTL), with this new command, you've actually saved other job processors from getting into infinite loops by preventing them from picking up this bad job.

So, not only does this feature solve real problems with ensuring the same job doesn't get picked up multiple times in at-least-once mode (by preventing retries), it also prevents an infinitely looping job from cascading through your workers.
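
To make the refresh pattern above concrete, here is a rough Python sketch of the loop-style case, refreshing every N items rather than on a timer; the disque client object and its getjob/working/ackjob methods are assumptions for illustration only.

def cycle_processing(disque, queue, process, refresh_every=100):
    # Hypothetical worker for the loop-style job class: refresh the retry
    # timer every N processed items instead of using any wall-clock timer.
    job_id, things_to_process = disque.getjob(queue)
    for i, thing in enumerate(things_to_process, start=1):
        process(thing)                 # may take a while, may fail
        if i % refresh_every == 0:
            disque.working(job_id)     # "still on it": reset the RETRY timer
    disque.ackjob(job_id)              # acknowledge only once everything is done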

antirez commented 9 years ago

I still don't agree. IMHO what you are really seeking here is a mutual exclusion primitive, and providing such a primitive in the way you advise is not a safe way to do so. One should resort to a replicated state machine, or use something like the "Redlock" algorithm if we can't afford the latency of a replicated state machine that has to fsync to disk at every operation (Redlock can provide the same guarantee assuming the server implements delayed restarts).

Now the question is whether Disque should have a mutual exclusion mechanism at all, or whether the job of Disque is just to deliver messages, and job processing that needs mutual exclusion should resort to a lock server implemented by other means. I still don't know the answer to this question, hopefully it will become clear to me in the future, but I'm pretty sure that from my point of view mutual exclusion is a strong property that should not be provided in the way you suggest in this PR (which is a weak way: you have no guarantee at all under failures that the same job will not be picked up multiple times).

About the failure mode I discussed in my previous message, I think that instead there is a great window for writing bad workers that will expose the class of bugs I mentioned. Every worker without an explicit time check inside the loop would potentially be able to trigger this, for example inside a try/catch block, where it is not actually the operation that is failing but some library, because of a mismatched dependency or a misspelled domain name or the like. So the worker may fall into a loop, if no explicit timeout is provided, for reasons external to the actual ability to perform operations 1, 2, 3, 4, but just because of software bugs or networking problems with a specific worker.

So basically I agree with you that there are a number of problems in job processing where mutual exclusion is very useful, but I think this should be addressed directly using a distributed lock of some kind. Whether this lock primitive should be part of Disque or not is still not clear to me.

Thanks.

josiahcarlson commented 9 years ago

I have never seen the failure mode you describe come up in practice. Every task that I have ever seen or written looks like some combination of the two cases I described in my earlier reply. And when using the "I'm still working on this" mechanism, you don't use a timer or timing at all, you explicitly refresh the flag/lock/whatever after every X steps.

The "mutual exclusion" that is going on is, "this job is still being processed, don't retry it yet". The big problem that this feature addresses directly is, "Oops, my 25 second job turned into a 65 second job due to [one of a dozen reasons], and now it ran 15 times in the 15 minute TTL I gave it instead of once."

If/when Disque gets used by more people, don't be surprised if this issue/feature request comes up again.

justincase commented 9 years ago

Message visibility is widely used within AWS SQS. For example, if a task can take anywhere from 15 minutes to an hour, I have to set the retry value at 1 hour, potentially wasting 45 minutes during which it could have been re-queued.

SQS limits the visibility extension to 12 hours, which effectively cancels out any buggy workers stuck in a loop, although I don't feel that argument holds too much weight.

fritzy commented 9 years ago

I agree. This should be a feature with a configured and default cap.

josiahcarlson commented 9 years ago

I think it might be worth a bit of clarification here for other readers, and maybe for Salvatore.

In the case of AWS SQS, when you fetch a message, it gets a default RETRY[1]. This RETRY[1] time can be adjusted before getting the message with the SetQueueAttributes call on the queue, or can be set afterwards via the ChangeMessageVisibility / ChangeMessageVisibilityBatch calls. In total, messages/jobs can be out for up to 12 hours before they are forced back into the queue for an actual retry or failure.

With Disque, you can only set the RETRY timeout as the item is being inserted. My proposal is to re-use the RETRY timeout to serve as the timeout for offering a simplified SQS-style "ChangeMessageVisibility" call without needing to pass any additional or replacement RETRY times. I didn't bring up the possibility for a 12 hour limit like AWS SQS as I haven't found it necessary in practice.

[1] I'm using Disque's vocabulary of RETRY, but it's called a "visibility timeout" in SQS.

Long-ish story short: effectively identical functionality is already available in other queue systems. The need is real, and there are ways of at least partially mitigating the looping concern.
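
For comparison, the SQS side of this (the ChangeMessageVisibility call mentioned above) looks roughly like the following in Python with boto3; the queue URL is a placeholder.

import boto3

sqs = boto3.client("sqs")
queue_url = "https://sqs.us-east-1.amazonaws.com/123456789012/my-queue"  # placeholder

# Receive a message; the queue's default visibility timeout starts counting.
# (Assumes the queue is not empty, for brevity.)
msg = sqs.receive_message(QueueUrl=queue_url, MaxNumberOfMessages=1)["Messages"][0]

# Still working on it: push the redelivery out another 10 minutes.
# SQS caps the total extension at 12 hours.
sqs.change_message_visibility(
    QueueUrl=queue_url,
    ReceiptHandle=msg["ReceiptHandle"],
    VisibilityTimeout=600,
)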

justincase commented 9 years ago

Thanks Josiah,

A RETRY-timer reset as you described would suffice for my workloads. Perhaps we can reopen this issue for additional discussion?

antirez commented 9 years ago

Reopening for additional discussion. Limiting the ability to postpone the message re-delivery to, for example, 50% of the TTL of the message could be a valid way to get such a feature without the risk of postponing forever.

antirez commented 9 years ago

So... as discussed in other issues, it is IMHO pretty crucial that we retain job immutability in Disque as it is now; it is a huge conceptual simplification. Then RETRY could work like this:

  1. It would reset the re-queue timer for all the owners of the message by broadcasting a QUEUED message (as already happens). This will force nodes to update the retry time.
  2. I think the command should just be asynchronous, since if there is a partition, what would we do if we are half-way anyway? Not sure it's worth making the API more complex in order to wait to reach all the nodes that may have a copy.
  3. If the job has reached 50% of its expire time, the command should fail. So for example, if the job has a TTL of 24 hours and a retry of 10 minutes, you can keep postponing for up to 12 hours, but after that the job is re-delivered and there is no way for workers to take more time than the retry time (a rough sketch of this check follows below).
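
A minimal sketch of that rule in Python, purely for illustration and not the actual Disque implementation; the Job fields and the function name are hypothetical.

import time
from dataclasses import dataclass

@dataclass
class Job:
    ctime: float      # creation time, seconds since the epoch (hypothetical field)
    ttl: float        # time to live, in seconds
    retry: float      # retry period, in seconds
    requeue_at: float = 0.0

def working(job: Job, now=None) -> float:
    # Refuse to postpone once the job has consumed half of its TTL;
    # otherwise reset the requeue timer as if the job had just been delivered
    # (a real node would also broadcast QUEUED to the other owners).
    now = time.time() if now is None else now
    if now - job.ctime > job.ttl / 2:
        raise RuntimeError("job has already used more than half of its TTL")
    job.requeue_at = now + job.retry
    return job.retry  # seconds until the next re-queue attempt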

Does this make sense?

Related: we are investigating in another issue the ability to provide a negative ACK, so that failing clients can ask the system to deliver the message again ASAP instead of waiting for the natural re-delivery.

Edit: so the important detail here is that we will not be able to actually change the retry time, only to reset it. But I believe Josiah initially conceived the feature like that for similar reasons, so that jobs can remain immutable items and there is no merging of conflicting jobs and so forth.

Also, what happens during partitions? Nodes that can't be reached may not update the retry time and may re-deliver. Pretty unavoidable if you ask me, since this is a system that should be able to deliver messages in the minority partition.

carlhoerberg commented 9 years ago

For perspective: in RabbitMQ/AMQP there's no retry timer; instead the consumer receives a message and has to either ack it or reject it. The consumer can hold on to the message as long as it wants, but if it disconnects from the broker the message goes back into the queue (to its original position, i.e. not last) and can be redelivered to another consumer. The good part is that you don't have to worry about configuring a good retry timer; the bad part is if it takes a long time before the connection is considered closed, and if the consumer loses the connection while processing the message but is able to restore it, it then has to consume the same message again. The problem then becomes how to identify when a consumer connection is down; short TCP keepalives (the default Linux 2h is too long) or a protocol-level "heartbeat" (AMQP has that; I don't know why they don't just use TCP keepalives, lacking platform support?) are two ways. Maybe it's been considered and rejected already, but I couldn't see it from the discussion.

antirez commented 9 years ago

@carlhoerberg your comment is pretty informative for me, thanks. I'm pretty sure I don't like the RabbitMQ way, since it seems like mixing distributed-system behavior with a low-level detail (whether the socket is connected or not). No surprise that this can break in different ways...

So far, actively refreshing the job retry time with RETRY looks like pretty much our best bet; however, I think we may need a few refinements.

  1. The RETRY return value should be how many seconds we likely have, so that the worker knows it is a good idea to send a new RETRY after maybe 50% of this time or so.
  2. The documentation should suggest sending a RETRY ASAP, before starting to process the job, if the worker plans to use RETRY later, in order to fetch this time.

However, as an alternative to "2", I think it would be a good idea if GETJOB were able to return additional information about the job, like the approximate number of deliveries so far, the number of negative acknowledgements, the expire time, and the retry value.

So instead of an immediate RETRY there is the alternative of getting the retry time with GETJOB. However, while the retry value could be among the many fields GETJOB reports as additional info, I believe the simpler protocol is what we should advise:

id = GETJOB
retry_time = RETRY $id
WHILE NOT_FINISHED()
    IF 50% of $retry_time already elapsed DO RETRY $id
    DO_SOME_WORK()
END
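
A minimal concrete version of that loop in Python, assuming a hypothetical disque client object exposing get_job(), retry() and ack_job(); this is not an existing library API.

import time

def process_one_job(disque, work_one_step):
    # Hypothetical worker following the protocol sketched above; `disque`
    # is an imaginary client object, `work_one_step` returns True when done.
    job_id, body = disque.get_job()            # id = GETJOB
    retry_time = disque.retry(job_id)          # retry_time = RETRY $id
    deadline = time.time() + retry_time / 2
    finished = False
    while not finished:                        # WHILE NOT_FINISHED()
        if time.time() >= deadline:            # 50% of $retry_time elapsed
            retry_time = disque.retry(job_id)  # DO RETRY $id
            deadline = time.time() + retry_time / 2
        finished = work_one_step(body)         # DO_SOME_WORK()
    disque.ack_job(job_id)                     # acknowledge the finished job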

Does this make sense? I think this is pretty much the initial @josiahcarlson proposal, except for the refinement of returning the retry time as the return value of RETRY, or an error if the job is not found.

carlhoerberg commented 9 years ago

Refreshing the RETRY parameter is not much unlike having a heartbeat, except that the consumer code has to do it explicitly, instead of the library doing it in a separate thread in the background. Refreshing the RETRY now means that the consumer code has to have knowledge about the queueing backend and be modified and coupled to the Disque library instead of being backend-agnostic. Say you have a worker written in Ruby, and one of the jobs is to encode a video or something; it calls out to a library in C. Now, to refresh the RETRY parameter, the Ruby code has to start a separate thread to periodically call the Disque library and refresh, something many are unfamiliar with. Isn't it better if the library takes care of that for all users, so not everyone has to implement it themselves?
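
For illustration, here is roughly what such a library-provided helper could look like, sketched in Python rather than Ruby for consistency with the earlier examples; the disque client object and its retry() method are assumptions.

import threading

class AutoRefresh:
    # Hypothetical helper a client library could ship: keep resetting the
    # retry timer from a background thread while the application code is
    # busy processing the job, so the worker code stays backend-agnostic.
    def __init__(self, disque, job_id, interval):
        self._disque = disque            # imaginary Disque client object
        self._job_id = job_id
        self._interval = interval        # e.g. half of the job's retry time
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)

    def _run(self):
        # wait() returns False on timeout, True once stop() is requested
        while not self._stop.wait(self._interval):
            self._disque.retry(self._job_id)   # or WORKING, once renamed

    def __enter__(self):
        self._thread.start()
        return self

    def __exit__(self, *exc):
        self._stop.set()
        self._thread.join()

# usage sketch: with AutoRefresh(disque, job_id, retry_time / 2): encode_video(path)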

josiahcarlson commented 9 years ago

@antirez Your variant of my proposal is close enough to what I actually want that I can make it work, though I have a few comments/suggestions if you are open.

  1. Don't call it RETRY. RETRY as a command name implies that this job needs to be retried, not that you are resetting the retry timer. I proposed WORKING because it sort-of implies "still working on this job", but there are probably better names than either RETRY or WORKING.
  2. I have use-cases in my current platform where both async and sync with a timeout/error would be useful. Maybe a command syntax of: WORKING [ASYNC|ttl for replication in ms] jobid1 ... makes sense, if both async and synchronous replication options are desired for this functionality (I think they are, much like ADDJOB's async and block with timeout).
  3. The "half of TTL" limitation feels arbitrary, much like the AWS SQS 12 hour limitation. That said, I'd prefer to have the SQS style min(12 hours, ttl) than ttl/2 if I could pick, as it gives me what I actually want for short 5-30 minute TTL jobs without having to double my TTL.

Additional notes on 3 above: Can I double my TTLs for all short TTL jobs? Sure. But the most likely scenario (for the coming weeks/months) is that I'll be transitioning from another job/task system to Disque. And in those situations, needing to double my TTLs for the sake of Disque is just one more (error-prone) thing to do to the 50+ tasks I've already got running in my current architecture.

@carlhoerberg Almost no one uses RabbitMQ, AMQP, 0MQ, or any of the other task queue protocols/APIs directly. Instead, almost everyone uses a client library on top of those bare protocols/APIs to wrap them in an interface appropriate to the language/framework/etc. that they are using. Whether or not your Disque-backed queue client library offers automated RETRY timer resets in the background with threads is really up to the author of your library (though this is something that wouldn't be impossible to add outside the library, depending on the language/runtime).

antirez commented 9 years ago
  1. WORKING looks good indeed. Accepted unless something better is proposed.
  2. Not sure what the exact synchronous semantics would be. Consider that it is possible for the node you contact not to have the full list of nodes holding a copy of the job (the knowledge every node has of the other owners of a job is a best-effort thing, and Disque never relies on the list being complete). I have the feeling that reporting a failure when not all the nodes can be contacted is very rarely useful...
  3. The idea of the half-TTL is that it is, on the contrary, not arbitrary, since it is a percentage of the TTL.

More on "3". Imagine you have a job with TTL 24h and retry time of 5 minutes, since it's just a task related to sending an email. There is an SMTP failure for hours, yet the email will eventually be delivered. However if there is just some slowness, as soon as the message gets delivered, there is plenty of time for the worker to use WORKING in order to work more at it in some way. However if after 12h the message is still not delivered it is conceivable that it is possible the problem is in the worker code that is processing the message, so I see it as a pretty sane option to deliver the message to some other worker after this time. That's the idea. A fixed time of 12h instead is pretty arbitrary IMHO, since if the job TTL is 6 hours it means that a bug may result into no delivery.

@carlhoerberg I have the feeling that most workers doing very slow operations like video encoding will rarely be in a position of doing the work in a blocking way; more likely they'll call some external tool and wait for the process to finish. However, when this is not the case, a second thread could indeed be needed. I believe it's up to the client library whether to provide some abstraction for this or not. On the other hand, many workloads are in a loop where it is pretty easy to just send the WORKING command in the context of the active connection if we are going to be late.

From the heartbeat point of view, sure, WORKING is a form of heartbeat, but it is a higher-level one: the distributed system is specified only in terms of high-level messages, and not in terms of TCP keepalives, TCP connections, or layer-3 equipment closing them. So I believe WORKING, while conceptually similar to other systems, is at the right level of abstraction compared to other means of reaching the same goal.

Thank you both for your comments!

josiahcarlson commented 9 years ago

Awesome on the rename, and always-ASYNC makes sense given the platform.

On the TTL side of things, I can see your perspective and why you think it makes sense to build it like that. I disagree with the details of the limitation because that's not the particular failure mode that makes sense for the work that I need to do, and really, either choice is arbitrary.

In trying to figure out what might be closer to the "right" answer (if there is one), I looked through 8 different message/job queue services/protocol specifications referenced from the "Message Queues" Wikipedia entry. Sadly, documentation for most queue services/protocols is awful, and I was only able to learn what IronMQ offers: allow the retry timeout to be extended until the message expiration. The STOMP and AMQP protocols seem to suggest that timeouts can be extended to message expiration, but I wasn't able to confirm this when looking at documentation for implementations of either protocol.

If you are open to an optional flag/field to specify behavior, I have the sense that offering users the option to set a maximum retry delay during ADDJOB would offer the most flexibility, and having it default to TTL/2 is reasonable. If you're not sold on this particular piece, maybe consider the optional argument as a possible future enhancement (depending on user feedback).

justincase commented 9 years ago

Bugs in worker code come in many forms. I don't think it's an issue for disque-server to address. It's likely that, with today's automated deployments, all workers are running the same codebase. Having another worker reprocess the job would then result in the same outcome.

I'm personally fine with either of the proposed options although my preference is to be able to extend the timeout until the message expiration.

antirez commented 9 years ago

@justincase this is totally up to design taste, so I understand your reasoning; however, I think about it differently. My point of view is that:

  1. Yes, IMHO it's Disque's job to prevent a single worker from monopolizing a job until it expires, because this can be seen as part of the delivery-guarantee semantics in some way (yep, totally debatable).
  2. Even when all the workers run the same code you can have:
    • Local failures (disk or network) leaving a worker permanently unable to make progress.
    • Transient failures of an external system, like DNS, leaving a given run of the worker in a permanent error state.

That's why I believe it is so important to put a limit on the amount of time a worker can use for a job.

justincase commented 9 years ago

@antirez Ideally under those circumstances the worker stops issuing the WORKING command and sends a NACK, but I see your point. I'm fine with TTL/2 either way.

antirez commented 9 years ago

Thanks for the feedback @justincase, looks like we have a design for the feature that seems more or less reasonable in its main parts to everybody involved. Note that we can always change the TTL/2 thing, since it's part of the behavior but not of the API, so with some care we should be able to make this configurable or similar if needed.

I'll write the code in the next days :-) AFAIK it is a couple of lines of code if I reuse the existing logic. News soon.

antirez commented 9 years ago

Working on that (5 min at a time since it's Saturday); it should be ready before tonight in my timezone.

antirez commented 9 years ago

Hi again, the WORKING command is implemented in the working branch as per the specification. However, currently no time limit (TTL/2) is implemented; I'll add this support later. Please check, if you can, that the implementation conforms to our discussion. Thanks.

justincase commented 9 years ago

@antirez Thanks! Great work. It's working as expected (imo, of course).

djanowski commented 9 years ago

What about TOUCH?

(It came to my mind, and then confirmed that Beanstalkd uses the same.)

antirez commented 9 years ago

TOUCH is definitely more universally understood and has a strong Unix counterpart. WORKING is closer to what is actually going on in a message queue. I'm not sure which I like more.

djanowski commented 9 years ago

Most importantly I don't like that WORKING is an adjective and looks like a query command. I think I like TOUCH, but if there's no quorum for it, then I'd try to find some other verb, like REFRESH or something.

antirez commented 9 years ago

The adjective is not bad per se IMHO. I don't think it must necessarily be VERB OBJECT, since in this case we are not doing something explicit with the job; it's like the client is saying "I'm still working on this job", and the server reacts to this information by postponing the next requeue on a best-effort basis. But you've got a RESTful hater here... so, well, I dunno :-) Let's think a bit more about it before merging.

antirez commented 9 years ago

Btw, in case we favor a verb, I think POSTPONE would be a more explicit pick than TOUCH or REFRESH, if we really want commands that clearly state the operation happening as a result.

antirez commented 9 years ago

The WORKING command was just merged, with the TTL limit implemented. We can still change this in the future, while in the alpha stage, if we are not happy with it. Thanks!

josiahcarlson commented 9 years ago

Thank you Salvatore :)

antirez commented 9 years ago

Thanks @josiahcarlson this was a collaborative design effort gone well :-)