reibitto opened 4 years ago
I see there's retryDelay and retryMaxCount right now. I'm not sure whether there's a different design that would reconcile those with what I'm proposing.
The sealed trait I mentioned could also encode recoverable errors, unrecoverable errors, and different retry policies (like Schedule.exponential). If we go with that kind of design, retryDelay and retryMaxCount don't really make sense anymore (they're too "global" as settings).
This might require some more thought.
To give a better idea of the alternate design I was thinking of:
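```scala
import zio.Schedule
import zio.clock.Clock            // ZIO 1.x package layout
import zio.duration.Duration

sealed trait MessageAction
object MessageAction {
  case object Success extends MessageAction                       // handled: delete the message
  case object Ignore extends MessageAction                        // leave it for another consumer
  case class Delay(delay: Duration) extends MessageAction         // like ChangeMessageVisibility
  // Output type Any, so combined schedules such as exponential && recurs (tuple output) fit
  case class RecoverableError[E](error: E, retryPolicy: Schedule[Clock, Any, Any]) extends MessageAction
  case class UnrecoverableError[E](error: E) extends MessageAction
}
```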
For the RecoverableError case, you calculate whether to retry, and after how long, from the ZIO Schedule and the message's ApproximateReceiveCount attribute. That means you can write something like Schedule.exponential(1.second) && Schedule.recurs(10) and put it in RecoverableError to say "retry 10 times using exponential backoff", or anything else you can express with Schedule.
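A minimal plain-Scala sketch of the receive-count-driven backoff described above. It does not use ZIO Schedule; it hand-codes the behaviour of Schedule.exponential(base) && Schedule.recurs(maxRetries) as a function of the SQS ApproximateReceiveCount attribute, which is assumed to be available as an Int that is 1 on the first delivery. The object and function names are hypothetical, and the 12-hour cap reflects the SQS maximum visibility timeout.

```scala
import scala.annotation.tailrec
import scala.concurrent.duration._

object ReceiveCountBackoff {
  // SQS does not accept visibility timeouts above 12 hours.
  val MaxVisibility: FiniteDuration = 12.hours

  /** Hypothetical helper: delay before a failed message should be redelivered.
    * receiveCount is ApproximateReceiveCount (1 on the first delivery). Returns None
    * once maxRetries retries are used up (or for an invalid count below 1), i.e. the
    * error should then be treated as unrecoverable.
    */
  def nextVisibilityTimeout(receiveCount: Int, base: FiniteDuration, maxRetries: Int): Option[FiniteDuration] =
    if (receiveCount < 1 || receiveCount > maxRetries) None
    else {
      // Double the delay (receiveCount - 1) times, stopping early at the cap to avoid overflow.
      @tailrec
      def grow(d: FiniteDuration, remaining: Int): FiniteDuration =
        if (remaining <= 0 || d >= MaxVisibility) d else grow(d * 2L, remaining - 1)

      val d = grow(base, receiveCount - 1)
      Some(if (d > MaxVisibility) MaxVisibility else d)
    }
}
```

The returned duration would be passed as the new visibility timeout; because the count lives on the message, every consumer computes the same delay for the same delivery.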
This design feels more "ZIO" to me than my original proposal.
You could also give retryPolicy a default value of Schedule.spaced(settings.retryDelay) && Schedule.recurs(settings.retryMaxCount) if you want to keep those settings and avoid a breaking change.
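A plain-Scala sketch of that backwards-compatible default, under stated assumptions: RetrySettings, RetryPolicy, RecoverableError and MessageActions are hypothetical names (not zio-sqs APIs), and RetryPolicy is a simple function of the receive count standing in for a ZIO Schedule. The default argument reproduces the old fixed-delay, max-count behaviour, so callers that never pass a policy are unaffected.

```scala
import scala.concurrent.duration._

// Hypothetical stand-in for the two existing settings.
final case class RetrySettings(retryDelay: FiniteDuration, retryMaxCount: Int)

// Hypothetical policy: given ApproximateReceiveCount (1 on the first delivery), the delay
// before the next attempt, or None to stop retrying. Stands in for a ZIO Schedule here.
final case class RetryPolicy(nextDelay: Int => Option[FiniteDuration])

object RetryPolicy {
  // Rough analogue of Schedule.spaced(retryDelay) && Schedule.recurs(retryMaxCount):
  // a fixed delay, and at most retryMaxCount retries after the first delivery.
  def fromSettings(s: RetrySettings): RetryPolicy =
    RetryPolicy { receiveCount =>
      if (receiveCount >= 1 && receiveCount <= s.retryMaxCount) Some(s.retryDelay) else None
    }
}

final case class RecoverableError[E](error: E, retryPolicy: RetryPolicy)

// Actions are built where the settings are known, so the default argument can fall back
// to the old settings-based behaviour and existing callers keep compiling unchanged.
final class MessageActions(settings: RetrySettings) {
  def recoverable[E](error: E, retryPolicy: RetryPolicy = RetryPolicy.fromSettings(settings)): RecoverableError[E] =
    RecoverableError(error, retryPolicy)
}
```

Callers that want exponential backoff or any other policy would pass it explicitly; everyone else gets the old behaviour.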
The retry stuff only exists on the Publisher right now. I understand the need for Delay as an equivalent to Akka ChangeMessageVisibility, but for retrying, can't you just use the combinators on ZStream?
Oops, missed the fact that retry was on ProducerSettings.
As for relying on ZStream retries in the consumer, that would mostly work for a single consumer (except across app restarts or crashes, where the retry count starts from 0 again). The issue is mainly with multiple consumers.
If you have multiple consumers in a distributed system pulling messages from the same queue, they can't share the state of a ZIO retry policy, since each one only sees its own attempts. This is why I use ApproximateReceiveCount to drive retries and exponential backoff.
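A small, purely illustrative simulation of the multi-consumer problem (Message, MultiConsumerCounts and the consumer names are hypothetical). One message fails every time and is delivered alternately to two consumers: a counter kept in each consumer's memory only sees its own deliveries, while a count stored with the message, as ApproximateReceiveCount is, sees all of them.

```scala
// Hypothetical message model: SQS increments ApproximateReceiveCount on every receive,
// so the count travels with the message rather than living in any one consumer.
final case class Message(id: String, receiveCount: Int)

object MultiConsumerCounts {
  val Consumers: Vector[String] = Vector("consumer-a", "consumer-b")

  /** Simulates a message that fails every time and is delivered alternately to the consumers.
    * Returns each consumer's private retry counter and the count stored on the message.
    */
  def simulate(deliveries: Int): (Map[String, Int], Int) = {
    val (localCounts, msg) =
      (0 until deliveries).foldLeft((Map.empty[String, Int], Message("m1", receiveCount = 0))) {
        case ((local, m), i) =>
          val consumer = Consumers(i % Consumers.size)
          (local.updated(consumer, local.getOrElse(consumer, 0) + 1), // private, per-consumer state
           m.copy(receiveCount = m.receiveCount + 1))               // shared, carried by the message
      }
    (localCounts, msg.receiveCount)
  }
}
```

After six deliveries each local counter reads 3 while the shared count reads 6, so a "5 retries" limit based on local state would not have triggered yet.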
Another case for Delay(0) or Ignore: let's say I'm doing a rolling deployment of my consumer nodes, so they run different versions for a while. In that situation you may want to say, "I don't know how to handle this version of the message. Let some other consumer pick it up."
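A sketch of the rolling-deployment case, assuming a hypothetical message envelope that carries a schema version. Here Ignore is taken to mean "leave the message undeleted", so it reappears for other consumers after its visibility timeout (returning Delay(0) instead would make it visible again right away). The ADT is a trimmed, hypothetical version of the one proposed above.

```scala
// Trimmed, hypothetical version of the MessageAction ADT proposed in this thread.
sealed trait MessageAction
object MessageAction {
  case object Success extends MessageAction
  case object Ignore extends MessageAction
}

// Hypothetical message shape: a schema version plus the payload.
final case class Envelope(version: Int, body: String)

object VersionAwareHandler {
  // Versions this build of the consumer knows how to process.
  val SupportedVersions: Set[Int] = Set(1, 2)

  def handle(msg: Envelope): MessageAction =
    if (SupportedVersions.contains(msg.version)) {
      // ... real processing of msg.body would go here ...
      MessageAction.Success
    } else {
      // Unknown (e.g. newer) format during a rolling deployment: don't delete it,
      // so a consumer running a version that understands it can pick it up.
      MessageAction.Ignore
    }
}
```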
By the way, I created a separate issue in ZIO that's kind of a prerequisite for the retries I had in mind: https://github.com/zio/zio/issues/3468
I'm looking to migrate to zio-sqs, but right now as far as I can tell there are 2 main modes of operation:
- autoDelete=true: This is basically at-most-once semantics. If your handler dies or there is a hard crash, the message you were in the middle of handling is lost forever.
- autoDelete=false: Manual mode. You have to make sure you call deleteMessage and so on properly yourself.

I think it would be nice if there were something similar to Alpakka's SqsAckSink: https://doc.akka.io/docs/alpakka/current/sqs.html#updating-message-statuses

Basically, use the type system to ensure that you decide what to do with a message after your handler has run. This gives you at-least-once semantics (or, if your handler is idempotent, effectively exactly-once semantics).
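An in-memory sketch of why the order of deleting and handling decides the delivery semantics. A mutable.Queue stands in for the messages SQS will (re)deliver, and a thrown RuntimeException stands in for a crash; both are simplifications, and the object and method names are hypothetical.

```scala
import scala.collection.mutable

object DeliverySemantics {
  // `queue` holds the messages that are (or will become) visible to consumers.

  // autoDelete=true style: the message is deleted as soon as it is received, so if the
  // handler then crashes, nothing remains in the queue to redeliver (at most once).
  def deleteThenHandle(queue: mutable.Queue[String])(handler: String => Unit): Unit = {
    val m = queue.dequeue() // received and deleted in one go
    try handler(m)
    catch { case _: RuntimeException => () } // simulated crash: the message is lost
  }

  // Ack-after-handling style: delete only after the handler succeeds. On a crash the
  // message is never deleted, so SQS makes it visible again once its visibility timeout
  // expires; re-enqueueing models that redelivery (at least once).
  def handleThenDelete(queue: mutable.Queue[String])(handler: String => Unit): Unit = {
    val m = queue.dequeue() // received; now invisible to other consumers
    try handler(m) // success: the message would be deleted here and is not re-enqueued
    catch { case _: RuntimeException => queue.enqueue(m) } // crash: redelivered later
  }
}
```

In real SQS no code runs on a hard crash; the redelivery comes purely from the visibility timeout expiring on an undeleted message, which the catch block only imitates.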
A sealed trait with the following cases would, I believe, cover everything:

MessageAction
- Done/Delete
- Skip/Ignore
- RetryLater(visibilityTimeout)/ChangeMessageVisibility(visibilityTimeout)
I'm not sure about the specific naming, but that's the basic idea.
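A sketch of the proposed ADT using the Alpakka-style names from the list above (the final naming is still open), plus a hypothetical interpreter that describes which SQS request each case would turn into. DeleteMessage and ChangeMessageVisibility are the SQS API operations, both identified by the message's receipt handle; the string output is only for illustration, not a real client call.

```scala
import scala.concurrent.duration._

// Hypothetical names, following the cases listed above.
sealed trait MessageAction
object MessageAction {
  case object Delete extends MessageAction
  case object Ignore extends MessageAction
  final case class ChangeMessageVisibility(visibilityTimeout: FiniteDuration) extends MessageAction
}

object AckInterpreter {
  import MessageAction._

  // What the consumer would do with the message once the handler returned an action.
  def describe(receiptHandle: String, action: MessageAction): String = action match {
    case Delete                     => s"DeleteMessage($receiptHandle)"
    case Ignore                     => s"no request; $receiptHandle reappears after its current visibility timeout"
    case ChangeMessageVisibility(t) => s"ChangeMessageVisibility($receiptHandle, ${t.toSeconds}s)"
  }
}
```

Because MessageAction is sealed, the compiler warns about any match that forgets a case, which is what makes "decide what to do with every message" enforceable.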
Is this something that makes sense for zio-sqs?