typelevel / fs2

Compositional, streaming I/O library for Scala
https://fs2.io

Improve `retry` #904

Closed pchlupacek closed 7 years ago

pchlupacek commented 7 years ago

Improve retry combinator so it may be more generic and reusable.

Follows: #900

pchlupacek commented 7 years ago

As per the discussion in #900, I would suggest the following combinators for retry:


// on Stream

def retry(f: retry.Retry[F]): Stream[F, Attempt[A]]

// on Stream singleton

def retryEval(fa: F[A])(f: retry.Retry[F]): Stream[F, Attempt[A]]

// then I would suggest introducing a top-level retry object
object retry {
  // given the failure and the number of attempts so far, decide (effectfully) whether to retry
  type Retry[F[_]] = (Throwable, Int) => F[Boolean]

  /** Retry indefinitely, with the supplied delay. */
  def infinite[F[_]](delay: FiniteDuration)(implicit S: Scheduler): Retry[F] = ???

  /** Retry up to `maxAttempts` times, with a delay that is incremented by `increment` at every failure. */
  def incremental[F[_]](
    start: FiniteDuration
    , increment: FiniteDuration
    , maxAttempts: Int
  )(implicit S: Scheduler): Retry[F] = ???

}
SystemFw commented 7 years ago

@pchlupacek Could you also have a look at attempts?

As for your proposed signature, I'm not sure how it would work:

pchlupacek commented 7 years ago

@SystemFw I think attempts is a special case of retry.

As for the implementation, I was thinking incremental would be something like


def incremental[F[_]](
    start: FiniteDuration
    , increment: FiniteDuration
    , maxAttempts: Int
  )(implicit S: Scheduler, F: Async[F], ec: ExecutionContext): Retry[F] = { (rsn, attempts) =>
  // give up once the attempt budget is exhausted
  if (attempts >= maxAttempts) F.pure(false)
  else {
    // the delay grows linearly with the number of attempts
    val delay = start + (increment * attempts)
    S.sleep[F](delay).run.as(true)
  }
}

def retry(f: retry.Retry[F]): Stream[F, Attempt[A]] = {
  def go(attempts: Int): Stream[F, Attempt[A]] = {
    self.map(Right(_)).onError { err =>
      // emit the failure, then ask `f` whether to run the stream again
      Stream.emit(Left(err)) ++
      Stream.eval(f(err, attempts)).flatMap {
        case false => Stream.empty
        case true => go(attempts + 1)
      }
    }
  }

  go(1)
}
SystemFw commented 7 years ago

I'm not sure you can implement exponential backoff, or in general anything where the delays are stateful, with your approach. Anyway, I think both approaches (starting from an infinite Stream and manipulating it with combinators, vs starting with a function and building the Stream) are largely equivalent; it mostly comes down to preference :)

pchlupacek commented 7 years ago

@SystemFw Please give me an example of what cannot be implemented with the proposed signature but can be implemented with the current signature.

pchlupacek commented 7 years ago

For example, with the current signature I cannot implement a situation where I need to consult an external signal (e.g. network state) to decide whether the next retry should be performed.

pchlupacek commented 7 years ago

Exponential backoff can be implemented if you have a pure function that computes the backoff delay, which I am sure you have, as the current signature accepts only pure functions.
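
To make the point above concrete, here is a minimal sketch of such a pure function, using only the Scala standard library. The name `exponentialDelay` and the `max` cap are assumptions made for illustration and are not part of fs2 or of either proposal. A `(Throwable, Int) => F[Boolean]` callback could presumably call something like this with the attempt number before sleeping.

```scala
import scala.concurrent.duration._

object BackoffSketch {
  // Hypothetical helper: delay before the n-th retry (attempt starts at 1).
  // The delay doubles on every attempt and never exceeds `max`.
  def exponentialDelay(base: FiniteDuration, max: FiniteDuration)(attempt: Int): FiniteDuration =
    Iterator
      .iterate(base.min(max))(d => (d * 2).min(max)) // base, 2*base, 4*base, ... capped at max
      .drop(attempt - 1)
      .next()
}
```

Because the function depends only on the attempt number, it needs no state of its own; the counter passed to the callback is enough.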

SystemFw commented 7 years ago

So you can't implement stateful delays, whereas I think attempts can do most of what you want to do with your function (the only exception being a different delay depending on the Throwable, which I don't think is very useful).

However, as I said, I will implement anything we can find consensus on :)

SystemFw commented 7 years ago

> For example, with the current signature I cannot implement a situation where I need to consult an external signal (e.g. network state) to decide whether the next retry should be performed.

Why not? Create a Stream[F, FiniteDuration] that only emits if the network state is the way you want it, and pass it to attempts.

SystemFw commented 7 years ago

OTOH, how do you implement something like "stop when the total sum of delays reaches 10 minutes"?

pchlupacek commented 7 years ago

@SystemFw Are we talking about the retry that is currently implemented? How do you consult a Stream[F, FiniteDuration] if both of the functions there are pure?

pchlupacek commented 7 years ago

@SystemFw Stopping when the total sum of delays reaches 10 minutes will require an external ref where you keep that state. This is a common pattern in fs2._ that is used in many places. But that's something I don't see as possible at all with the current retry (because it only has pure combinators). It is OK, though, to keep attempts as it is today; I have no problem with that. As said before, I think it is a special case of retry.

pchlupacek commented 7 years ago

@SystemFw I thought about it once more, and perhaps we could even express state nicely in the retry combinator if we changed it to


def retry(f: F[retry.Retry[F]]): Stream[F, Attempt[A]]

That way you can acquire the reference in retry and then use it to perform whatever state computation you desire inside the retry combinator itself.

mpilquist commented 7 years ago

Minor code organization note: I would really like to avoid a top-level retry object if possible. I'd also like to avoid the type alias.

SystemFw commented 7 years ago

@pchlupacek As I said, attempts is my equivalent of your retry. My retry is the equivalent of your incremental. What you ask can be done by having a Stream[F, FiniteDuration] which reads the delay from the network, and stops emitting when it wants to stop. Pass that to attempts, and you have the behaviour you want. You can always implement statefulness with F + Ref, but why? A Stream[F, FiniteDuration] is already stateful, and allows you to do things much more nicely.
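
As a rough illustration of treating the delays themselves as a sequence, the "stop when the total sum of delays reaches 10 minutes" case can be sketched with the standard library's `LazyList` (Scala 2.13 or later) standing in for `Stream[F, FiniteDuration]`. The names `delays` and `withinBudget` are hypothetical. fs2's Stream has similarly named combinators, but this sketch does not claim to match its exact API.

```scala
import scala.concurrent.duration._

object DelayBudgetSketch {
  // Infinite schedule: start, next(start), next(next(start)), ...
  def delays(start: FiniteDuration)(next: FiniteDuration => FiniteDuration): LazyList[FiniteDuration] =
    LazyList.iterate(start)(next)

  // Keep emitting delays only while their running total stays within `budget`.
  def withinBudget(ds: LazyList[FiniteDuration], budget: FiniteDuration): LazyList[FiniteDuration] =
    ds.scanLeft(Duration.Zero)(_ + _) // running totals: 0, d1, d1 + d2, ...
      .tail                            // drop the initial 0 so totals line up with delays
      .zip(ds)
      .takeWhile { case (total, _) => total <= budget }
      .map { case (_, d) => d }
}
```

The running total lives in the sequence itself, so no external mutable reference is needed. For example, a doubling schedule starting at 1 minute with a 10 minute budget yields delays of 1, 2 and 4 minutes and then ends.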

EDIT: I also suspect having F[retry.Retry] means you're on your own with respect to resource safety.

pchlupacek commented 7 years ago

@SystemFw OK, I see now where you are going.

So let's lift the restriction that attempts works only on a singleton stream. To do so, we will need to implement it via pull, consulting the durations only when the pull fails.

So, keeping the suggested attempts signature, we may try this:


// when s2 terminates, the stream will terminate after the last failure
def attempts(s2: Stream[F, FiniteDuration]): Stream[F, Attempt[A]] = ???

def retryIncremental[F[_]](
    start: FiniteDuration
    , increment: FiniteDuration
    , maxAttempts: Int
  )(implicit S: Scheduler): Stream[F, FiniteDuration] = ??? // I think the implementation is obvious

// usage
s1 attempts retryIncremental
s1 attempts retryXXX  // whatever other retry helpers you may find useful

// on Stream singleton

def retryEval(fa: F[A])(s2: Stream[F, FiniteDuration]): Stream[F, Attempt[A]] = ???

I am still puzzled about this retry combinator, but if you really think it is useful, let's have it there. Perhaps lifting the function to take an F would be a good idea.

@mpilquist Re the alias and the package: agreed, I was not sure about them either.

SystemFw commented 7 years ago

> So let's lift the restriction that attempts works only on a singleton stream. To do so, we will need to implement it via pull, consulting the durations only when the pull fails.

Do I need the pull there? I have tried using attempts with multiple element streams, and it works with the semantics you expect, I would just have to eliminate the comment on it. Or is there something I'm missing there?

Re: retry, I'm not particularly attached to the specific set of parameters there; it was just the original signature @mpilquist asked me to PR. But may I ask why you are puzzled? If you consider that isRetriable has, in my opinion, a decent default, the parameters are not that different from your retryIncremental, except that you hardcode a constant increment, whereas I take a simple function.

I also think having at least one function which just returns the result instead of Attempt is useful: in most cases you don't want to deal with the error, because you already have a strategy for errors (retrying them)

re: F vs Stream. I do think F makes more sense in retry (that's what my original implementation had), since there's no way to enforce a singleton Stream in the types. How does it work for resource safety? I.e. in this specific case, given that nothing flatMaps on the fa, could one do Stream.bracket(acquire)(fa)(cleanup).runLog.map(_.head), if resource safety for the retriable F is important?

pchlupacek commented 7 years ago

@SystemFw No problem if pull is not required; it would be excellent if you can manage without it.

Re the retry: it is simply driven by the fact that there is so much more you can do in the scope of F than in a pure function. I simply don't see much you could do here in a pure function, especially if you have two of them. If you think this is overkill, no problem, I can live with that, provided we change attempts to emit more than one A. I just believe it would be much more useful if we lifted F there. Plus, I don't believe this would complicate things that much.

Re F: we have attemptEval already, so the idea is to have attemptRetry too, but if there will be just attempts, I am OK with that. Of course resource safety is an issue, as I pointed out earlier, but much the same happens in attemptEval, which we already have in the code.

The problem with F is more that the resource cannot outlive the computation scope, whereas in a stream it can: the stream of Attempt[A] will keep resources open for all elements on Right, and will close them as soon as a Left is received. With the F signature, that is not possible.

SystemFw commented 7 years ago

So, attempts already emits multiple elements, it's just the comment that misleads. I can rename that to retry, if you prefer, since it's the most fundamental of the two.

As for my old retry, I want to provide something that replaces scalaz.Task.retry for cats-effect IO. In fact, my preferred signatures would be:

  // general, works with multiple elements, you can use combinators to manipulate it
  def attempts[F[_], A](s: Stream[F, A], delays: Stream[F, FiniteDuration])(
    implicit F: Async[F], ec: ExecutionContext): Stream[F, Either[Throwable, A]] =
    s.attempt ++ delays.flatMap(delay => sleep_(delay) ++ s.attempt)

  // non-general, replaces `scalaz.Task.retry`. We can find a better, more specific name
  // Notice input and return type
  def retry[F[_], A](fa: F[A],
                     delay: FiniteDuration,
                     nextDelay: FiniteDuration => FiniteDuration,
                     maxRetries: Int,
                     retriable: Throwable => Boolean = internal.NonFatal.apply)(
    implicit F: Async[F], ec: ExecutionContext): F[A] = {
    val delays = Stream.unfold(delay)(d => Some(d -> nextDelay(d))).covary[F]

    attempts(Stream.eval(fa), delays)
      .take(maxRetries)
      .takeThrough(_.fold(err => retriable(err), _ => false))
      .runLog
      // the last attempt is either the first success or the final failure
      .flatMap(_.last.fold(err => F.raiseError[A](err), a => F.pure(a)))
  }
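
For readers who want to see the intended behaviour without fs2 on the classpath, the following is a rough, synchronous model of the `retry` semantics sketched above, written against the standard library only. `RetrySketch`, the `() => A` action and the returned list of delays are all assumptions for illustration. A real implementation would sleep for each delay rather than record it. As in the sketch above, `maxRetries` bounds the total number of attempts.

```scala
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

object RetrySketch {
  // Runs `action` until it succeeds, fails with a non-retriable error,
  // or has been attempted `maxRetries` times. Returns the final outcome
  // together with the delays that would have been slept between attempts.
  def retry[A](
      action: () => A,
      delay: FiniteDuration,
      nextDelay: FiniteDuration => FiniteDuration,
      maxRetries: Int,
      retriable: Throwable => Boolean = _ => true
  ): (Try[A], List[FiniteDuration]) = {
    @annotation.tailrec
    def go(attempt: Int, d: FiniteDuration, slept: List[FiniteDuration]): (Try[A], List[FiniteDuration]) =
      Try(action()) match {
        case Failure(e) if attempt < maxRetries && retriable(e) =>
          go(attempt + 1, nextDelay(d), d :: slept) // a real version would sleep for `d` here
        case result =>
          (result, slept.reverse) // first success, non-retriable error, or final failure
      }
    go(1, delay, Nil)
  }
}
```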

@mpilquist WDYT?

pchlupacek commented 7 years ago

@SystemFw Ah, you are right. After carefully reading the attempts implementation, it is indeed OK for multi-value streams :-). Sorry for the confusion. Let's just update the comment there.

I have no strong opinion on naming, so retry and attempts both work for me.

I think that if we introduce anything that works on F[A], it should return Stream[F, A]. I would then put it on the Stream object and name it retryEval.

mpilquist commented 7 years ago

Looks good to me. Given our current code organization, both methods should be on Scheduler. I can see an argument for putting them on Stream instead but I think that argument applies equally well to things like sleep, awakeEvery, etc.

SystemFw commented 7 years ago

So, to summarise:

I will implement the necessary changes and open another PR

pchlupacek commented 7 years ago

Fabio, thanks a lot, and sorry again for the confusion. @mpilquist My feeling is that sleep and awakeEvery do not take F[A] as an argument, so it is OK to have them on Scheduler.

SystemFw commented 7 years ago

No problem at all :)