raquo / Airstream

State propagation and event streams with mandatory ownership and no glitches
MIT License
246 stars 28 forks source link

Killing subscriptions from observers (?) #102

Open raquo opened 2 years ago

raquo commented 2 years ago

This is related to observable completion (#23), but not quite the same feature.

Users should be able to kill a subscription from inside an observer, or more specifically, from where they're defining the subscription. See Iurii's original request on Apr 10 2019 06:07.

Syntax sugar aside, the minimum useful concept would be:

type KillSubscriptionFn = () => Unit  // kills the relevant subscription when called
trait Observable[A] {
  ...
  def addObserver(makeObserver: KillSubscriptionFn => Observer[A])(implicit owner: Owner): Subscription = ???
}

Whereas observable completion feature allows observables to indicate that they have stopped emitting (to all of their observers), this gives users control from the opposite direction – giving individual Observers a way to unsubscribe when they no longer wish to receive updates. I'm not sure, but I think we will eventually want both of those control mechanisms.

Technically, observables are already able to decide if and when to unsubscribe from their parents (by removing their internalObserver from the parent observable), so perhaps we could actually implement this feature on the observable side. For example, in Laminar syntax:

fooStream.withSubscriptionKiller --> { (foo, killSubscription) =>
  if (...)
    killSubscription()
}

Where the withSubscriptionKiller operator creates a new stream which provides a unique killSubscription function to each of its observers, such that said function removes just that one observer. Although, I'm not sure if it's feasible to implement it this way – Airstream observables always emit the same values to all observers. Instead of providing different values to different observers, perhaps we could pass the observer itself as a parameter to killSubscription, but that has other problems.

Still not sure how to do it. Perhaps we could emulate desired behaviour using the stream-completion feature:

fooStream.completeByCallback --> { (foo, complete) =>
  if (...)
    complete()
}

Same style, different semantics / implementation – completeByCallback creates a new stream which will mark itself as complete when the callback is called. So, such a stream shouldn't be shared between several observers if individual observers are to be used.

The problem with implementing this using stream-completion feature is its interaction with restarting streams. I don't remember my latest thinking on this, but I it's possible that observables that complete should never emit again – not even after the user tried stopping and re-starting them. IF that's the case, this might not be desirable for this feature, perhaps what's desired is merely to stop the subscription until it's started again (e.g. if re-mounting the component).