razshare / sveltekit-sse

Server Sent Events with SvelteKit
https://www.npmjs.com/package/sveltekit-sse
MIT License
274 stars 9 forks source link

Best way to handle reconnection? (fatal error vs. retriable error) #39

Closed oscarhermoso closed 4 months ago

oscarhermoso commented 4 months ago

Context:

In the Azure/fetch-event-source README, they show an example of fatal and retriable errors.

fetchEventSource('/api/sse', {
    async onopen(response) {
        if (response.ok && response.headers.get('content-type') === EventStreamContentType) {
            return; // everything's good
        } else if (response.status >= 400 && response.status < 500 && response.status !== 429) {
            // client-side errors are usually non-retriable:
            throw new FatalError();
        } else {
            throw new RetriableError();
        }
    },
    // ...
}

It would be good to have the same or better documentation for SSE too. Currently docs have a very simple example:

https://github.com/tncrazvan/sveltekit-sse?tab=readme-ov-file#reconnect

Cases to consider:

Solution:

It may be possible to solve this just with a documentation update... however, I think might be better to reduce some of the abstraction of the sveltekit-sse library.

Part of the library that is currently hidden insides events() (eg. when beacons are handled). This forces all of the endpoiont logic to run within the start() callback. Therefore, we are unable to use SvelteKit's error inside of events() as it throws an unhandled error and will crash Node.

Instead, if beacon handling is run as a separate function, SvelteKit's error() will be thrown before events() is returned, returning a 400-500 HTTP response. The source() method on the client can be updated to handle it properly.

import { error } from '@sveltejs/kit';
import { events, /** new */ handleBeacon } from 'sveltekit-sse';

export function POST({ route, request, locals, params, cookies }) {
  // handle beacon transparently
  // less questions like this: https://github.com/tncrazvan/sveltekit-sse/issues/36#issue-2256422243
  if (handleBeacon(request)) return Response();

  // Free to perform tasks in the outer function scope - eg. validate session, fetch from DB, etc.
  const user_data = db.select() //...

  // Can throw SvelteKit's `error()` here to prevent an SSE stream from starting
  if (!locals.user_id) error(401, 'Unauthorized');

  // After validating user identity, return event stream.
  // If the event stream drops out, client can attempt to reconnect.
  return events({
    request,
    async start({ emit, lock }) {
      const unsub = notifications(user_id).subscribe(event => {
        if (data.error) {
          // eg. logged out in a different tab, must back log-in before reconnecting
          emit('fatal', data.error.message);
          lock.set(false);
        }
        else {
          emit('data', event.data);
        }
      });

      return unsub;
    }
  });
}

Workaround:

Currently, I am emitting events with a different name depending on the error. This works OK for now.

razshare commented 4 months ago

I think the new handleBeacon would make sense indeed, for sure.\ For those who need or want such a function I can create it and document why it exists.

We shouldn't force its use though.\ Whatever you can do in the body of the POST request you can do in the body of start(), with the exception of throwing exceptions (no pun intended), they won't bubble up as you pointed out with error().

Speaking of, with regards to the error() part, it's possible to make it work using a pipe stream or something like that, but it's not worth it.\ Mainly because once the response is served, e.g. once you emit your first event, you can't back up and throw an error... I mean you could just stop the stream, but you lose the advantage of setting the proper status code of your request.\ Reason why we're using an Either<T> monad pattern with emit(), instead of throwing exceptions.

const {error} = emit("...", "...")

We're using javascript after all, exceptions are a mess compared to other solutions out there.

[!WARNING] Also, I don't even think error() is a good solution overall in this case, think about this: what if a user's session/token expires mid stream? How are you gonna notify that using error()? Well you can't because error() is plain http based, it assumes you haven't served a response yet.\ I think proper sse events (like you're currently doing) are generally a way better solution here, but I like options as long as they don't pollute the main api, and handleBeacon is fine.

More reasons to not bother with error(), let the userland deal with it if they want to use it.\ The handleBeacon example you're providing I think is more than enough for that matter.

However, this is an interesting edge case

if (response.status >= 400 && response.status < 500 && response.status !== 429)

I'm thinking to always fail the connection and drop everything if there's any type of error (so if status is >= 300), then forward the status code to onClose along with other details perhaps.

With regards to

User initiated disconnection (eg. connection.close() called in browser), should not reconnect

actually I was gonna implement this just before the recent rework: the ability to distinguish between a close callback triggered by the client as opposed to one triggered by the server.\ I think it will still be easy to implement.

So to recap, ignoring the error() part which will be sattisfied by handleBeacon(), the final api would look something like this

source(`/events`, {
  close({ 
    connect, 
    status, // response status
    statusText, // response message
    isLocal, // if `true`, then this event has been triggered locally, by the browser
  }) {
    if(isLocal || status === 401){
      return
    }

    console.log('reconnecting...')
    connect()
  },
})

That object passed to close() is an Event https://github.com/tncrazvan/sveltekit-sse/blob/39a647269f909c0e7e6ed1dfbde2c2fa2beec70a/src/lib/types.js#L13-L34

And these new properties will be available to all event listeners, including error() and the internal message event.

Specifically isLocal could become more useful in the future, for example if we ever want to allow the client to mock/inject local messages and be able to distinguish between local and remote messages.


I'll be back with more updates on this, it should all be relatively easy to implement.

I'm worried about polishing the api and not depending on Azure's fetch-event-source api too much, especially since they don't seem to use jsdoc or anything to properly document those events.

razshare commented 4 months ago

Version 0.12.0 is out.

@oscarhermoso going back to your previous issue https://github.com/tncrazvan/sveltekit-sse/issues/33 you will now need to check specifically if the stream is disconnecting because of a local close event or because of a server disconnect.

Here's a repository to showcase the usage https://github.com/tncrazvan/sveltekit-sse-issue-39

You will need to add this part specifically https://github.com/tncrazvan/sveltekit-sse-issue-39/blob/1d1ea7e22e98a94c0671c588c0579a0e4dd04053/src/routes/%2Bpage.js#L9-L11

const connection = source(`/events?${searchParams}`, {
  cache: false,
  close({ isLocal }) {
    if (isLocal) {
      // Don't reconnect if the event is local
      return;
    }
    connect();
  },
});

Previously the close event didn't trigger when the stream would disconnect due to insufficient subscribers, now it does, and it does so with isLocal:true.


Let me know if this addresses your points.

oscarhermoso commented 4 months ago

Wow, another awesome update. Thanks @tncrazvan!

Yes, I agree the error() pattern is not ideal - but it is already idiomatic to SvelteKit. If someone is using another library like ciscoheat's sveltekit-rate-limiter alongside sveltekit-sse, or have written handle middleware into hooks.server.ts, then it's very possible that error() will already be emitted.

If an unexpected exception is thrown in the outer scope, it will be dispatched as a default App.Error with status 500, so it's important that sveltekit-sse can handle this.

Thankyou for adding support by exposing status and statusText to the close/error callbacks in source, and exporting findBeacon/extend.

The one point that I think has not been covered by this update is this:

Server announced fatal disconnection after event stream has started, should not reconnect

eg. This is roughly the code that I am using in my app currently:

export function POST({ route, request, locals, params, cookies }) {

  // ...

  return events({
    request,
    async start({ emit, lock }) {
      const unsub = notifications(user_id).subscribe(data => {
        if (data.error) {
          // eg. logged out in a different tab, must back log-in before reconnecting

          emit('stop', data.error.message); // custom implementation

          // lock.set(false); // TODO: is this safe? 
          // not sure if there is a race condition here emit's controller.enqueue & lock's controller.close
        }
        else {
          emit('data', data);
        }
      });

      return unsub;
    }
  });
}

Then on the client, I listen for these events:

// $routes/+layout.ts

const notifications = add_stop_listener(
    source(
        `/user/${user_id}/notifications`,
        default_source_opts, // close handler, error handler
    ),
)
    .select('notifications')
    .json<Notifications>(({ error, raw, previous }) => {
        console.error(`Could not parse "${raw}" as json.`, error);
        return previous;
    })
// $lib/sse/index.ts

export function add_stop_listener(source: ReturnType<typeof import('sveltekit-sse').source>) => {
    let initial = true;
    const unsub = source.select('stop').subscribe((data) => {
        if (initial) {
            // svelte always calls the subscriber once with the current value
            // don't unsubscribe when this happens
            initial = false;
            return;
        }

        if (data) {
            console.warn('Stream disconnected with message:', data);
        } else {
            console.log('Stream disconnected.');
        }

        source.close();
        unsub();
    });

    return source;
};

This works fine for me. Up to you as to whether you see this as being something that the sveltekit-sse library could provide, or whether you see it as a task for users to implement.

razshare commented 4 months ago

Then on the client, I listen for these events:

This is the correct usage I would say, I don't want to lock userland into specific types of error management after the stream has already started, first because the W3C spec doesn't mention anything regarding to error management at a protocol level.

Second, because that's a pandora box and we won't ever be able to close it.

Third, it means we would have to reserve some event names to automatically deal with those errors, which is not ideal because, although the W3C spec does specify some special event names, such as message image there is not indication for any error event, for example. Which means we would have to stray away from the spec.

oscarhermoso commented 4 months ago

Awesome, no worries then. That's all of my questions answered, so I will close this ticket.

Thank you again.