Open ronag opened 1 year ago
My suspicion is that this happens after a an inner observable throws an error syncrhonously, or the project function also throws an error.
All of this code happens synchronously:
const entry = (curr[n] = {
key,
value: EMPTY,
subscription: null,
})
let observable
try {
observable = rxjs.from(project(keys[n]))
} catch (err) {
observable = rxjs.throwError(() => err)
}
empty += 1
active += 1
const subscription = observable.subscribe({
next(value) {
if (entry.value === EMPTY) {
empty -= 1
}
entry.value = value
dirty = true
update()
},
error: _error,
})
// ** outer disposer runs before this assignment is done
entry.subscription = subscription
the only way the outer disposer can run before the asignment is done is if the consumer of this observable unsubscribes from it, also synchronously.
Initially I thought it was due to update()
and maybe the consumer doing a take()
an unsubscribing after N amounts of updates, but I see that's not possible because update()
schedules a microtask.
But if the inner observable observable = rxjs.from(project(keys[n]))
throws synchronously (or als the throwError(
you use in the catch clause), then when subscribing to that observable you will synchronoulsy receive a call on the error subscriber, which calls _error
, which calls o.error
.
This o.error
causes the consumer of the observable to immediately unsubscribe, which runs the cleanup function before you could assign the subscription to the entry
But if the inner observable observable = rxjs.from(project(keys[n])) throws synchronously (or als the throwError( you use in the catch clause), then when subscribing to that observable you will synchronoulsy receive a call on the error subscriber, which calls _error, which calls o.error.
This o.error causes the consumer of the observable to immediately unsubscribe, which runs the cleanup function before you could assign the subscription to the entry
I suspected this as well. However, when trying to make a test case the disposer seems to run afterwards.
new rxjs.Observable(o => {
process.nextTick(() => {
console.log('tick')
})
queueMicrotask(() => {
console.log('task')
})
console.log('pre subscribe')
rxjs
.throwError(() => new Error('asd'))
.subscribe({
error: (err) => o.error(err)
})
console.log('post subscribe')
return () => {
console.log('disposer')
}
}).subscribe(() => {})
Will print:
pre subscribe
post subscribe
disposer
task
tick
@ronag this is also expected.
If you think about it, there's no way RxJS can call the disposer function at that point, because you haven't even returned it yet, it's all happening synchronously.
If the inner subscription happens asynchronously though (such as your original case) then you get exactly the behaviour I explained: https://stackblitz.com/edit/rxjs-kwustg?file=index.ts
new rxjs.Observable((o) => {
queueMicrotask(() => {
console.log('task');
});
console.log('pre subscribe');
queueMicrotask(() => {
rxjs
.throwError(() => new Error('asd'))
.subscribe({
error: (err) => o.error(err),
});
console.log('post subscribe');
});
return () => {
console.log('disposer');
};
}).subscribe(() => {});
/* Logs
pre subscribe
task
disposer
post subscribe
*/
Would it make sense to ensure e.g. throw error occurs asynchronously?
In case no one else is working on this issue, is it fine if I work on it? Can this be assigned to me?
Describe the bug
We implement an operator called
combineMap
which is a more efficient variant of the following pattern:Instead, we can do the following which will re-use the results and subscriptions for unchanged values:
This works fine most of the time, but sometimes in production we get the following error:
Give the following code:
For some reason the disposer:
Runs before
subscription
has been assigned above:I have no idea how this can occur and have been unable to reproduce it.
Expected behavior
The outer disposer does not run concurrently with the inner subscription's
next
invocation.Reproduction code
Unable to reproduce outside of production code.
Reproduction URL
No response
Version
7.8.0
Environment
No response
Additional context
No response