nsqio / go-diskqueue

A Go package providing a filesystem-backed FIFO queue
MIT License
467 stars 101 forks source link

add peek function #6

Closed xiezhenye closed 3 years ago

xiezhenye commented 5 years ago

We need to peek at the queue but not ack the message before ensure the message is correctly handled.

mreiferson commented 5 years ago

Thanks @xiezhenye. Could you describe this use case in more detail please?

xiezhenye commented 5 years ago

We use go-diskqueue to persist events in order to prevent lose event when process crash. If we read an event from the ReadChan and crash before the event handled, the event will lose. I added an Peek function that only read the first event but not ack it. Now, I can Peek an event, handle it, and use ReadChan to ack at to ensure the events be handled.

mreiferson commented 5 years ago

I think I follow, but would it be more consistent with the existing interface to expose PeekChan() <-chan []byte instead?

xiezhenye commented 5 years ago

Peek operation always returns the first message, not looks like reading something from a stream as a chan. Using Peek() interface{} instead of PeekChan() <-chan can also prevent misuse. For example,

for msg := range q.PeekChan() {
 ...
}

is an infinite loop.

ploxiln commented 4 years ago

I think that the concern is that with this interface:

func (d *diskQueue) Peek() []byte {
    return <-d.peekChan
}

there is no way to avoid blocking, like:

select {
    case msg := <- peekChan:
        return msg
    default:
        return nil
}

and no way to mix with other conditions, like:

select {
    case msg := <- peekChan:
        return msg
    case <- exitChan:
        return nil
}
xiezhenye commented 4 years ago

I changed the code just like Depth do.

mreiferson commented 3 years ago

we'll probably end up taking #24, closing, but thanks for the PR.