ReactiveX / rxjs

A reactive programming library for JavaScript
https://rxjs.dev
Apache License 2.0
30.78k stars 3k forks source link

Observable created in another package does not work with combineLatest #5409

Closed lars-berger closed 4 years ago

lars-berger commented 4 years ago

RxJS version:

6.5.5

Code to reproduce:

Reproduction repo - Minimal Angular app where an observable is created in a library added as a dependency to the Angular app.

In the AppComponent.ts, the following line throws an error:

combineLatest([matchingNode$]).subscribe((result) => console.log('Success!', result));

Expected behavior:

For combineLatest to accept the passed in observable.

Actual behavior:

I'm running into the error TypeError: You provided 'undefined' where a stream was expected when using the combineLatest/forkJoin operator on an observable from a library added as a dependency.

Additional information:

From a (possibly) related discussion here

For example, I just ran into this problem with rxjs. That library checks the arguments to combineLatest to check if they are instanceof Observable. However, definitions of Observable from distinct copies of the package, even if byte-for-byte identical, are not considered "the same". An instance created with one of them will not be an instanceof the other one. The result is that an Observable created in the app A, when passed a utility in library L which uses combineLatest, will fail, because it is not considered an observable in the sense that L (L's copy of rxjs) understands it.

Although in the reproduction repo, the Angular app and the library share the same RxJS version.

Is there a workaround for handling this?? I'm truly confused about what's happening here - and admittedly, a bit out of my depth.

cartant commented 4 years ago

Related: https://github.com/ReactiveX/rxjs/pull/5239

Workaround is to wrap the observable from the other lib in from:

combineLatest(from(libobs), /* ... */);
lars-berger commented 4 years ago

The from then throws the same error TypeError: You provided an invalid object where a stream was expected. You can provide an Observable, Promise, Array, or Iterable.

I noticed that checking the instanceof of the observable from the other library against Observable does not return true.

// TreeNode class is defined in another package.
const node1 = new TreeNode('node1');
const node2 = new TreeNode('node2');
node1.addChild(node2);

const matchingNode$ = node2.ancestors$.pipe(first((node: TreeNode) => node.name === 'node1'));

const testObservable$ = of('hello');

console.log(`Is obserable? ${testObservable$ instanceof Observable}`); // logs true
console.log(`Is obserable? ${matchingNode$ instanceof Observable}`); // logs false

// The error is thrown here (with and without the from).
combineLatest(from(matchingNode$)).subscribe((result) => console.log('Success!', result));

I can try out your PR if you'd like?

cartant commented 4 years ago

IDK what sort of shenanigans is involved with electron. Interop happens with Symbol.observable. Read from this issue's comment onwards: https://github.com/ReactiveX/rxjs/issues/4532#issuecomment-619343464 and read the blog post to which I linked.

kwonoj commented 4 years ago

There's no major diffs when it comes to electron unless ipc is being involved.

lars-berger commented 4 years ago

I was under the same impression @kwonoj. The reproduction repo is this angular electron boilerplate + yarn workspaces.

Added a second, simpler reproduction of the error to the repo which consists of:

// @my-app/electron, AppComponent.ts
import { simpleExportedObservable$ } from '@my-app/models';

@Component({ ... })
export class AppComponent {
  ngOnInit() {
    // The combineLatest here throws an error.
    combineLatest([simpleExportedObservable$]).subscribe((result) => console.log('Success!', result));
  }
}
// @my-app/models, index.ts
import { of } from 'rxjs'; // same version of rxjs as in @my-app/electron

export const simpleExportedObservable$ = of(1);

If I log the simpleExportedObservable$ observable it has the Symbol.observable property and it seems like just a regular observable

I have absolutely no idea whats going on here.

kwonoj commented 4 years ago

as shared above, check if both symbols are identical. if not, something around bundling & module resolution fails.

lars-berger commented 4 years ago

I think I've found the underlying issue though, but I'm not sure how to go about fixing it.

The error TypeError: You provided an invalid object where a stream was expected. You can provide an Observable, Promise, Array, or Iterable is thrown if this condition is false

// /internal/util/subscribeTo.js
if (!!result && typeof result[Symbol_observable] === 'function') {
    return subscribeToObservable(result);
}
// ...
else {
    var value = isObject(result) ? 'an invalid object' : "'" + result + "'";
    var msg = "You provided " + value + " where a stream was expected."
        + ' You can provide an Observable, Promise, Array, or Iterable.';
    throw new TypeError(msg);
}

Where Symbol_observable is defined as the result of this function:

(function () { return typeof Symbol === 'function' && Symbol.observable || '@@observable'; })();

The difference between the imported observable and the one created inside the angular app is that one has a property Symbol.observable and the other has @@observable.

And if I import and log Symbol_observable from rxjs internals, it's '@@observable', which means the imported observable would throw an error because of the if statement above. So I've got that figured out at least

Is there a reason why the if condition in subscribeTo.js doesn't allow both Symbol.observable and @@observable? Also is there a way for me to force Symbol_observable to be defined as @@observable in the module I'm importing?

EDIT: It's working!! I highly highly doubt this is the recommended way of dealing with this, so if there is a better alternative please let me know. Added the following to the polyfills file for angular:

import Symbol_observable from 'symbol-observable';

if (!window[Symbol_observable]) {
  window[Symbol_observable] = () => {};
}
cartant commented 4 years ago

I think I've found the underlying issue though, but I'm not sure how to go about fixing it.

If you are relying upon RxJS interop, the application should define Symbol.observable itself and should do so as early as possible - i.e. before any libs are loaded. Then, each lib should see the Symbol.observable that the application has defined and should get along fine.

Something like this:

Symbol.observable = Symbol.for("observable");

And define it in each realm if you have multiple realms.

lars-berger commented 4 years ago

@cartant That's working great, thank you! I really appreciate the help.

I'll go ahead and close this 👍