liftbridge-io / go-liftbridge

Go client for Liftbridge. https://github.com/liftbridge-io/liftbridge
Apache License 2.0
67 stars 18 forks source link

Question about undefined behaviour on resubscribe to stream #51

Closed FZambia closed 3 years ago

FZambia commented 4 years ago

@tylertreat hello! Really enjoy reading a code of Liftbridge project and like its concept. Especially I like your decision to not invent new network protocol and just go with GRPC, this seems a pretty good choice in terms of client implementations simplicity.

I have a question about automatic resubscriptions to a stream. Here is a simplified code from this client:

func (c *client) dispatchStream(...) {

    defer releaseConn()
    var (
        lastOffset  int64
        lastError   error
        resubscribe bool
        closed      bool
    )
LOOP:
    for {
        var (
            msg, err = stream.Recv()
            ...
        )
        if msg != nil {
            lastOffset = msg.Offset
        }
        if err != nil {
            lastError = err
        }
        ...
        if err != nil {
            break
        }
    }

    // Attempt to resubscribe to the stream leader starting at the last
    // received offset. ...
    if resubscribe {
        deadline := ...
        for ... {
            err := c.Subscribe(ctx, streamName, handler, StartAtOffset(lastOffset+1))
            if err == nil {
                return
            }
        }
    }
}

My question is what if we just subscribed to a stream, for example to latest position, have not yet received a new Message from it and then a recoverable error happens. So at this moment lastOffset is 0. Client will then try to resubscribe with StartAtOffset(1). This seems broken for me as we are resubscribing with insufficient offset, the real required offset can be different. Am I right or missing sth?

tylertreat commented 4 years ago

Yes, I think you have discovered a nice bug. :)

Will need to think what the right solution is.

tylertreat commented 3 years ago

Fixed with #110.