razshare / sveltekit-sse

Server Sent Events with SvelteKit
https://www.npmjs.com/package/sveltekit-sse
MIT License
301 stars 9 forks source link

`Locked` is not set to false when closing from client side #26

Closed OTheNonE closed 9 months ago

OTheNonE commented 9 months ago

When i close the value stream, the locked variable on the server side is not set to false automatically, even though the function leaves the "endless await":

Client:

let value = source("my/endpoint")
value.close()

Server:

let locked = writable(true)
return event(async emit => {
    emit("something")
}, { locked })
    .onCancel(() => console.log(get(locked))) // It prints `true`
    .toResponse()

I would expect this value to be false, as the connection has been closed. If there is something i have missed or misunderstood, please let me know.

razshare commented 9 months ago

I would expect this value to be false, as the connection has been closed.

Yeah, that makes sense, I'll fix it tomorrow.

Thanks for the report!

OTheNonE commented 9 months ago

Thank you for this awesome library!

razshare commented 9 months ago

hi @OTheNonE . I took a look at this.

I don't think this is currently possible to achieve in SvelteKit.

In SvelteKit's file system router api there is currently no way to determine when a client closed an http connection on their side.

I would've expected the sveltekit server to recognize that the response contains a stream and invoke cancel() on it whenever the client disconnects, but that doesn't seem to be the case.

I've opened an issue on the sveltekit project here https://github.com/sveltejs/kit/issues/11751

This will remain unresolved until we get a proper way of detecting cancled connections in sveltekit.

razshare commented 9 months ago

Hi @OTheNonE , I just released v0.8.0 with some major changes to the API and a beacon mechanism to detect canceled connections.

The following should do exactly what you're asking for using the new api

<!-- Client -->
<script>
  import { source } from '$lib/source.js'
  const connection = source('/events')
  const channel = connection.select('cat-quote')
  const quote = channel.json()

  setTimeout(connection.close, 2000)   // <=== this will trigger the unlock and cancelation
</script>
//Server

export function POST({ request }) {
  return events({
    request,
    start({ emit, lock }) {
      emit('message', 'hello world')
      lock.subscribe(function disconnected($lock) {
        if ($lock) {
          return
        }
        console.log('Stream unlocked.')  // <== logging here
      })
    },
    cancel() {
      console.log('Stream canceled.')  // <== and here
    },
  })
}

The server will log in the following order

Stream canceled.
Stream unlocked.

You can read more about the new locking mechanism here.


Let me know if this answers your original request.

OTheNonE commented 9 months ago

Nice! I should maybe share all of my own example:

server:

export async function POST({ cookies, request }) {
    return events({ request,
        async start({ emit, lock }) {

            let token = cookies.get('token')

            if (!token) {
                console.log("User is not signed in.")
                lock.set(false)
                return
            }

            let { client, is_authenticated } = await getClient({ token })

            if (!is_authenticated) {
                console.log("User is not authenticated.")
                lock.set(false)
                return
            }

            lock.subscribe(is_locked => {
                if (!is_locked) return
                client.close()
            })

            await client.live<Measurement>(MEASUREMENT, ({ action, result }) => {

                if (action == "CLOSE" || get(lock) == false) return

                if (action == "CREATE") {
                    let info = emit(MEASUREMENT, JSON.stringify(result))
                    info.value
                }
            })

        },
        cancel() {}

    })
}

client:

let connection = source('home/live-measurement', {
    close: event => console.log(event),
    error: event => console.log(event),
})

let measurement = connection.select(MEASUREMENT).json<Measurement>();

I would rather like to have client.close() inside of the cancel() { ... } callback function instead of lock.subscribe(is_locked => { ... }), but as the client is defined inside of the start() { ... } function (as i believe it must according to the docs), this is not possible.

If some step of the authentication do not succeed, then i "open" the lock, and return from the start function. Is it okay cancel the connection like this?

razshare commented 9 months ago

but as the client is defined inside of the start() { ... } function (as i believe it must according to the docs), this is not possible.

@OTheNonE yes, that is correct.

I just released v0.8.2, you can now return a function from start() which will be executed when a connection is canceled, similar to how readable and onMount behave in svelte.

So in your case, this should work

import { events } from 'sveltekit-sse'
import { get } from 'svelte/store'

type Measurement = ''
const MEASUREMENT = ''

export async function POST({ cookies, request }) {
    return events({ request,
        async start({ emit, lock }) {
            const token = cookies.get('token')
            let $lock = true

            const unsubscribe = lock.subscribe(function run($value){
                $lock = $value
            })

            if (!token) {
                lock.set(false)
                return function cancel(){
                    unsubscribe()
                    console.log("User is not signed in.")
                }
            }

            const { client, is_authenticated } = await getClient({ token })

            if (!is_authenticated) {
                lock.set(false)
                return function cancel(){
                    unsubscribe()
                    console.log("User is not authenticated.")
                }
            }

            client.live<Measurement>(MEASUREMENT, function run({ action, result }) {

                if (action == "CLOSE" || $lock == false) {
                    return
                }

                if (action == "CREATE") {
                    const {error} = emit(MEASUREMENT, JSON.stringify(result))

                    if(error){
                        console.error(error)
                    }
                }
            })

            return function cancel(){
                unsubscribe()
                client.close()
            }
        }
    })
}
OTheNonE commented 9 months ago

Sorry, but this was not the thing i had in mind at all, i was hoping to be able to have a single source to all the clean-up that has to be done when the connection is closed. With this new implementation, you still have the ‘close’ handler written inside of the ‘start’ function, and not all-collected next to ‘start’. I also find it not very intuitive that the returning value from the start function is the clean-up close-function, i dont get why that relation makes sense, and it works basicly the same as ‘lock.subscribe(is_locked => { … })’. Ofcourse, i dont know well enough how “svelte” like code should be written, but if you have the arguments for this implementation, then it probably is okay (Still grateful for all the work in this library<3), but compared to my current solution, i’ll stick with that rather than this proposed solution.

razshare commented 9 months ago

I also find it not very intuitive that the returning value from the start function is the clean-up close-function, i dont get why that relation makes sense

Returning a "stop" function from your "start" function is a common pattern in svelte (and other libraries as well, I believe).

That's how readable works, for example

const data = readable('', function start(set) {
  console.log("There's at least 1 subscriber to this store.")
  const dataProvider = new SomeDataProvider()

  dataProvider.addEventListener("data", function run(data){
    set(data)
  })

  return function stop(){
    console.log("There are no more subscribers to this store.")
    dataProvider.close()
  }
})

It makes sense because the "start" function always declares the kind of state that you need to clean up in your "stop" function.

Take onMount as another example

<script>
onMount(function start(){
  console.log("Component created.")
  const interval = setInterval(function run(){
    console.log("The time is", new Date())
  }, 1000)

  return function stop(){
    clearInterval(interval)
    console.log("Component destroyed, I don't know what the time is anymore.")
  }
})
</script>

It saves you the hassle of declaring a global state to manage the cleanup,

Using another example, let's say we have a chat component that needs to always be aware of large payloads coming in and always process them without fail, even as the component is being destroyed.\ If this type of thing were not possible, you would be forced to do something like this

<script>
  import { onDestroy, onMount } from 'svelte'

  /**
   * @type {string}
   */
  export let chatRoomId

  // The server doesn't need to know about these variables to begin with

  /**
   * @type {WebSocket}
   */
  let websocket

  /**
   * @type {false|Promise<Record<string,any>>}
   */
  let incomingLargePayload = false

  onMount(async function start(){
    // The reason this code must live under `onMount`
    // is because we don't want to execute it on the server also.

    /**
     * @type {false|((contents:string)=>void)}
     */
    let resolveLargeData = false

    websocket = new WebSocket(`wss://example.com/chatroom/${chatRoomId}`)
    websocket.addEventListener('message', function run({data}){
      if(incomingLargePayload){
        resolveLargeData(data)
        showNotification(`File received.`)
        incomingLargePayload = false
      }

      const parsed = JSON.parse(data)
      if(parsed.type === 'file-metadata'){
        // Next payload is the actual content
        showNotification(`Incoming file from ${parsed.author}`)
        incomingLargePayload = new Promise(function run(resolve){
          resolveLargeData = resolve
        })
        return
      }
    })

  })

  onDestroy(async function stop(){
    // The only reason this `onDestroy` exists is because we
    // need to cleanup stuff declared in `onMount`.

    if(incomingLargePayload){
      await incomingLargePayload
    }

    if(websocket && websocket.readyState === websocket.OPEN){
      websocket.close()
    }
  })
</script>

On the other hand, returning a cleanup function makes things easier to write, follow and especially to refactor, because you don't need to worry about messing up some external piece of state or function that lives who knows where.

<script>
  import { onMount } from 'svelte'

  /**
   * @type {string}
   */
  export let chatRoomId

  onMount(async function start(){
    // The reason this code must live under `onMount`
    // is because we don't want to execute it on the server also.

    const websocket = new WebSocket(`wss://example.com/chatroom/${chatRoomId}`)

    // No need to declare state for this part, you just wait for it inline
    await new Promise(function run(resolve){
      websocket.addEventListener('open', resolve)
    })

    // No need to append "Data" and other keywords to these 
    // name, we know what we're doing inside here.

    /** @type {false|Promise<string>} */
    let promise = false
    /** @type {false|Function} */
    let resolve = false

    websocket.addEventListener('message', function run({data}){
      if(resolve){
        resolve(data)
        showNotification(`File received.`)
        return
      }

      const parsed = JSON.parse(data)
      if(parsed.type === 'file-metadata'){
        // Next payload is the actual content
        showNotification(`Incoming file from ${parsed.author}`)
        promise = new Promise(function run(r){
          resolve = r
        })
        return
      }
    })

    return async function stop(){
      await promise
      websocket.close()
    }
  })
</script>

Everything is contained in just one function, you can even get all that code and move it into a different function in a file and your original code becomes much more modular and cleaner

<script>
  import { onMount } from 'svelte'
  import { manageChatroom } from './manageChatroom'
  /**
   * @type {string}
   */
  export let chatRoomId

  onMount(function start(){
    return manageChatroom(chatRoomId)
  })
</script>

You can't do the same when splitting the "start" and "stop" functions, because the cleanup function needs to know about what kind of state it needs to clean up.\ And even if you created some shared state or put them in the same module, you would still need to manage that shared state separately.

So yes, "centralizing" the cleanup is a good thing, but in these cases it means you're decentralizing it from the source that's actually creating the state you need to clean up.

[!NOTE] In the case of readable there's actually no other provided way to do the cleanup, it just makes more sense that way.

For the same reasons, the start() function of events() allows returning a cleanup function.\ I was actually considering removing the cancel() function altogether in favor of this pattern of returning a cleanup function.

I'm closing this issue because the requested functionality regarding lock has been implemented.

However, I'm open to suggestions on this topic for now.\ Feel free to open a new issue with examples of what kind of API you would rather have instead.

OTheNonE commented 9 months ago

I see, you're right, and after reading this, and playing around with the code, i ended up with almost the same result as you provided me with, and i can see all the pros in this pattern:

export async function POST({ cookies, request }) {
    return events({ 
        request, 
        async start({ emit, lock }) {

            let token = cookies.get(TOKEN)

            if (!token) {
                lock.set(false)
                return () => {
                    console.log("User is not signed in.")
                }
            }

            let { client, is_authenticated } = await getClient({ token })

            if (!is_authenticated) {
                lock.set(false)
                return () => {
                    console.log("User is not authenticated.")
                    client.close()
                }
            }

            await client.live<Measurement>(MEASUREMENT, ({ action, result }) => {
                if (get(lock) == false) return
                if (action == "CREATE") emit(MEASUREMENT, JSON.stringify(result))
            })

            return () => {
                console.log("Connection closed.")
                client.close()
            }

        }
    })
}

I also started to like the way that i can escape the procedure midway and handle that that specific error, e.g. if there is no token, or if the client is not authenticated. You convinced me, thank you!