snivilised / lorax

🌟 reactive extensions for go (a pseudo rxgo version 3). Also includes 🐜 ants based worker-pool
MIT License
3 stars 0 forks source link

add rx #122

Open plastikfan opened 4 months ago

plastikfan commented 4 months ago

This is the overall issue covering adding support for RX (a stop-gap for rxgo-v3)

The following is a list of the operators to be implemented

Improvements

Fixes

Also, see this issue on rxgo: Support for generics

plastikfan commented 4 months ago

In rxgo the test 'Test_Observable_Debounce' fails:

=== RUN   Test_Observable_Debounce
    /Users/plastikfan/dev/github/go/third/rxgo/assert.go:225:
                Error Trace:    /Users/plastikfan/dev/github/go/third/rxgo/assert.go:225
                                                        /Users/plastikfan/dev/github/go/third/rxgo/observable_operator_test.go:389
                Error:          Not equal:
                                expected: []interface {}{1, 2, 5, 6}
                                actual  : []interface {}{1, 2, 5}

                                Diff:
                                --- Expected
                                +++ Actual
                                @@ -1,6 +1,5 @@
                                -([]interface {}) (len=4) {
                                +([]interface {}) (len=3) {
                                  (int) 1,
                                  (int) 2,
                                - (int) 5,
                                - (int) 6
                                + (int) 5
                                 }
                Test:           Test_Observable_Debounce
--- FAIL: Test_Observable_Debounce (0.00s)
FAIL
FAIL    github.com/reactivex/rxgo/v2    1.243s

This will not be addressed unless it affects the requirements of snivilised projects.

plastikfan commented 3 months ago

rx.Assert needs to be modified to accommodate the new ways an item can be created: Ch, Tick, Tv. Probably need some kind of generic reader function that implements this to read and interpret an existing item that can be created by one of these alternative methods. Probably need to define new assertions, ie HasCh, HasTick, HasTv.

Perhaps in iterable.go, we define a base struct iterable-X that contains a function to read each item.

plastikfan commented 3 months ago

There is a lot of option copying code; should really define a function to do this, rather than have cloned code in multiple places; eg (i *createIterable[T]) Observe in iterable-create.go.

The current definition of Item:

    Item[T any] struct {
        V T
        E error
        //
        C       chan<- Item[T]
        tick    bool
        tickV   bool
        numeric bool
        TV      int
        N       int
    }

can easily be optimised by replacing all the boolean flags with a pseudo bitwsie enum (discriminator) and because N and TV are both int, they can be replaced with a common single field, actually, this could still be called N, and we use the discriminator to discover how N should be interpreted. If the discriminiator is zero, then we know the item is just a T. We only add fields to Item, if it is of a fixed type and not of any existing type (unless multiple fields are required for a single purpose).

plastikfan commented 3 months ago

observble.go contains the runParallel function which implements a worker pool. Perhaps this can be modified to use the lorax.WorkerPool.

Actually, the above statement is incorrect. The WorkerPool is function based, where as the pool in rx is event/data oriented, so the 2 are based on 2 different concepts. However, it would be nice if there was an explicit representation of the rx pool with a clear interface rather than it being buried inside runParallel (scatter).

plastikfan commented 3 months ago

We might be better off turning Item into an interface, then having multiple value types that implement this interface, rather than the current implementation which is a struct with multiple fields.

Actually, this might not be a good idea after all, since if we introduce an interface, then:

For these reasons, we'll keep the current implementation, accepting that there is an overhead of field values for every item.

plastikfan commented 3 months ago

There is a lot of option copying code; should really define a function to do this, rather than have cloned code in multiple places; eg *(i createIterable[T]) Observe** in iterable-create.go.

The current definition of Item:

  Item[T any] struct {
      V T
      E error
      //
      C       chan<- Item[T]
      tick    bool
      tickV   bool
      numeric bool
      TV      int
      N       int
  }

can easily be optimised by replacing all the boolean flags with a pseudo bitwsie enum (discriminator) and because N and TV are both int, they can be replaced with a common single field, actually, this could still be called N, and we use the discriminator to discover how N should be interpreted. If the discriminiator is zero, then we know the item is just a T. We only add fields to Item, if it is of a fixed type and not of any existing type (unless multiple fields are required for a single purpose).

Actually, we could optimise this further. Item is now currently defined as:

    Item[T any] struct {
        V T
        E error
        //
        C    chan<- Item[T]
        N    NumVal
        B    bool
        O    any
        disc enums.ItemDiscriminator
    }

but with the intruction of Opaque/O, it becomes apparent, that there is now no need for the overhead of haing multiple fields for different purposes. All we need is a singlew Opqaue field along with the discriminator. All the other fields can be discarded, reducing the size of Item.

To support this approach, we could define all those Item creational functions as getters on Item, eg, Item[T].Of, would be the complement of the Of[T], in that it gets the value as a T from an exiting item.