zebulonj / callbag-subscribe

A callbag sink (listener) that connects an Observer a-la RxJS. 👜
MIT License

Fix issue where Dispose() called after source terminated potentially violates spec #11

Open bjnsn opened 3 years ago

bjnsn commented 3 years ago

Spec says: A callbag MUST NOT be delivered data after it has been terminated

As I understand it, for callbags to work in concert, every callbag must either:

1) not send 'terminate' to a source/sink after that source/sink has signaled that it has terminated, or
2) guard against the case where it has already notified its sources/sinks of termination and is then itself terminated.

I expect 1) - though perhaps @staltz should weigh in...

If 2) then I believe this issue should be raised with @Andarist in callbag-remember since it doesn't guard against that case.
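To make option 2) concrete, here is a minimal sketch of a pass-through operator that guards itself. `guardTermination` is a hypothetical name used only for illustration; it is not part of callbag-remember or any published package, and it assumes a single sink.

```javascript
// Hypothetical pass-through operator illustrating option 2): once termination
// has happened in either direction, every later signal is dropped.
const guardTermination = (source) => (start, sink) => {
  if (start !== 0) return;
  let terminated = false;

  source(0, (t, d) => {
    if (t === 0) {
      const sourceTalkback = d;
      // Hand the sink a wrapped talkback instead of the source's own.
      sink(0, (tt, dd) => {
        if (terminated) return; // already terminated: swallow the signal
        if (tt === 2) terminated = true;
        sourceTalkback(tt, dd);
      });
      return;
    }
    if (terminated) return; // nothing is delivered after termination
    if (t === 2) terminated = true;
    sink(t, d);
  });
};
```

With a guard like this in place, a late terminate from the sink never reaches a source that has already completed, at the cost of every operator carrying its own flag.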

Example: https://stackblitz.com/edit/typescript-6w7v6u?file=index.ts

import pipe from "callbag-pipe";
import of from "callbag-of";
import subscribe from "callbag-subscribe";
import remember from "callbag-remember";

// once `of` arguments are exhausted, it sends dispose signal to its sink (remember)
const num$ = remember(of(1));

const cancelSubscription = pipe(
  num$,
  subscribe(value => console.log(value))
);

setTimeout(
  () =>
    // when subscription is canceled, it sends dispose signal to its source (remember)
    cancelSubscription(),
  100
);

// remember throws an error after receiving the dispose signal from both source and sink

Andarist commented 3 years ago

The fix here looks OK - I believe that stuff like this should mostly be handled at the ends of the callbag chains. It's subscribe that knows it shouldn't send a 2 (terminate) upwards after the source has completed (or errored).
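As a rough illustration of that idea (a sketch under assumptions, not the actual callbag-subscribe source or the patch under review), a sink can clear its stored talkback when the source terminates, so that a later dispose becomes a no-op. `guardedSubscribe` and `strictOf` are hypothetical names introduced for this example.

```javascript
// Hypothetical sink: remembers whether its source has already terminated.
const guardedSubscribe = (listener) => (source) => {
  let talkback = null; // set on handshake, cleared on termination

  source(0, (t, d) => {
    if (t === 0) {
      talkback = d;
    } else if (t === 1) {
      listener(d);
    } else if (t === 2) {
      talkback = null; // source is done; there is nothing left to terminate
    }
  });

  // Dispose: sends 2 upwards at most once, and never after the source
  // has terminated on its own.
  return () => {
    if (talkback) {
      const tb = talkback;
      talkback = null;
      tb(2);
    }
  };
};

// Hypothetical strict source for demonstration: emits one value, completes,
// and throws if it is terminated again afterwards.
const strictOf = (value) => (start, sink) => {
  if (start !== 0) return;
  let done = false;
  sink(0, (t) => {
    if (t === 2) {
      if (done) throw new Error("terminated after completion");
      done = true;
    }
  });
  if (done) return; // sink terminated during the handshake
  sink(1, value);
  done = true;
  sink(2);
};

const dispose = guardedSubscribe((value) => console.log(value))(strictOf(1));
dispose(); // no-op: the source already completed, so no 2 is sent upwards
```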

bjnsn commented 3 years ago

@Andarist It looks like handling this at the end of the chain won't fully solve the problem. When remember is used with subscribe and a combination operator such as combine, the combination operator would need to terminate only those sources that haven't already terminated, which isn't currently happening. With the current patch there are still issues: https://stackblitz.com/edit/js-vqe3br

import fromPromise from "callbag-from-promise";
import interval from "callbag-interval";
import combine from "callbag-combine";
import pipe from "callbag-pipe";
import remember from "callbag-remember";

const str$ = remember(fromPromise(Promise.resolve("foo")));
const time$ = interval(1000);
const combine$ = combine(str$, time$);

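// `patchedSubscribe` is presumably callbag-subscribe with this PR's fix applied;
// its definition is not shown in this snippet
const dispose = pipe(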
  combine$,
  patchedSubscribe(val => {
    console.log(val);
  })
);

setTimeout(() => dispose(), 1500);

Andarist commented 3 years ago

Just to clarify, in case this wasn't clear before: by "at the end" I meant "as close to the end as possible".

I can't claim that remember is 100% correct and definitely shouldn't change, but I'd prefer to touch it only after we fix the other issues and prove that it has to be changed. This is IMHO a good process, as it allows us to recognize those unhandled scenarios in other operators.

As to this particular case - I'm pretty sure that combine should "track" the completion state of its sources here: https://github.com/staltz/callbag-combine/blob/912a5ec8ec3d9e65d3beccdc7a53eabd624c1c8a/readme.js#L65 which could simply be done by nullifying the correct source talkback slot. More than that - errors from sources seem to be currently ignored by combine and that could be fixed as well.

bjnsn commented 3 years ago

Right - I believe the talkback from the sink (in this case callbag-subscribe) is handled here and that the list of talkbacks is accumulated but never culled.

Andarist commented 3 years ago

Yep, I suspect that this should be enough to fix this particular problem:

diff --git a/readme.js b/readme.js
index 5de5d3f..d66c240 100644
--- a/readme.js
+++ b/readme.js
@@ -45,7 +45,7 @@ const combine = (...sources) => (start, sink) => {
   const sourceTalkbacks = new Array(n);
   const talkback = (t, d) => {
     if (t === 0) return;
-    for (let i = 0; i < n; i++) sourceTalkbacks[i](t, d);
+    for (let i = 0; i < n; i++) sourceTalkbacks[i] && sourceTalkbacks[i](t, d);
   };
   sources.forEach((source, i) => {
     vals[i] = EMPTY;
@@ -62,6 +62,7 @@ const combine = (...sources) => (start, sink) => {
           sink(1, arr);
         }
       } else if (t === 2) {
+        sourceTalkbacks[i] = null;
         if (--Ne === 0) sink(2);
       } else {
         sink(t, d);
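
The bookkeeping in the diff above can also be tried in isolation. What follows is a simplified sketch, not the callbag-combine source: `combineLive` is a hypothetical stand-in that models only the "null the talkback slot on completion" idea and, like the current combine, ignores errors from sources.

```javascript
// Hypothetical, simplified combine-style operator. It tracks which sources
// are still live so that a terminate from the sink reaches only those.
const combineLive = (...sources) => (start, sink) => {
  if (start !== 0) return;
  const n = sources.length;
  const vals = new Array(n);
  const has = new Array(n).fill(false);
  const talkbacks = new Array(n).fill(null);
  let started = 0;
  let live = n;

  const talkback = (t, d) => {
    if (t === 0) return;
    // Skip slots that were nulled when their source terminated.
    for (let i = 0; i < n; i++) talkbacks[i] && talkbacks[i](t, d);
  };

  sources.forEach((source, i) => {
    source(0, (t, d) => {
      if (t === 0) {
        talkbacks[i] = d;
        if (++started === n) sink(0, talkback);
      } else if (t === 1) {
        vals[i] = d;
        has[i] = true;
        // Emit only once every source has produced at least one value.
        if (has.every(Boolean)) sink(1, vals.slice());
      } else if (t === 2) {
        talkbacks[i] = null; // this source is done; never call it again
        if (--live === 0) sink(2);
      }
    });
  });
};
```

If the sink terminates after one source has already completed, only the remaining live sources receive the 2.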