ReactiveX / RxGo

Reactive Extensions for the Go language.
MIT License
4.96k stars, 338 forks

Version 3 #377

Closed si3nloong closed 6 months ago

si3nloong commented 2 years ago

This is a draft PR for the upcoming v3 release. It will break and redesign the whole reactive API using the generics feature introduced in Go 1.18. Technically, this PR will resolve issues #375, #250, and #343, as well as add some missing features such as #362 and #347.

⚠️⚠️⚠️ This is still under heavy development; the PR is just here to keep track of overall progress.

I'm trying to map the rxjs API design onto this lib as much as possible.

Alpha Version CHANGELOG :

Beta Version CHANGELOG :

Request for comments :

si3nloong commented 2 years ago

Proposal for changing observe API:

Current :

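for v := range observable.Observe() {
   // v carries either a value or an error
}

// OR
obs := observable.Observe()
for {
   select {
   case item, ok := <-obs:
      if !ok {
         return // channel closed: the stream has ended
      }
      _ = item // the value or error must be handled manually
   }
}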

Proposed : this is heavily inspired by the rxjs API design

type Subscription interface {
   Unsubscribe()
}

observable[T].Subscribe(onNext func(T), onError func(error), onCompleted func()) Subscription

The current API is very confusing: users have no way to control the subscription (unsubscribe from the stream when needed), and they are required to handle errors and data manually. I suggest we move that abstraction into internal functions and expose only the required functions.
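To make the proposal concrete, here is a minimal sketch of how a callback-based Subscribe could behave. It is not the code in this PR: SliceObservable, stopper, collect and errDemo are hypothetical names made up for illustration, and delivering items on a goroutine is an assumption, not something the proposal specifies.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Subscription matches the interface proposed in this thread.
type Subscription interface {
	Unsubscribe()
}

// stopper is a hypothetical Subscription; Unsubscribe closes stop exactly once.
type stopper struct {
	once sync.Once
	stop chan struct{}
}

func (s *stopper) Unsubscribe() { s.once.Do(func() { close(s.stop) }) }

// SliceObservable is a hypothetical source: it emits items, then reports
// err (if non-nil) or completion.
type SliceObservable[T any] struct {
	items []T
	err   error
}

// errDemo is a sample error used by the demo below.
var errDemo = errors.New("some error")

// Subscribe delivers items on a new goroutine and returns immediately, so
// the caller can cancel through the returned Subscription.
func (o SliceObservable[T]) Subscribe(onNext func(T), onError func(error), onCompleted func()) Subscription {
	s := &stopper{stop: make(chan struct{})}
	go func() {
		for _, v := range o.items {
			select {
			case <-s.stop:
				return // unsubscribed: stop emitting, no terminal callback
			default:
				onNext(v)
			}
		}
		if o.err != nil {
			onError(o.err)
			return
		}
		onCompleted()
	}()
	return s
}

// collect subscribes and blocks until the stream errors or completes.
func collect[T any](o SliceObservable[T]) ([]T, error) {
	var got []T
	var err error
	done := make(chan struct{})
	o.Subscribe(
		func(v T) { got = append(got, v) },
		func(e error) { err = e; close(done) },
		func() { close(done) },
	)
	<-done
	return got, err
}

func main() {
	vals, err := collect(SliceObservable[int]{items: []int{1, 2}, err: errDemo})
	fmt.Println(vals, err) // prints: [1 2] some error
}
```

With this shape the caller never touches a channel or inspects an item to see whether it holds an error. Values, errors and completion each arrive through their own callback, and Unsubscribe is the only handle exposed.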

si3nloong commented 2 years ago

The new version of the API will look like this. @teivah any comments atm? The rxgo.IObservable interface will be renamed to rxgo.Observable in the future. Currently, I'm removing the old code part by part.

Screenshot 2022-08-30 at 11 08 40 AM

Note: This stream's data flow is processed synchronously, which is why the function is named SubscribeSync.

si3nloong commented 2 years ago

There are significant changes in the API. For example,

Previous

rxgo.Defer([]rxgo.Producer{func(ctx context.Context, next chan<- rxgo.Item) {
    next <- rxgo.Of(1)
    next <- rxgo.Of(2)
    next <- rxgo.Error(fmt.Errorf("some error"))
}})

Current

rxgo.Defer(func() rxgo.IObservable[uint] {
   return rxgo.Interval(1000)
})

And creating an observable is as easy as:

newObservable(func(sub Subscriber[int]) {
  for i := 0; i < 10; i++ {
    Next[int](i).Send(sub)
    time.Sleep(time.Second)
  }
  Complete[int]().Send(sub)
})

pmbanka commented 2 years ago

I took a look at this PR (massive work!), and I got curious about one thing - do you plan on porting the scheduler concept to rxgo as well? I would mainly find it useful to be able to mock/control time in my tests.

si3nloong commented 2 years ago

> I took a look at this PR (massive work!), and I got curious about one thing - do you plan on porting the scheduler concept to rxgo as well? I would mainly find it useful to be able to mock/control time in my tests.

I will take a look at that after finishing the general API. I think it should be feasible.
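For readers unfamiliar with the scheduler idea, below is a rough sketch of what "controlling time in tests" could look like in Go. Nothing here comes from this PR or from RxGo: Scheduler, VirtualScheduler, interval and demo are hypothetical names, and the design (a task queue driven by an explicit Advance call) is only one possible approach, loosely modelled on virtual-time schedulers in other Rx implementations.

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

// Scheduler is a hypothetical abstraction over "run fn after d".
type Scheduler interface {
	Now() time.Time
	Schedule(d time.Duration, fn func())
}

type task struct {
	at time.Time
	fn func()
}

// VirtualScheduler queues tasks and runs them only when Advance is called,
// so tests never wait on the wall clock.
type VirtualScheduler struct {
	now   time.Time
	tasks []task
}

func (s *VirtualScheduler) Now() time.Time { return s.now }

func (s *VirtualScheduler) Schedule(d time.Duration, fn func()) {
	s.tasks = append(s.tasks, task{at: s.now.Add(d), fn: fn})
}

// Advance moves virtual time forward by d, running due tasks in time order.
func (s *VirtualScheduler) Advance(d time.Duration) {
	deadline := s.now.Add(d)
	for {
		sort.SliceStable(s.tasks, func(i, j int) bool {
			return s.tasks[i].at.Before(s.tasks[j].at)
		})
		if len(s.tasks) == 0 || s.tasks[0].at.After(deadline) {
			break
		}
		t := s.tasks[0]
		s.tasks = s.tasks[1:]
		s.now = t.at
		t.fn() // a task may schedule further tasks
	}
	s.now = deadline
}

// interval emits 0, 1, 2, ... once per period by rescheduling itself.
func interval(s Scheduler, period time.Duration, onNext func(uint)) {
	var n uint
	var tick func()
	tick = func() {
		onNext(n)
		n++
		s.Schedule(period, tick)
	}
	s.Schedule(period, tick)
}

// demo runs a one-second interval and advances virtual time by ms milliseconds.
func demo(ms int) []uint {
	s := &VirtualScheduler{}
	var got []uint
	interval(s, time.Second, func(v uint) { got = append(got, v) })
	s.Advance(time.Duration(ms) * time.Millisecond)
	return got
}

func main() {
	fmt.Println(demo(3500)) // prints: [0 1 2]
}
```

A production implementation would presumably also offer a Scheduler backed by the real clock, so operators such as Interval could take either one.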

si3nloong commented 1 year ago

I've been idle for some time due to work, but I will continue on this. So far the basic functionality is OK to go atm.

adhaamehab commented 1 year ago

@si3nloong hello, is this PR still active? I would love to pick it up to get a feel for maintaining this package, but I don't want to overstep if you have a timed plan to complete it.

Olshansk commented 1 year ago

@davidlondono @si3nloong Wanted to bump the thread to see if this PR will ever get picked up again.

plastikfan commented 7 months ago

I'm interested to know if this pull request is still a thing, as I am in the process of defining my own V3 which uses generics. I'm not sure about the other issues that have been identified, because I don't know whether they will affect what I need for my own projects.