ReactiveX / RxScala

RxScala – Reactive Extensions for Scala – a library for composing asynchronous and event-based programs using observable sequences
Apache License 2.0
888 stars 121 forks source link

(A)SyncOnSubscribe #219

Closed dhoepelman closed 7 years ago

dhoepelman commented 7 years ago

Hi,

are there any plans on implementing RxJava 1.2's AsyncOnSubscribe and SyncOnSubscribe? I could contribute an implementation, but am unsure about the preferred signature.

// 1: Java style, thin wrapper but un-idiomatic
Observable(new SyncOnSubscribe[State, T](
    override def generateState: () => State
    // User must ensure Observer gets exactly one call to onNext, onCompleted or onError like in RxJava
    override def next: (Observer[_ <: T], State) => State
    override def onUnsubscribe: State => ()
))
// 2: User is responsible for correctly calling observer, similar to RxJava
object SyncOnSubscribe {
  def apply[State, T](
    generateState: () => State,
    next: (State, Observer[_ <: T]) => State,
    onUnsubscribe: State => () = _ => ()
  )
}

object AsyncOnSubscribe {
  def apply[State, T](
    generateState: () => State,
    next: (State, Long, Observer[Observable[_ <: T]]) => State,
    onUnsubscribe: State => () = _ => ()
  )
}
// 3: User returns one of 3 valid values from an ADT
// Seems cleanest, safest and most convenient for the user, but worth the overhead and divergence from RxJava?

// 3a: our own ADT
sealed trait ObserverValue[T] // Could use a better name
object ObserverValue {
    case class Next[T](value: T) extends ObserverValue
    case object Completed extends ObserverValue
    case object Error(e: Throwable) extends ObserverValue
}

// 3b: We could also implement this using standard scala types. The type would not be named in the real implementation
type ObserverValue[T] = Try[Option[T]]
// Failure(e) => onError(e), Success(None) => onCompleted, Success(Some(t)) => onNext(t)

// 3c: We could also use an observable, and only pick it's first element (or completed/error)
type ObserverValue[T] = Observable[T]

object SyncOnSubscribe {
  def apply[State, T](
    generateState: () => State,
    next: State => (ObserverValue[T], State),
    onUnsubscribe: State => () = _ => ()
  )
}

object AsyncOnSubscribe {
  def apply[State, T](
    generateState: () => State,
    next: (State, Long) => (Observable[T], State),
    onUnsubscribe: State => () = _ => ()
  )
}

I'm leaning towards 3b, but see good arguments for 2 and all 3's.

samuelgruetter commented 7 years ago

Hi, cool that you're willing to tackle (A)SyncOnSubscribe :smiley:

Approach 3a is nice, but instead of creating a new ObserverValue[T] class, we should use the already existing Notification[T].

However, I'm not sure if it's a good idea to diverge so much from RxJava, because this API is already quite complex to understand, and it would be very beneficial if the documentation, blog posts etc about RxJava are directly applicable to RxScala as well. So overall, I'd prefer approach 2.

Regarding the constructor name on object (A)SyncOnSubscribe: I would not just call it apply, but use the same three names as in RxJava (createSingleState, createStateful, createStateless). And I like your suggestion to use a default argument for onUnsubscribe :smiley:

dhoepelman commented 7 years ago

Thanks for the input!

I didn't know of Notification[T], that makes more sense. I'll port the methods you mentioned and maybe create an extra one, since State => (Notification[T], State) feel more FP/Typesafe

samuelgruetter commented 7 years ago

Right, these two approaches can coexist. Just make sure there are no method overloading ambiguities (like this one, for instance).

dhoepelman commented 7 years ago

Fixed with #220