ReactiveX / RxScala

RxScala – Reactive Extensions for Scala – a library for composing asynchronous and event-based programs using observable sequences
Apache License 2.0

Implement SyncOnSubscribe and AsyncOnSubscribe #220

Closed dhoepelman closed 7 years ago

dhoepelman commented 7 years ago

See #219 for some discussion.

I implemented both option 2 (as createX) and option 3 (as applyX), because option 2 has some major annoyances when used from Scala: type inference can't determine T, and the user needs at least two statements because the observer has to be called explicitly (a single expression isn't enough). The effects can be seen side by side in the tests.

However, this results in two sets of very similar functionality. If we want to avoid that, I suggest we drop the Java-style createX methods and rename the applyX methods to createX.

samuelgruetter commented 7 years ago

I didn't have time yet for a full review, but maybe let's first discuss some desirable properties of an (A)SyncOnSubscribe solution for RxScala. I have the following in mind:

  1. Should be compatible with future deprecation of Observable.create, see https://github.com/ReactiveX/RxJava/pull/4253
  2. Should be reasonably close to RxJava, so that documentation, blog posts, forums etc on RxJava are also applicable to RxScala
  3. The "right way" of constructing Observables (i.e. through (A)SyncOnSubscribe instead of home-made usage of Observable.create) should be easily discoverable by browsing the method list of Observable. In RxJava, this was achieved by adding unnecessary overloads of Observable.create, see https://github.com/ReactiveX/RxJava/pull/3738
  4. In Scala code, if I write MyObject(args) (which is desugared into MyObject.apply(args)), I should get back something of type MyObject.

This PR does well on 1) and 2), but not so well on 3) and 4). Regarding 4), I write AsyncOnSubscribe(...), but I get back an Observable, which is very confusing.
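To make point 4) concrete, here is a minimal sketch in plain Scala. All names (EventStream, Generator, SurprisingGenerator, ApplyDemo) are hypothetical stand-ins invented for illustration; none of them are RxScala types.

```scala
object ApplyDemo {
  // Hypothetical stand-in for an Observable-like type.
  final class EventStream[A](val items: List[A])

  // Hypothetical stand-in for an OnSubscribe-like type.
  final class Generator[A](val next: () => A)

  object Generator {
    // Conventional: Generator(...) desugars to Generator.apply(...)
    // and returns a Generator.
    def apply[A](next: () => A): Generator[A] = new Generator(next)
  }

  object SurprisingGenerator {
    // Legal Scala, but surprising: the call looks like a constructor
    // for SurprisingGenerator, yet it returns an EventStream.
    def apply[A](next: () => A): EventStream[A] = new EventStream(List(next()))
  }

  val conventional: Generator[Int] = Generator(() => 1)
  val surprising: EventStream[Int] = SurprisingGenerator(() => 1)
}
```

The second call compiles fine, which is exactly the problem: nothing at the call site tells the reader that a different type comes back.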

I don't have a solution for 3) and 4) yet, but will keep thinking about it... @dhoepelman @zsxwing and others, please let me know what you think, and if you have any ideas.

dhoepelman commented 7 years ago

4 is a good point, I'll fix that.

I would add a requirement 5: "It should feel natural to use from a Scala context". SyncOnSubscribe.createStateful(() => state)((state, obs: Observer[_ <: Value]) => {obs.onNext(value); modify(state)}) fails that, while SyncOnSubscribe.applyStateful(() => state)(state => (Notification.OnNext(value), state)) doesn't, or at least less so. It is somewhat at odds with 2.

For 3, we could add the two (currently misnamed) (A)SyncOnSubscribe.apply methods to the Observable object as Observable.(a)syncOnSubscribe.

We then have to think about whether we still want to provide the createStateless/createSingleState utility methods, either on the Observable object or on a SyncOnSubscribe object, or leave them out altogether*. I think leaving them out altogether is better than creating six new methods on the Observable object. Providing the methods on a separate object should be okay, but hurts discoverability.

* We could assume a Scala programmer is sufficiently familiar with similar state concepts through things like state monads, Observable#scan or Traversable#foldLeft to construct them ad hoc.
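The difference between the two shapes can be sketched without RxScala at all. The following is a simplified model, not the PR's code: Sink, runCallbackStyle and runExpressionStyle are hypothetical names, the step count replaces real backpressure, and plain values stand in for Notification.

```scala
import scala.collection.mutable.ListBuffer

object GeneratorStyles {
  // Hypothetical minimal observer; NOT the RxScala Observer trait.
  trait Sink[-T] { def onNext(value: T): Unit }

  // Callback style (the createStateful shape): `next` pushes a value into
  // the sink as a side effect and returns the new state.
  def runCallbackStyle[S, T](initial: () => S, steps: Int)(next: (S, Sink[T]) => S): List[T] = {
    val buffer = ListBuffer.empty[T]
    val sink = new Sink[T] { def onNext(value: T): Unit = { buffer += value; () } }
    var state = initial()
    for (_ <- 1 to steps) state = next(state, sink)
    buffer.toList
  }

  // Expression style (the applyStateful shape): `next` returns the emitted
  // value together with the new state.
  def runExpressionStyle[S, T](initial: () => S, steps: Int)(next: S => (T, S)): List[T] = {
    val buffer = ListBuffer.empty[T]
    var state = initial()
    for (_ <- 1 to steps) {
      val (value, newState) = next(state)
      buffer += value
      state = newState
    }
    buffer.toList
  }

  // Type arguments are spelled out here, and the body needs two statements.
  val viaCallback: List[Int] =
    runCallbackStyle[Int, Int](() => 0, 3) { (state, sink) => sink.onNext(state); state + 1 }

  // Both S and T are inferred, and the body is a single expression.
  val viaExpression: List[Int] =
    runExpressionStyle(() => 0, 3)(state => (state, state + 1))
}
```

In this model both calls produce the same values; only the ergonomics at the call site differ.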

samuelgruetter commented 7 years ago

I agree with 5).

A note regarding wildcards in type parameters: All RxScala types have declaration site variance annotations (+ or -), e.g. Observer[-T] or Observable[+T]. So, when you write Observable[T], it always means Observable[_ <: T], so you don't need to write the wildcard. (This applies to your comment above as well as to some lines in the code of this PR).
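A small sketch of what declaration-site variance buys you, using hypothetical Source[+T] and Sink[-T] traits as stand-ins for Observable[+T] and Observer[-T] (the Animal/Dog classes are likewise made up for illustration):

```scala
object VarianceDemo {
  // Covariant in T, in the spirit of Observable[+T].
  trait Source[+T] { def first: T }

  // Contravariant in T, in the spirit of Observer[-T].
  trait Sink[-T] { def accept(value: T): String }

  class Animal(val name: String)
  class Dog(name: String) extends Animal(name)

  val dogSource: Source[Dog] = new Source[Dog] { def first: Dog = new Dog("Rex") }
  // Covariance: a Source[Dog] is accepted where a Source[Animal] is expected.
  val animalSource: Source[Animal] = dogSource

  val animalSink: Sink[Animal] = new Sink[Animal] {
    def accept(value: Animal): String = "got " + value.name
  }
  // Contravariance: a Sink[Animal] is accepted where a Sink[Dog] is expected.
  val dogSink: Sink[Dog] = animalSink

  // No wildcard needed in the signature: Source[Animal] already admits
  // every Source of a subtype of Animal.
  def describe(source: Source[Animal]): String = source.first.name

  val described: String = describe(dogSource)
}
```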

Here's an idea for 3):

Like this, discoverability should be good: Users browse for Observable.create, then ask "how can I get an (A)SyncOnSubscribe", go to (A)SyncOnSubscribe, and find the constructors. Note that since users don't interact with (A)SyncOnSubscribe directly, we don't need to wrap the Java object in a Scala object, but can just use a type alias. And moreover, constructing an Observable in Scala will look the same as in Java: Just something like, for instance, Observable.create(SyncOnSubscribe.createStateless(...)).

WDYT?

dhoepelman commented 7 years ago

@samuelgruetter Missed your last comment, that sounds like a good solution. I'll change the PR to fit it sometime in the next few days and incorporate the other comments.

dhoepelman commented 7 years ago

@samuelgruetter Updated the PR. Also added map to Notification, let me know if you agree.

I'm not entirely sure I got the variance right. You mentioned AsyncOnSubscribe[S, -T], but shouldn't T be covariant, similar to how it is in Observable[+T]?

The only way I could make it work was with: type SyncOnSubscribe[S, +T] = rx.observables.SyncOnSubscribe[S, _ <: T]
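The same alias trick can be tried in isolation. This sketch assumes an invariant class JavaStyleBox as a hypothetical stand-in for the Java rx.observables.SyncOnSubscribe; it only shows that a wildcard-bounded alias behaves covariantly, and says nothing about which variance is right for the PR.

```scala
object AliasDemo {
  class Animal(val name: String)
  class Dog(name: String) extends Animal(name)

  // Invariant in T, as a Java generic class appears from Scala.
  class JavaStyleBox[T](val value: T)

  // The alias is covariant because the wildcard only bounds T from above.
  type CovariantBox[+T] = JavaStyleBox[_ <: T]

  val dogBox: CovariantBox[Dog] = new JavaStyleBox[Dog](new Dog("Rex"))
  // Accepted through the alias, although JavaStyleBox itself is invariant.
  val animalBox: CovariantBox[Animal] = dogBox

  val boxedName: String = animalBox.value.name
}
```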

dhoepelman commented 7 years ago

@samuelgruetter I incorporated your comments and altered the RxDemo file.

I rewrote all of the create examples to use (A)SyncOnSubscribe; this seems correct, no? Since rx.Observable.create(OnSubscribe) is slated to be deprecated (at least in the public API), it would seem logical for RxScala to move that way too.

I'm not entirely sure about createExampleWithBackpressure, but it seems to me (A)SyncOnSubscribe provides a much more user-friendly way to achieve the same thing.

zsxwing commented 7 years ago

LGTM2.

zsxwing commented 7 years ago

Thanks!