use circular buffer in FiniteReplayProvider #23

Closed hugowetterberg closed 4 months ago

hugowetterberg commented 4 months ago

Uses NewFiniteReplayProvider() to instantiate the provider instead of a direct struct literal. This allows catching count misconfiguration at instantiation (typically application startup) instead of as a later panic. In my opinion all panics should be replaced with returned errors; all uses of panic I looked at in the library are in the "expected errors" category. I kept the panic behaviour in Put(), as changing that is very much out of scope.

Synchronises access to the underlying buffer and head and tail state using an RWMutex. I haven't looked closely at the concurrency handling in the library as a whole, so if you're already serialising access to the replay providers this doesn't add any value. If not, a similar mutex should be added to the ValidReplayProvider.
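A minimal sketch of the two ideas above, assuming nothing about the library's real types: a constructor that rejects a bad count up front, and a fixed-size circular buffer whose head and count are guarded by an RWMutex. The names `ringProvider` and `newRingProvider` are hypothetical stand-ins, not the PR's actual code.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// ringProvider is a hypothetical stand-in for a finite replay provider.
type ringProvider struct {
	mu    sync.RWMutex
	buf   []string // fixed-size storage; never grows after construction
	head  int      // index of the oldest stored item
	count int      // number of items currently stored
}

// newRingProvider rejects a non-positive capacity at construction time
// instead of letting a later call panic.
func newRingProvider(capacity int) (*ringProvider, error) {
	if capacity < 1 {
		return nil, errors.New("ring provider: capacity must be at least 1")
	}
	return &ringProvider{buf: make([]string, capacity)}, nil
}

// Put stores v, overwriting the oldest item once the buffer is full.
func (r *ringProvider) Put(v string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.count < len(r.buf) {
		r.buf[(r.head+r.count)%len(r.buf)] = v
		r.count++
		return
	}
	r.buf[r.head] = v
	r.head = (r.head + 1) % len(r.buf)
}

// Items returns a copy of the stored items, oldest first.
func (r *ringProvider) Items() []string {
	r.mu.RLock()
	defer r.mu.RUnlock()
	out := make([]string, 0, r.count)
	for i := 0; i < r.count; i++ {
		out = append(out, r.buf[(r.head+i)%len(r.buf)])
	}
	return out
}

func main() {
	if _, err := newRingProvider(0); err != nil {
		fmt.Println("rejected at startup:", err)
	}
	p, _ := newRingProvider(3)
	for _, id := range []string{"4", "5", "6", "7"} {
		p.Put(id)
	}
	fmt.Println(p.Items()) // prints [5 6 7]
}
```

With a capacity of 3, the fourth Put overwrites the oldest slot in place, so the backing slice keeps its original length.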

hugowetterberg commented 4 months ago

If you don't like the NewFiniteReplayProvider/error-approach it should be removed and made consistent with ValidReplayProvider instead. Likewise I don't think that these changes should be released/merged without consistently switching to errors instead of panics.

tmaxmax commented 4 months ago

Thank you for the PR!

if you're already serialising access to the replay providers

This is done inside the provider; replay providers are not required to be thread-safe. There's no need to change that yourself, though. I wanted to have this code mainly as a reference for benchmarks and tests. Besides, the ValidReplayProvider should be changed as well, since its underlying buffer was the same, which means it also leaks memory.

all uses of panic I looked at in the library are in the "expected errors" category

My take on this would be that these panics are caused by programming issues rather than normal execution which means they are bugs. For example, dispatching a Message with no topics would be a bug, which should be caught in testing. Besides, the causes of these panics can be statically determined, I believe. Here are a few examples of what I think:

There is a case to be made about errors related to topics – while in most scenarios I do think they would be provided statically, I can imagine some where they would be pulled from a data source. I'd say that even then, if the data source gives nothing back, it means the data is corrupted and that its errors were not handled. Though it would be prudent to not crash the whole program in these cases.

Not panicking would allow for better error handling in unusual cases, allow for custom error logging and, most importantly, not crash the program. What do you think? Have you encountered in your use a scenario where you'd have preferred if errors were returned instead?

I don't think that these changes should be released/merged without consistently ...

Totally agree. The changes will be cohesive.

hugowetterberg commented 4 months ago

This is done inside the provider, replay providers are not required to be thread safe. There's no need to change that yourself, though. I wanted to have this code mainly as a reference for benchmarks and tests.

:+1: Sounds good!

My take on this would be that these panics are caused by programming issues rather than normal execution which means they are bugs.

Buffer sizes are often something I would make configurable through f.ex. environment variables. One could argue that it's still a bug as the programmer failed to properly validate the input :shrug: but it's definitely not a static one. Likewise, the topics could be derived from the data the message represents, so dynamic rather than static. That's definitely the case for me:

Have you encountered in your use a scenario where you'd have preferred if errors were returned instead?

I always prefer errors :) and never expect code to panic. If it does, it should be from programmer errors, like forgetting to nil-check some input and causing a nil panic, unforeseen errors. But almost never from writing code that explicitly calls panic(); that feels like a slippery slope to exceptions to me.

Returning errors clearly communicates that a call can fail, and allows the caller to decide how they want to act when that happens. In my use-case I would probably just log that and move on with things; for somebody else that might be a show-stopper. Errors both clearly tell the caller that that's a decision they need to make, and make it easy for them to do so.
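As an illustration of that point, here is a small sketch in which a replay count arrives from the environment and the caller decides what a bad value means. The helper `parseReplayCount`, the variable name `SSE_REPLAY_COUNT`, and the fallback of 16 are all assumptions made up for this example.

```go
package main

import (
	"fmt"
	"os"
	"strconv"
)

// parseReplayCount is a hypothetical helper: it validates a replay count
// that comes from configuration rather than from a compile-time constant.
func parseReplayCount(raw string) (int, error) {
	n, err := strconv.Atoi(raw)
	if err != nil {
		return 0, fmt.Errorf("replay count %q is not an integer: %w", raw, err)
	}
	if n < 1 {
		return 0, fmt.Errorf("replay count must be at least 1, got %d", n)
	}
	return n, nil
}

func main() {
	const fallback = 16 // assumed default, chosen only for this example

	// SSE_REPLAY_COUNT is an assumed variable name, not one the library reads.
	n, err := parseReplayCount(os.Getenv("SSE_REPLAY_COUNT"))
	if err != nil {
		// This caller chooses to log and continue; another caller
		// could just as well treat the same error as fatal.
		fmt.Fprintln(os.Stderr, "using default replay count:", err)
		n = fallback
	}
	fmt.Println("replay count:", n)
}
```

The returned error leaves the policy with the caller, which a panic inside the library would not.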

tmaxmax commented 4 months ago

One could argue that it's still a bug as the programmer failed to properly validate the input 🤷 but it's definitely not a static one.

You're right here – the argument falls flat whenever there's a new layer introduced – be it some other code or an external configuration source. Otherwise pretty much every error happens because somebody didn't validate something somewhere 😃

... like forgetting to nil-check some input and causing a nil panic, unforeseen errors

That's a better definition for "programming errors" than what I had in mind. I think I'm convinced on this – I'll adapt the API to use errors and constructors. Constructors also simplify the library code, as no init logic would need to be done anymore. Given that this is a pretty big set of changes I'll probably roll them out across multiple versions (something like 0.9.0-pre.1, 0.9.0-pre.2 and so on).

Thank you for your input and time!

hugowetterberg commented 4 months ago

Great! That sounds like a sensible & responsible plan :+1:

hugowetterberg commented 4 months ago

Pushed new commit:

use replay helper function to avoid creating an intermediate buffer

TestFiniteReplayProvider had to be adjusted, as it assumed that the event with the ID 4 would still be present after pushing events {5,6,7} into a finite replay provider with a count of 3.

TestJoe_errors is failing, but I'm a bit unsure as to what it's actually testing.

tmaxmax commented 4 months ago

Where is that test failing? I should be able to help with that.

hugowetterberg commented 4 months ago

--- FAIL: TestJoe_errors (0.00s)
    joe_test.go:262: callback was called after subscribe returned
2024/02/06 15:57:29 panic: panicked
goroutine 167 [running]:
    /usr/lib/go/src/runtime/debug/stack.go:24 +0x5e*Joe).tryReplay.func1()
    /home/hugowett/Projects/go-sse/joe.go:246 +0x93
panic({0x704580?, 0x8027d0?})
    /usr/lib/go/src/runtime/panic.go:914 +0x21f*mockReplayProvider).Replay(0xc000312500, {{0xc0003504d0, 0xc0001b1d38}, {{{0xc000350490, 0x40ebe5}, 0x80}}, {0xc000316d20, 0xc000316d20, 0xc000350480}})
    /home/hugowett/Projects/go-sse/joe_test.go:32 +0x67*Joe).tryReplay(0x0, {{0x805398, 0xc00031c0e8}, {{{0x0, 0x0}, 0x0}}, {0xc0003048d0, 0x1, 0x1}}, {0x805370, ...}, ...)
    /home/hugowett/Projects/go-sse/joe.go:250 +0x102*Joe).start(0xc0003038b0, {0x805370, 0xc000312500})
    /home/hugowett/Projects/go-sse/joe.go:216 +0x2a9
created by*Joe).init.func1 in goroutine 166
    /home/hugowett/Projects/go-sse/joe.go:279 +0x1e6
FAIL   0.087s

tmaxmax commented 4 months ago

The test verifies the behavior of Joe on client errors. At the beginning it publishes two messages so that there is something to replay when a client is subscribed – the fact that the replay provider has count 1 and 2 messages are published is also kind of a test for FiniteReplayProvider, because it is programmed in such a way to be able to replay events even when the event with the client's LastEventID is the last removed event from the buffer. Let me give an example:

I think what happens here is that in the new implementation no lastRemovedID is retained, and because of this no events are replayed in that test. The callErr is still returned because the Flush method is still called on the client; that's why the error assertion doesn't fail and only the called == 1 assertion does. Also, the called == 1 assertion is normally there to ensure that after the client has returned an error, the provider doesn't attempt to send any more messages, which is why there's another Publish before it. In this case, given that FiniteReplayProvider behaves in an unexpected manner, called will probably be 0, as no attempt was actually made to send any message to the client.

To fix this, the previous behavior of the internal buffer type (i.e. the logic surrounding lastRemovedID) should be ported. I'm wondering though if another behavior would be better: currently, if the client's LastEventID is the ID of an event which is no longer part of the buffer, nothing is replayed. Would it be better to still replay all the events in the buffer, considering that if the client provides an ID, it's probably of an older event? Or should the provider replay all events regardless of whether the client provides a LastEventID or not? Should this be configurable? What do you think?

hugowetterberg commented 4 months ago

Yeah, that behaviour is a bit odd. Or, well, I guess it’s to provide playback for as long as it’s actually possible, but it complicates the logic around replays a bit: we can replay as long as the last event is retained OR if it was the last event to get evicted. I wonder if it’s worth that extra +1 of replay capability. That explains the behaviour in the test I changed; it didn’t assume that more than 3 events were retained.

I wonder if there’s some convention around this behaviour (replay all vs. replay none) when it comes to implementations? And is there any way to communicate to the client that the last observed event is too old for replay?

Either way, in the worst case, determining that we don’t have the event retained would require a full loop through the buffer before we actually start sending stuff.
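To make the rule discussed above concrete, here is a rough sketch of the lookup with a retained lastRemovedID. It works on plain string IDs, and `replayAfter` is a hypothetical function written for illustration, not code from the library or this PR.

```go
package main

import "fmt"

// replayAfter returns the events that follow lastEventID.
// events holds the retained IDs, oldest first; lastRemovedID is the ID of
// the most recently evicted event ("" if nothing has been evicted yet).
func replayAfter(events []string, lastRemovedID, lastEventID string) ([]string, bool) {
	// The extra +1: the client saw the event that was evicted last,
	// so everything still retained comes after it.
	if lastEventID != "" && lastEventID == lastRemovedID {
		return events, true
	}
	for i, id := range events {
		if id == lastEventID {
			return events[i+1:], true
		}
	}
	// Worst case: the whole buffer was scanned just to learn that the
	// ID is unknown or too old.
	return nil, false
}

func main() {
	events := []string{"5", "6", "7"} // a count of 3, after event 4 was evicted

	fmt.Println(replayAfter(events, "4", "4")) // prints [5 6 7] true
	fmt.Println(replayAfter(events, "4", "5")) // prints [6 7] true
	fmt.Println(replayAfter(events, "4", "2")) // prints [] false
}
```

Dropping the first branch gives the simpler behaviour, in which an ID that is no longer retained replays nothing.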

tmaxmax commented 4 months ago

Let's leave the code as it is for now. I'll do some research to see what other libraries – in and outside the Go ecosystem – implement. As a curiosity, given that you're using this replay provider, what was the behavior you would have expected, if you've thought of it before?

EDIT: Went through pretty much all libraries listed on Wikipedia and I could only find one single framework which handled replays (an SSE implementation for Django). This library saves events for 24 hours to some persistent storage... which is clearly out of scope for the core library. Let's drop that extra event for now – if there is demand for it somebody is bound to open an issue 😃 It's also kind of arbitrary to save just one single event – there could be some old event IDs buffer (but at this point just grow the provider's capacity) or a toggle like AutoIDs with which one chooses whether all events should be replayed regardless of LastEventId or not. This line of work is not a priority now, though, so let's leave it as it is in this PR.

hugowetterberg commented 4 months ago

I’ve mostly been concerned about how to communicate to the client that the ID is too old to support resume.

Found this issue:

So maybe the best thing would be a hook or config that allows sending a message with an application specific “out of sync” event type.

tmaxmax commented 4 months ago

Hm, this is indeed important.

Perhaps there should be a specific kind of error ReplayProviders should return which indicates that for the given LastEventID nothing could be replayed. Then there could be some OnError hook on the Server which is called when a subscription fails with the error and the subscription as parameters. Inside this hook one could check if the error is that specific replay error and do something appropriate in that scenario – for example retry with a subscription without a LastEventId, send another response to the client etc.

The API could look like:

var ErrReplayInvalidID error

type Server struct {
    // ... existing fields

    OnError func(r *http.Request, sub Subscription, err error) (newSub Subscription, retry bool)
}

With this hook I could also get rid of the Logger, given that OnSession and OnError cover pretty much the entire lifecycle of the request.

(note to self: the sentinel errors are kind of piling up; there may be some potential of unifying them into a single error type)

What do you think? The solution tries to be as generic as possible because given that this is not regulated by the spec there is no one single way to do it. As discussed in the issue you've linked – thanks for finding that btw! – some may prefer headers, some think messages are better and so on. I believe go-sse should not take a stance on these things but rather provide the tools necessary to build whatever.
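A hedged sketch of how such a hook might be used, assuming the proposed signature above. The `Subscription` struct here is a simplified stand-in with made-up fields, and `ErrReplayInvalidID` is given a concrete value only so the example runs; neither reflects the library's real definitions.

```go
package main

import (
	"errors"
	"fmt"
	"net/http"
)

// ErrReplayInvalidID mirrors the proposed sentinel; its message is assumed.
var ErrReplayInvalidID = errors.New("replay: nothing to replay for the given last event ID")

// errUnrelated exists only to exercise the non-retry path in this example.
var errUnrelated = errors.New("unrelated failure")

// Subscription is a simplified, hypothetical stand-in for the library type.
type Subscription struct {
	LastEventID string
	Topics      []string
}

// onError matches the proposed hook signature. On the replay sentinel it
// retries the subscription without a last event ID; on any other error it
// gives up.
func onError(_ *http.Request, sub Subscription, err error) (Subscription, bool) {
	if errors.Is(err, ErrReplayInvalidID) {
		sub.LastEventID = ""
		return sub, true
	}
	return sub, false
}

func main() {
	sub := Subscription{LastEventID: "42", Topics: []string{"news"}}

	// errors.Is still matches when the sentinel is wrapped with %w.
	wrapped := fmt.Errorf("subscribe: %w", ErrReplayInvalidID)
	newSub, retry := onError(nil, sub, wrapped)
	fmt.Printf("%q %v\n", newSub.LastEventID, retry) // prints "" true

	_, retry = onError(nil, sub, errUnrelated)
	fmt.Println(retry) // prints false
}
```

An application could equally use the same hook to send its own "out of sync" message; the sketch only shows the retry variant.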

hugowetterberg commented 4 months ago

Looks good, and grouping the errors into a single type could allow some more convenient error checking. I'm quite partial to using error codes. It has the benefit of making the things you check (type and code) constants instead of variables, and linters can warn if you're doing a non-exhaustive switch:

package main

import (
    "errors"
    "fmt"
)

// ErrorCode identifies the kind of failure as a constant.
type ErrorCode string

const (
    ErrorCodeInvalidID ErrorCode = "invalid_id"
    ErrorCodeNoTopic   ErrorCode = "no_topic"
)

// NewError creates an Error with a fixed message.
func NewError(code ErrorCode, text string) *Error {
    return &Error{
        text: text,
        Code: code,
    }
}

// NewErrorf formats the message and keeps any error wrapped with %w as the cause.
func NewErrorf(code ErrorCode, format string, a ...any) *Error {
    err := fmt.Errorf(format, a...)

    return &Error{
        text:  err.Error(),
        cause: errors.Unwrap(err),
        Code:  code,
    }
}

type Error struct {
    text  string
    cause error

    Code ErrorCode
}

func (e *Error) Error() string {
    return e.text
}

func (e *Error) Unwrap() error {
    return e.cause
}

// IsErrorWithCode reports whether err is, or wraps, an *Error with the given code.
func IsErrorWithCode(err error, code ErrorCode) bool {
    var e *Error

    return errors.As(err, &e) && e.Code == code
}

func main() {
    err := NewError(ErrorCodeInvalidID, "dunno what that is")
    if IsErrorWithCode(err, ErrorCodeInvalidID) {
        println("is invalid ID")
    } else if err != nil {
        // handle any other error here
    }

    var sseErr *Error

    if !errors.As(err, &sseErr) {
        panic("unknown error!")
    }

    // Linters can flag this switch if a code is left unhandled.
    switch sseErr.Code {
    case ErrorCodeInvalidID:
        println("switch thinks this is an invalid id")
    case ErrorCodeNoTopic:
        println("switch thinks this is something that lacks a topic")
    }
}

A single error type also opens up for both including root causes and adding fields carrying relevant information about the error, like the event that triggered the error.

hugowetterberg commented 4 months ago

Do you want me to make any more changes to the PR? For example, the last deleted ID feature is a bit up in the air at the moment. Should I add support for it, or do you want to merge and maybe work on removing it completely before the next release?

tmaxmax commented 4 months ago

I'll remove that feature for the moment.

If you have the resources, could you add the suggested test in the code review?

hugowetterberg commented 4 months ago

Of course. I’ve never done any allocation checking in tests before, so it’ll be a good exercise. Though I don’t know if it’ll explicitly catch memory leaks, as we only track allocations, not deallocations. An infinitely growing slice would definitely show up though, so 👍

tmaxmax commented 4 months ago

We don't really have a way to track deallocs, given that those are the GC's responsibility and it decides when to do them. Besides, the replay provider is usually part of a long-running service, so the deallocation won't really ever happen. What is important to track is, as you said, that the slice doesn't grow.
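One way such a check could look, sketched with `testing.AllocsPerRun` from the standard library. The `ring` type and `putAllocs` helper are hypothetical and far simpler than the real provider; the point is only the shape of the assertion.

```go
package main

import (
	"fmt"
	"testing"
)

// ring is a hypothetical fixed-size buffer used only for this example.
type ring struct {
	buf  []int
	next int
}

// put overwrites the slot at next and advances it, wrapping at the end.
func (r *ring) put(v int) {
	r.buf[r.next] = v
	r.next = (r.next + 1) % len(r.buf)
}

// putAllocs reports the average number of heap allocations per put call.
func putAllocs(r *ring, runs int) float64 {
	return testing.AllocsPerRun(runs, func() { r.put(1) })
}

func main() {
	r := &ring{buf: make([]int, 8)}
	before := cap(r.buf)

	allocs := putAllocs(r, 1000)

	fmt.Println("allocations per put:", allocs)
	fmt.Println("capacity unchanged:", cap(r.buf) == before)
}
```

AllocsPerRun returns an integral average, so growth that append amortizes over many calls can round down to zero. Asserting that the capacity of the backing slice is unchanged afterwards is a useful complementary check for the "slice doesn't grow" property.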

hugowetterberg commented 4 months ago


I can see why you chose to add a lastRemovedID. When working with small buffers in tests it feels strange that the first item can only be used as a reference point for replaying the items that follow it. But that's less of a thing with more realistic replay counts.

hugowetterberg commented 4 months ago

Tagged a release in our fork and released our service with it to stage this morning. Everything works as expected so far:

(screenshot: Screenshot_20240216_161301)

Let me know if there's anything you want me to fix in the PR.

tmaxmax commented 4 months ago

That graph looks really good now! I assume the performance of the new version appears in the graph after 8 AM.

hugowetterberg commented 4 months ago

Yep! :smile: I fixed the linting and test errors.

