ReactiveX / RxPY

ReactiveX for Python
https://rxpy.rtfd.io
MIT License
4.78k stars 362 forks source link

RxPY v3 Remaining Work #285

Closed dbrattli closed 5 years ago

dbrattli commented 5 years ago

This issue is for tracking the remaining work for RxPY v3.

Later, i.e v3.1

jcafhe commented 5 years ago

About Disposables, I must say that I'm really confused and don't understand quite well what is the rxjs way. I Thought that:

subscription = source.subscribe()
subscription.unsubscribe()

was the same as:

disposable = source.subscribe()
disposable.dispose()

With your proposal, would we still be able to 'dispose' in the sense of "hey, I don't need your data anymore"?

dbrattli commented 5 years ago

Yes, want to keep disposables even if they are just an object wrapping of a function that takes nothing and returns nothing Callable[[], None]. In a functional programming language they would just be a function. In OO they become an object instantiated from a class inheriting from a baseclass that implements in interface with a single method.

They are extremely useful for controlling the life-time of resources such as subscriptions.

But we should remove the static disposable with methods and replace with create functions e.g. rx.disposable.empty() instead of:

from rx.core import Disposable 
Disposable.empty()

Similar to what we have done for static create of Observables, e.g rx.empty() instead of Observable.empty().

jcafhe commented 5 years ago

Thank you very much for your insight !

jcafhe commented 5 years ago

Just to make a small review about the removing of result_mapper:

generate, generate_with_relative_time, zip, zip_with_iterable does no longer expose a result_mapper.

There are still operators with a result_mapper that could be removed (in addition to combine_latest and with_latest_from) but I need your opinions before doing anything.

for_in

I feel like if we remove result_mapper, nothing will differ for_in from concat. Maybe we should rename result_mapper to just mapper. We could even extend for_in with an optional predicate argument to mimic a python list comprehension:

[expression for item in items if predicate]

join/group_join

I see no problem to remove result_mapper and return a tuple for these operators.

By the way, I agree with your idea of a starmap operator :+1:

dbrattli commented 5 years ago

For for_in it's not really a result mapper since the mapping is not done on any results. It's just a mapper for the builtin Iterable map i.e

    def for_in(values, mapper) -> Observable:
    """Concatenates the observable sequences obtained by running the
    specified result mapper for each element in source.

    Args: 
        values: A list of values to turn into an observable sequence.
        mapper: A function to apply to each item in the values
            list to turn it into an observable sequence.

    Returns:
        An observable sequence from the concatenated observable
        sequences.
    """ 
    return concat(map(mapper, values))

I have removed the impl. file since it's so simple to construct and just added it to the __init__.py file so it's still available.

erikkemperman commented 5 years ago

Lazy load schedulers. We have many schedulers that are not being used. Perhaps eventloop schedulers should be imported explicitly (full path) so they don't add to startup time

Makes sense. But also: I notice that all the top-level schedulers (not within mainloopscheduler) create a singleton instance. For things like ThreadPoolScheduler and (especially) the ProcessPoolScheduler I am trying to implement, such a potentially useless instantiation is expensive (probably much more so than simply parsing the module source).

What is the purpose of these? If it is only for the unit-tests, can I suggest to remove them?

dbrattli commented 5 years ago

Several operators needs a default scheduler and will use the the singleton schedulers if no scheduler is supplied (immediate_scheduler, timeout_scheduler, current_thread_scheduler), and they must in many cases (current_thread_scheduler) use the same instance for things to work correctly. It makes sense if you dive into things, but it's really a confusing mess inherited from Rx.NET and RxJS. I would love to clean it up and reduce the number of schedulers. The main schedulers are:

If I remember correctly, only the immediate_scheduler, current_thread_scheduler, timeout_scheduler needs the singleton.

erikkemperman commented 5 years ago

All right, thanks for clearing that up!

jcafhe commented 5 years ago
  • Making Observables of type Generic[T] instead of Generic[Any]. The problem is what to do with vararg operators such as zip, combine_latest etc. Perhaps look to RxJS and see how they have solved it?

I've just checked in rxjs but I'm not familiar with javascript. For merge operator, the signature is:

export function merge<T, R>(...observables: Array<ObservableInput<any> | SchedulerLike | number>): Observable<R>

So they have a specific ObservableInput of type <Any> instead of the returned Observable<R> . It is defined in internal/types:

export type ObservableInput<T> = SubscribableOrPromise<T> | ArrayLike<T> | Iterable<T>;

Don't know if it can help.

dbrattli commented 5 years ago

Latest RxPY v3 and pipelining works like a charm with Coconut.

skjermbilde 2019-01-26 kl 08 44 13
jcafhe commented 5 years ago

I've digged into marbles to fix it but I think we need a complete rework.

What I would like is to make it compliant with ASCII diagrams as defined in rxjs (marble-testing). This could be usefull for easily adding new tests as well as generate docs (render picture as you suggested).

However, it would take me quite a lot of time but if it's ok, it could be great.

MainRo commented 5 years ago

We can use the ascii marble testing syntax for testing, but to generate diagram it lacks several features:

If this can be added on top of the existing syntax, then we would have a marble ascii syntax suitable for tests and doc.

dbrattli commented 5 years ago

I've btw fixed testing/marbles. It's not as advanced as the ASCII diagrams in RxJS, but they look slightly overengineered (imo). I like the hot and cold operators. I don't think we need to support higher order streams, and it's just a few operators generating higher orders like window and group_by. Instead we could tap the operators and inner streams for flat_map et al. and instead use a templating engine e.g jinja2 to setup the diagram the way we want.

dbrattli commented 5 years ago

Regarding the issue with Generic[T] for Observable, then I think we need to change merge, zip, combine_latest etc to only take "plain" Observable arguments i.e *args: Observable[Any] and not accept first argument as Iterable[Observable[Any]]. This is because we need to create overloads such as combine_latest(a: Observable[T1], b: Observable[T2]) -> Observable[Tuple[T1, t2]] and combine_latest(a: Observable[T1], b: Observable[T2], c: Observable[T3]) -> Observable[Tuple[T1, T2, T3]] etc. Having typed Iterables returning Observables of different types is not possible, so the Iterable would need to be Iterable[Observable[Any]]. My suggestion is that we do as RxJS and remove the support for having first argument to be Iterable[Observable].

jcafhe commented 5 years ago

I agree, from an user perspective, I prefer to write:

rx.zip(o1, o2, o3)

than

rx.zip([o1, o2, o3])

For a list of observables, we just have to add a *, so it's not that much effort:

rx.zip(*observables)
dbrattli commented 5 years ago

Yes, I suggest we remove the support now, and add it later on demand with another name e.g zip_observables or something. That way we don't need another breaking change when/if we add it.

MainRo commented 5 years ago

While working on the examples in the documentation is saw a change I was not aware of: The subscription function provided in the create operator (or Observable) now takes two parameters: the observer and a scheduler. How should a subscription implementation use this scheduler parameter ? The scheduler parameter is always provided, so my understanding is that the observer method should always be scheduled. However the scheduler can be None. Maybe things are not complete yet on that part ?

dbrattli commented 5 years ago

Yes, that is correct. This is the subscription default scheduler that you may use to set a scheduler once and for all for all operators in the chain. Check out the timeflies_tkinter.py that uses this feature. This means that when using RxPY with an UI library you don't need to set the scheduler on every operator.

For operators such as delay, it will choose the scheduler this way:

 _scheduler = scheduler or scheduler_ or timeout_scheduler

Where scheduler is the operator scheduler argument, scheduler_ is the scheduler given to subscribe (if any) and timeout_scheduler will be used if the other two are None.

For create it means that it may use the scheduler, or it may not. The scheduler may also be None if subscribe() is called without a scheduler. Create itself does not take a scheduler as an argument, but when creating custom logic it may have a scheduler (partially applied) from an outer scope (e.g fromiterable.py. Every operator is really using create(), but they currently short-cut by calling Observable directly with their subscribe function, Obserable(subscribe) is just the same as rx.create(subscribe).

If the subscribe function given to create() produces values (out of thin air) then it should probably schedule these values. If the values arrives from some other non-observable code, and create is used to wrap an observable around it to make it observable, then you should probably not schedule the values.

The confusing part is how this relates to subscribe_on(), that can be used to set the scheduler to where e.g create() run on. This is correct, but it's not possible to schedule values in the future that way, and it's not possible to schedule every value if needed. The goal of RxPY v2 was to remove operator scheduler arguments incl. subscribe_on, observe_on, but now we have both options.

MainRo commented 5 years ago

ok thanks for the clarification. I still find this parameter confusing: It has to be declared as a parameter of the subscribe function; but it's value is None unless a scheduler has been provided on subscription; and it is useful only when emitting items after the subscription call. I like using subscribe_on and observe_on because it allows to control scheduling outside of the observable factory: The factory code can just call the observer, and the user of the factory can decide where to schedule emissions with subscribe_on (for items emitted during subscription) and observe_on (for items emitted later). I suppose that depending on the structure of the code, one or the other solution is more convenient.

dbrattli commented 5 years ago

It's not more confusing than the scheduler argument to every operator that currently needs a scheduler. Those operators cannot perform their action using subscribe_on, or observe_on. It's also confusing when using e.g TkinterScheduler or IOLoopScheduler, that you need to spesify the scheduler to every operator that deals with time, and that using subscribe_on or observe_on does not help in any way, e.g for operators such dealing with time as delay(), timeout(), interval(), etc. Using subscribe_on does not control when to schedule things, only where. It just allows for the subscribe logic to run in the context of a given scheduler (where). The subscribe logic stil cannot schedule anything at a later point in time (when).

jcafhe commented 5 years ago

Remove all first arguments that accept Iterable[Observable] for all operators. Use *args instead.

We may have a problem with operators concat and catch.

Other operators rely on the fact that the first argument as an Iterable can be an Iterator with possibly infinite length, or a length that could be unknown when calling concat or catch.

*args is a tuple and cannot be mutated, so the length is set when calling the operator function.

https://github.com/ReactiveX/RxPY/blob/379b69976c8c393c24ce63542f5d811b2dd7e147/rx/core/operators/repeat.py#L29-L35

The operators that expect this behaviour are listed below:

For repeat, I've tried to re-implement (copy/paste/adapt) the iterator thing from concat to avoid a dependency to concat. It seems to work ca014b5342443db97f2f1f0ef962f43349005c12 but I don't know if this is the right thing to do.

dbrattli commented 5 years ago

The right thing to do here is to create new functions (operators) that takes Iterable[Obserable] as argument. The name should be e.g. concat_iterable or concat_with_iterable. The same can be done for merge etc. Check out how they did it in F# where overloading of functions is not possible. http://fsprojects.github.io/FSharp.Control.Reactive/reference/fsharp-control-reactive-observablemodule.html

jcafhe commented 5 years ago

good idea :+1:
I'm going to rework my PR #302 . Howerver I still have 2 questions:

jcafhe commented 5 years ago

I've started to merge core and internal to internal (mv-core-to-internal ) but it leads to multiple circular dependencies, e.g.:

(clarification: each item in the list import the next item, see stack trace below)

The same thing happens with:

The first solution is to import 'isolated' modules (basic, priorityqueue, exceptions, ...) before other imports in rx.internal.__init__.py. But relying on imports order doesn't feel sane.

Another solution is to import full path (relative or absolute) everywhere, e.g. from .internal.basic import noop instead of from .internal import noop. This doesn't feel good either because a package should be allowed to import names declared in the __init__ of an other package.

What do you think ?

EDIT: my comment is not very clear, a trace may be more appropriate:

>>> import rx.internal
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/jehf/DEV/pyprojects/RxPY/rx/__init__.py", line 6, in <module>
    from .internal import Observable, abc, typing, pipe
  File "/home/jehf/DEV/pyprojects/RxPY/rx/internal/__init__.py", line 4, in <module>
    from .observable import Observable, ConnectableObservable
  File "/home/jehf/DEV/pyprojects/RxPY/rx/internal/observable/__init__.py", line 1, in <module>
    from .observable import Observable
  File "/home/jehf/DEV/pyprojects/RxPY/rx/internal/observable/observable.py", line 5, in <module>
    from rx.disposable import Disposable
  File "/home/jehf/DEV/pyprojects/RxPY/rx/disposable/__init__.py", line 3, in <module>
    from .disposable import Disposable
  File "/home/jehf/DEV/pyprojects/RxPY/rx/disposable/disposable.py", line 3, in <module>
    from rx.internal import typing, noop
ImportError: cannot import name 'noop' from 'rx.internal' (/home/jehf/DEV/pyprojects/RxPY/rx/internal/__init__.py)
>>> 
dbrattli commented 5 years ago

Closing this as I don't see anything more that needs to be done for a v3 release from my side. There are some refactoring that can be done but nothing blocking a release IMO. At least we should postpone any further refactoring until a v3.1 release.

lock[bot] commented 4 years ago

This thread has been automatically locked since there has not been any recent activity after it was closed. Please open a new issue for related bugs.