TheAngryByrd / FSharp.Control.Redis.Streams

Interop library between Redis Streams and popular dotnet streaming libraries
MIT License

On error resume #2

Open Swoorup opened 4 years ago

Swoorup commented 4 years ago

I am currently using pollStreamForever like this:

let stream = pollStreamForever db streamKey position PollOptions.Default
stream |> Observable.retry // After a disconnection and reconnection, this replays every element from the original starting position

Is it possible to resume from where it left off after an error?

Swoorup commented 4 years ago

The following slight modification to the implementation of pollStreamForever allows it to resume after a disconnection:

let pollStreamForever (redisdb : IDatabase) (streamName : RedisKey) (startingPosition : RedisValue) (pollOptions : PollOptions) =
        // A ref cell rather than `let mutable`: closures cannot capture mutable locals.
        // It lives outside the observable, so it survives resubscription by Observable.retry.
        // Note that it is also shared by every subscriber of the returned observable.
        let nextPosition = ref startingPosition

        Observable.taskUnfold (fun (pollDelay) ct -> task {
            let! (response : StreamEntry []) = redisdb.StreamRangeAsync(streamName, minId = Nullable(nextPosition.Value), count = (Option.toNullable pollOptions.CountToPullATime))
            match response with
            | EmptyArray ->
                // Nothing new: back off before the next poll.
                let nextPollDelay = pollOptions.CalculateNextPollDelay pollDelay
                do! Task.Delay pollDelay
                return Some (nextPollDelay, Array.empty )
            | entries ->
                // Remember the position just past the last entry, then poll again immediately.
                let lastEntry = Seq.last entries
                nextPosition.Value <- EntryId.CalculateNextPositionIncr lastEntry.Id
                let nextPollDelay = TimeSpan.Zero
                return Some (nextPollDelay, entries )

        }) TimeSpan.Zero
        |> Observable.collect id
let stream = pollStreamForever db streamKey position PollOptions.Default
stream |> Observable.retry // After a disconnection and reconnection, this resumes from where it left off
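
To illustrate why moving the position outside the subscription makes a difference, here is a minimal sketch that uses no Redis or Rx at all. Everything in it is a hypothetical stand-in: `makeFlakyReader` simulates a stream with ids 1 to 5 that fails once at id 3, and `retry` simply calls the subscribe function again after a failure, which is roughly what Observable.retry does by resubscribing.

```fsharp
open System

// Hypothetical stand-in for a Redis stream holding ids 1..5.
// The first read that reaches id 3 throws once, simulating a dropped connection.
let makeFlakyReader () =
    let alreadyFailed = ref false
    fun (fromId: int) (onNext: int -> unit) ->
        for id in fromId .. 5 do
            if id = 3 && not alreadyFailed.Value then
                alreadyFailed.Value <- true
                failwith "connection lost"
            onNext id

// Hypothetical retry: subscribe again after any failure.
let rec retry (subscribe: (int -> unit) -> unit) (onNext: int -> unit) =
    try subscribe onNext
    with _ -> retry subscribe onNext

// Variant 1: the start position is fixed per subscription, so a retry starts over.
let restarting () =
    let read = makeFlakyReader ()
    let seen = ResizeArray<int>()
    let subscribe (onNext: int -> unit) = read 1 onNext
    retry subscribe seen.Add
    List.ofSeq seen

// Variant 2: the position lives outside the subscription and survives a retry.
let resuming () =
    let read = makeFlakyReader ()
    let nextPosition = ref 1
    let seen = ResizeArray<int>()
    let subscribe (onNext: int -> unit) =
        read nextPosition.Value (fun id ->
            onNext id
            nextPosition.Value <- id + 1)
    retry subscribe seen.Add
    List.ofSeq seen

printfn "%A" (restarting ()) // [1; 2; 1; 2; 3; 4; 5]
printfn "%A" (resuming ())   // [1; 2; 3; 4; 5]
```

In the first variant the elements seen before the failure are delivered twice; in the second the retry picks up at the stored position.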

Swoorup commented 4 years ago

Another option would be to use OnNext instead of OnError for connection issues, and wrap each value in an F# Result:

module temp.RedisRx

open System
open System.Threading
open System.Reactive.Linq
open System.Reactive
open System.Threading.Tasks
open FSharp.Control.Reactive
open FSharp.Control.Redis.Streams.Core
open FSharp.Control.Tasks
open StackExchange.Redis

module Observable =
  let internal taskUnfold (fn: 's -> CancellationToken -> Task<('s * 'e) option>) (state: 's) =
    Observable.Create(fun (obs: IObserver<_>) ->
      let cts = new CancellationTokenSource()
      let ct = cts.Token
      task {
        let mutable innerState = state
        let mutable isFinished = false
        try
            // Stop on cancellation or once fn signals completion with None.
            while not ct.IsCancellationRequested && not isFinished do
              try 
                let! result = fn innerState ct
                match result with
                | Some (newState, output) ->
                    innerState <- newState
                    obs.OnNext <| Ok output
                | None ->
                    isFinished <- true
              with e ->
                obs.OnNext <| Error e
            obs.OnCompleted()
        finally
          cts.Dispose()
      }
      |> ignore

      new Disposables.CancellationDisposable(cts) :> IDisposable)

  let flattenArray (observable : IObservable<Result<StreamEntry [],exn>>) =
      observable.SelectMany (fun x ->
          match x with
          | Ok o -> o |> Array.map (Ok) :> seq<_>
          | Error e -> [Error e] :> seq<_>)

let pollStreamForever (redisdb : IDatabase) (streamName : RedisKey) (startingPosition : RedisValue) (pollOptions : PollOptions) =
    Observable.taskUnfold (fun (nextPosition, pollDelay) ct -> task {
        let! (response : StreamEntry []) = redisdb.StreamRangeAsync(streamName, minId = Nullable(nextPosition), count = (Option.toNullable pollOptions.CountToPullATime))
        match response with
        | EmptyArray ->
            let nextPollDelay = pollOptions.CalculateNextPollDelay pollDelay
            do! Task.Delay pollDelay
            return Some ((nextPosition, nextPollDelay ) , Array.empty )
        | entries ->
            let lastEntry = Seq.last entries
            let nextPosition = EntryId.CalculateNextPositionIncr lastEntry.Id
            let nextPollDelay = TimeSpan.Zero
            return Some ((nextPosition, nextPollDelay), entries )

    }) (startingPosition, TimeSpan.Zero)
    |> Observable.flattenArray
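
As a rough sketch of what a consumer of such a Result-based stream might do, the example below works on plain sequences instead of IObservable so that it runs without any packages. The `Entry` type, `flatten`, and `partition` are hypothetical names for illustration only: `flatten` mirrors the shape of flattenArray above, and `partition` is just one possible policy (keep the entries, collect the error messages for logging).

```fsharp
open System

// Hypothetical element type standing in for StreamEntry.
type Entry = { Id: string; Value: string }

// Same shape as flattenArray above, over a plain sequence:
// each Ok batch becomes one Ok per entry; an Error passes through as a single element.
let flatten (batches: seq<Result<Entry [], exn>>) : seq<Result<Entry, exn>> =
    batches
    |> Seq.collect (fun batch ->
        match batch with
        | Ok entries -> entries |> Seq.map Ok
        | Error e -> Seq.singleton (Error e))

// One possible consumer policy: keep entries, collect error messages for logging.
let partition (results: seq<Result<Entry, exn>>) =
    let entries = ResizeArray<Entry>()
    let errors = ResizeArray<string>()
    for r in results do
        match r with
        | Ok entry -> entries.Add entry
        | Error e -> errors.Add e.Message
    List.ofSeq entries, List.ofSeq errors

// A simulated poll history: a batch, a connection error, then another batch.
let batches : Result<Entry [], exn> list =
    [ Ok [| { Id = "1-0"; Value = "a" }; { Id = "2-0"; Value = "b" } |]
      Error (Exception "connection lost")
      Ok [| { Id = "3-0"; Value = "c" } |] ]

let entries, errors = batches |> flatten |> partition
printfn "%d entries, errors: %A" entries.Length errors
```

Because the error arrives as an ordinary element, the sequence keeps going after it, and the consumer decides whether to log, ignore, or stop.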

TheAngryByrd commented 4 years ago

We discussed this in Slack. Creating a new function like pollStreamForeverSafe that returns a Result would be the best fix here.

Could you open a PR for this, also adding this function for the Hopac and Akka Streams implementations?

Swoorup commented 4 years ago

Slack convo:

sytherax  25 minutes ago
I did try hacking around the code with consumer groups. In my case, each server instance can pop on and off, and each of them has to read the stream individually. So I put them in different consumer groups and read only unacknowledged messages, which appears to do the job. But it seems like I need separate code to clean up unused consumer groups.

theangrybyrd:penguin:  24 minutes ago
 But it seems like I need separate code to clean up unused consumer groups.

theangrybyrd:penguin:  24 minutes ago
yep, that’s why i havent released anything yet =\

sytherax  24 minutes ago
lol

sytherax  23 minutes ago
I find it a bit confusing, reading the protocol, why you would manually send an acknowledgement though

theangrybyrd:penguin:  23 minutes ago
I had a similar problem to this when lining up a Process to an Rx Stream (stdout/stderr), the amount of people that use stderr for no good reason basically led me to your second option (had to use Choice back then)

sytherax  22 minutes ago
so I reverted to the solution that keeps the stream position in the client itself.

sytherax  19 minutes ago
doesn’t it make sense though, pollStreamForever to return Result<_, exn> instead?

sytherax  19 minutes ago
you could probably filter the exception to very specific exception in the implementation of pollStreamForever

theangrybyrd:penguin:  18 minutes ago
In theory, no
In practice, probably

sytherax  18 minutes ago
or have the first try catch inside pollStreamForever instead

sytherax  18 minutes ago
https://netflixtechblog.com/android-rx-onerror-guidelines-e68e8dc7383f

Medium: Android Rx onError Guidelines, by Ed Ballot (4 min read, Oct 26th, 2019)

theangrybyrd:penguin:  18 minutes ago
In theory the underlying stream tech should be able to accommodate this kind of weird behavior.

sytherax  18 minutes ago
The divergent understanding is partially because the name "onError" is a bit misleading. The item emitted by onError() is not a simple error, but a throwable that can cause significant damage if not caught. Our app has a global handler that prevents it from crashing outright, but an uncaught exception can still leave parts of the app in an unpredictable state.

sytherax  16 minutes ago
exactly, it doesn’t make sense when the underlying stream has reconnect support.

theangrybyrd:penguin:  15 minutes ago
maybe we should just catch that specific exception and ignore it and assume it will retry the connect?

theangrybyrd:penguin:  14 minutes ago
as you said with having the try catch in there

theangrybyrd:penguin:  13 minutes ago
I could see being silent also being an annoying behavior =\

sytherax  13 minutes ago
I am a bit torn on this: if it is silent, you could be disconnected forever without knowing.

theangrybyrd:penguin:  13 minutes ago
right

theangrybyrd:penguin:  12 minutes ago
I think the best answer for now is to add a second function like pollStreamForeverSafe where it returns the Result

sytherax  12 minutes ago
Yeah I agree

sytherax  11 minutes ago
Leaving it as a Result will let the users decide what to do with Error type, log it or just ignore it

theangrybyrd:penguin:  11 minutes ago
:thumbsup:

Swoorup commented 4 years ago

@TheAngryByrd I have created the pull request. :+1: