laserdisc-io / tamer

Standalone alternatives to Kafka Connect Connectors
MIT License
42 stars 15 forks source link

Backoff when caught up #44

Open sirocchj opened 4 years ago

sirocchj commented 4 years ago

Tamer is designed to always go as fast as possible. Sometimes it may be desirable to slow down, e.g. once we caught up with the "initial load".. We could use a scheduler controlled in the Setup and returned by it so to slow this down, when needed

sirocchj commented 3 years ago

Alternatively, we could slow down in Setup by deciding for example that when time (wall-clock) is within 1m of last timestamp fetched (to allow 1m skew) we sleep (w/o blocking). Something like

  private[this] implicit final class InstantOps(private val instant: Instant) extends AnyVal {
    def plusDuration(duration: Duration): Instant  = instant.plusMillis(duration.toMillis)
    def minusDuration(duration: Duration): Instant = instant.minusMillis(duration.toMillis)

    def orOneMinuteAgo: URIO[Clock, Instant] = UIO(Instant.now.minusSeconds(60)).flatMap {
      case oneMinuteAgo if oneMinuteAgo.isBefore(instant) =>
        val timeDifferenceMillis = instant.toEpochMilli - oneMinuteAgo.toEpochMilli
        val durationToSleep      = zio.duration.Duration(timeDifferenceMillis, TimeUnit.MILLISECONDS)
        ZIO.sleep(durationToSleep) *> orOneMinuteAgo
      case _ => UIO(instant)
    }
  }

in place of https://github.com/laserdisc-io/tamer/blob/01553754d6a0d25995fede673c4cd644b61357d8/example/src/main/scala/tamer/example/Main.scala#L16-L24

sirocchj commented 3 years ago

I'm keeping this around as I believe the delaying of the publication of the next state should be handled by Tamer, not the frontend.

The reason for this is - as things stand - both the commit of the current offset (that got processed fully) and the publication of the new state (that we want to delay) are delayed.

It's a workaround, but not a great one.