typelevel / fs2

Compositional, streaming I/O library for Scala
https://fs2.io

Feature request: Dynamic metering of Streams #3328

Open TobiasRoland opened 1 year ago

TobiasRoland commented 1 year ago

I initially raised this in fs2-kafka, but I think it actually belongs here:

https://github.com/fd4s/fs2-kafka/issues/1270

You can already use a Signal[F, Boolean] to pause consumption, which I've found incredibly useful for code with e.g. dynamic feature flags that turn consumption on and off. In the same manner, I am hoping to throttle consumption dynamically.

If this would be useful to other people, would you be open to a PR for this? Or, if this already exists, please do point me in its direction 🙏

1. Is there currently a baked-in way to have dynamic metering on a stream, other than `evalTap(_ => doSomeDelay())`?

The limitation of this approach is that doSomeDelay won't have access to when the stream last emitted, how much time has elapsed, and so on, which leads me to my actual question:

2. I think this is the feature I'm asking for, if people would find it useful:

// ignoring the details of where these signals come from
val targetDelay: Signal[F, FiniteDuration] = createDelaySignal[F]() // edit: see below, signal mightn't be the best choice here
val pauseSignal: Signal[F, Boolean] = createPauseSignal[F]()

// I would like to be able to:
someStream
    .pauseWhen(pauseSignal)
    .meteredBy(targetDelay)
    .flatMap(...etcetc)

I would want it to keep track internally of the last time it produced a message. Then, if signal value is emitted that is less than what was last set, AND more time than that has already passed since the last message, it would immediately emit and then wait for the next duration to elapse. Conversely, if the duration is increased, the time that has already elapsed since the last message is taken into account.

So - in BDD format - this is the behaviour that I'm after:

Scenario 1: Initial startup
GIVEN target delay is initially 1 minute
WHEN the stream is started
THEN it emits a message every minute as long as there are more elements to emit

Scenario 2: Delay is changed to be less than it was previously
GIVEN targetDelay is initially 1 minute
AND the stream emits one message
AND 30 seconds pass
WHEN the targetDelay signal is changed to 5 seconds
THEN a message is immediately emitted because (newTime - timePassedSinceLastInvocation) is negative or 0
AND every 5 seconds after this, a new message is emitted

Scenario 3: Delay is changed to be greater than it was previously
GIVEN targetDelay is initially 1 minute
AND the stream emits one message
AND 30 seconds pass
WHEN the targetDelay signal is changed to 45 seconds
THEN after an additional 15 seconds elapse, a message is emitted
AND every 45 seconds after this, a new message is emitted
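
The arithmetic behind the three scenarios can be written as one small pure function. This is only a sketch of the requested semantics, not an existing fs2 API: `remainingWait` is a hypothetical name, and it assumes the operator tracks the time elapsed since its last emission.

```scala
import scala.concurrent.duration._

// Hypothetical helper, not part of fs2: how much longer to wait before the
// next element, given the current target delay and the time elapsed since
// the last emission. Clamped at zero so an overdue element goes out at once.
def remainingWait(targetDelay: FiniteDuration, elapsed: FiniteDuration): FiniteDuration =
  (targetDelay - elapsed).max(Duration.Zero)

// Scenario 1: nothing has elapsed yet, so wait the full minute.
val scenario1 = remainingWait(1.minute, Duration.Zero)

// Scenario 2: delay lowered to 5 seconds after 30 seconds have passed,
// so the difference is negative and the wait is clamped to zero.
val scenario2 = remainingWait(5.seconds, 30.seconds)

// Scenario 3: delay changed to 45 seconds after 30 seconds have passed,
// so 15 seconds remain.
val scenario3 = remainingWait(45.seconds, 30.seconds)
```

Re-evaluating this function whenever the target delay changes would give the "immediately emit" and "wait the remaining time" behaviours described above.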

armanbilge commented 1 year ago

I think this is an interesting idea, but I think confusion surrounding Signal is muddying the discussion of the semantics.

if signal value is emitted

The thing is, Signals don't actually "emit" elements. A Signal is simply a time-varying value, i.e. conceptually a Time => Value. You may obtain its current value at any time, and you may also subscribe to a stream of updates when that value changes. But fundamentally it should be thought of as a value existing in continuous time.

With that in mind, pauseWhen(signal) is easy to understand, but .meteredBy(signal) is hard to make sense of.
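
As a rough illustration of the "time-varying value" view, here is a minimal sketch using cats-effect `IO` and fs2's `SignallingRef` (a Signal whose value can be updated). The values and the name `program` are made up for the example. It sets the value twice and only then looks at it, both by sampling with `get` and by subscribing with `discrete`.

```scala
import cats.effect.IO
import fs2.concurrent.SignallingRef

val program: IO[(Int, List[Int])] =
  for {
    sig     <- SignallingRef.of[IO, Int](0)
    _       <- sig.set(1)
    _       <- sig.set(2)
    // Sampling: read whatever the value is right now.
    current <- sig.get
    // Subscribing: `discrete` starts from the current value and then follows
    // later changes; it does not replay the earlier values 0 and 1.
    seen    <- sig.discrete.take(1).compile.toList
  } yield (current, seen)
```

Both observations see only the latest value, 2. The intermediate updates were never "emitted" to anyone, which is why a per-update reading of something like `.meteredBy(signal)` is hard to pin down.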

TobiasRoland commented 1 year ago

Mm, yeah I may be using the wrong terminology here - ignoring the whole signal, whatchamacallit, and focusing more on the semantics - I do think the idea of meteredBy(someSortOfDurationThatCanChange: Whachamacallit) feels like very powerful behaviour to have.

armanbilge commented 11 months ago

I just ran into a similar use case. Here's my API concept (scastie).

import cats.effect.Temporal
import cats.syntax.all.*
import fs2.{Stream, Pipe}
import fs2.concurrent.Signal
import scala.concurrent.duration.FiniteDuration

def waitUntil[F[_], A](
    nextTimestamp: F[Signal[F, FiniteDuration]]
)(implicit F: Temporal[F]): Pipe[F, A, A] =
  in =>
    in.flatMap { a =>
      Stream.eval(nextTimestamp).flatMap { timestampSignal =>
        timestampSignal.discrete
          .switchMap { timestamp =>
            Stream.eval {
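              // Sleep for the time remaining until the target timestamp
              // (a non-positive duration returns immediately).
              F.monotonic.flatMap(now => F.sleep(timestamp - now))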
            }
          }
          .head
          .as(a)
      }
    }

Key ideas:

  1. The duration in Signal[F, FiniteDuration] is not the delay to meter by, but the absolute timestamp when the next element may be emitted.

  2. The stream obtains this timestamp signal effectually for every element it wants to emit. This indicates to the "controller" that a new element is ready to be emitted and thus it should have all necessary information to devise a metering strategy.

A useful implementation of F[Signal[F, FiniteDuration]] is left as an exercise for the reader 😉

Btw, I'm not convinced that this should be added to FS2. For example, we have upperbound living as an external library, and this seems in a similar vein. In fact, would upperbound suit your needs already?