ReactiveX / RxGo

Reactive Extensions for the Go language.
MIT License
4.96k stars 338 forks source link

Adding Subjects #407

Open chfischerx opened 10 months ago

chfischerx commented 10 months ago

Is your feature request related to a problem? Please describe. The ReactiveX "standard" defines various types of subjects to allow multiple subscribers observe an observable (see https://reactivex.io/documentation/subject.html). While the same can be achieved with a connectable Observable, there are some missing features. Most important, dynamically adding and removing subscribers. A RxGo connectable Observable allows adding multiple subscribers, and each will receive a copy of all items. But once added, subscribers cannot be removed (or unsubscribe).

Subjects are useful when trying to emulate an in-memory message broker. A typical example is creating a fan-out of a single data source to multiple clients within a web server using e.g. WebSockets. Every new WS connection will be added as a Subject subscriber. Once the client disconnects the subscription must be removed.

Describe the solution you'd like I suggest three Subject types:

Suggested Subject API:

type ISubject interface {
    Subscribe() (Subscription, rxgo.Observable)
    Next(value any)
    Error(err error)
    Complete()
}

And the Subscription API:

type Subscription interface {
    Unsubscribe()
}

Additional context I already created all of the above in a private repo based on RxGo Observables. I would be glad to contribute (and of course maintain) my code into this project if there is interest.