Closed augustinr closed 4 years ago
In most cases, using the existing QueueScheduler instance would be sufficient, I believe. Would you mind sharing some details about why you are creating another scheduler instance?
"but is the following line not missing? this.SchedulerAction = SchedulerAction;"
No, that's expected. TypeScript allows the assignment to be declared in the constructor's parameter list:
constructor(private SchedulerAction: typeof Action = QueueAction)
The private keyword in the constructor tells TypeScript that SchedulerAction is a property to be set on the class with the value of this argument.
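To make the shorthand concrete, here is a minimal sketch comparing the parameter-property form with its longhand equivalent. DemoAction, DemoQueueAction and both scheduler classes are hypothetical stand-ins, not the real RxJS classes.

```typescript
// Hypothetical stand-ins for the real Action / QueueAction classes.
class DemoAction {
  constructor(public readonly label: string = "base") {}
}

class DemoQueueAction extends DemoAction {
  constructor() {
    super("queue");
  }
}

// Shorthand: `private` on the parameter both declares the property and assigns it.
class ShorthandScheduler {
  constructor(private SchedulerAction: typeof DemoAction = DemoQueueAction) {}

  create(): DemoAction {
    return new this.SchedulerAction();
  }
}

// Longhand equivalent: explicit field declaration plus explicit assignment.
class LonghandScheduler {
  private SchedulerAction: typeof DemoAction;

  constructor(SchedulerAction: typeof DemoAction = DemoQueueAction) {
    this.SchedulerAction = SchedulerAction;
  }

  create(): DemoAction {
    return new this.SchedulerAction();
  }
}
```

Both classes end up with the same `SchedulerAction` property, so the explicit `this.SchedulerAction = SchedulerAction;` line is not needed in the shorthand form.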
It looks like the issue here is that QueueScheduler, et al, and their constructors should implement a default value for the Action constructor they're using.
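A rough sketch of what such a default could look like, assuming the failure comes from a scheduler subclass being constructed without an Action class. All class names here are hypothetical and only mirror the shape of the real ones.

```typescript
// Hypothetical stand-ins; the real RxJS classes have more responsibilities.
class SketchAction {
  constructor(public readonly owner: SketchScheduler) {}
}

class SketchQueueAction extends SketchAction {}

class SketchScheduler {
  // The base class requires the caller to say which Action class to instantiate.
  constructor(protected SchedulerAction: typeof SketchAction) {}

  schedule(): SketchAction {
    // Fails with a TypeError at runtime if SchedulerAction was never provided.
    return new this.SchedulerAction(this);
  }
}

class SketchQueueScheduler extends SketchScheduler {
  // The subclass supplies its own default, so `new SketchQueueScheduler()` just works.
  constructor(SchedulerAction: typeof SketchAction = SketchQueueAction) {
    super(SchedulerAction);
  }
}
```

With the default in place, `new SketchQueueScheduler().schedule()` returns a `SketchQueueAction` without the caller having to know which Action class goes with which scheduler.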
The Scheduling system in general is about to get a refactor, so I'll keep this one in mind.
Here is a use-case for another scheduler:
We process streams of events from multiple physical devices. Each event is timestamped by the device to indicate when it occurred. We use a lot of time-interval based operators to reason over the events, e.g., "has device-a been in state-b for 30 seconds, and during that time did device-b not emit any event-X events?" The 30 seconds here is not measured by the CPU clock that all the existing non-test Rx schedulers are based on, but by a pseudo clock that is advanced based on the external event timestamps.
It might also be important to point out that the events do not all arrive in a smooth temporal stream; some come in batches, some come out of order. We use an input buffer over a configurable window to reorder the events before they are pushed into the main pipeline of Rx operators.
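For illustration only, a reorder buffer of this kind might look roughly like the sketch below. The StampedEvent shape, the ReorderBuffer name and the release rule (an event is released once a newer event at least `windowMs` ahead of it has been seen) are assumptions, not the implementation described above.

```typescript
// Hypothetical event shape: only the device-supplied timestamp matters here.
interface StampedEvent {
  deviceId: string;
  timestamp: number; // milliseconds, as stamped by the device
}

class ReorderBuffer {
  private pending: StampedEvent[] = [];
  private maxSeen = -Infinity;

  constructor(private readonly windowMs: number) {}

  // Accept one event and return the events that are now safe to release, oldest first.
  push(incoming: StampedEvent): StampedEvent[] {
    this.pending.push(incoming);
    this.pending.sort((a, b) => a.timestamp - b.timestamp);
    this.maxSeen = Math.max(this.maxSeen, incoming.timestamp);

    // Anything at or before the cutoff can no longer be overtaken within the window.
    const cutoff = this.maxSeen - this.windowMs;
    const firstKept = this.pending.findIndex((e) => e.timestamp > cutoff);
    const releaseCount = firstKept === -1 ? this.pending.length : firstKept;
    return this.pending.splice(0, releaseCount);
  }

  // Release whatever is still buffered, e.g. at end of input.
  flush(): StampedEvent[] {
    return this.pending.splice(0);
  }
}
```

An event that arrives later than the window allows is released immediately in this sketch, so the window size bounds how much disorder can be repaired.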
This feature of advancing the clock based on external events rather than the CPU clock is not uncommon in general purpose CEP systems. Esper, for example, supports explicitly advancing the clock based on a pluggable algorithm (it does this by accepting time-advance events).
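As a minimal sketch of the idea, assuming nothing about the real RxJS or Esper APIs, a clock that only moves when it is told about an event timestamp could look like this. EventTimeClock and its methods are hypothetical names.

```typescript
type TimerCallback = (now: number) => void;

interface PendingTimer {
  due: number;
  seq: number;
  run: TimerCallback;
}

class EventTimeClock {
  private timers: PendingTimer[] = [];
  private nextSeq = 0;

  constructor(public now: number = 0) {}

  // Schedule work `delayMs` after the current event time; returns a cancel function.
  schedule(delayMs: number, run: TimerCallback): () => void {
    const timer: PendingTimer = { due: this.now + delayMs, seq: this.nextSeq++, run };
    this.timers.push(timer);
    return () => {
      const index = this.timers.indexOf(timer);
      if (index !== -1) this.timers.splice(index, 1);
    };
  }

  // Advance the clock to an event's timestamp, firing due timers in due-time order.
  advanceTo(timestamp: number): void {
    if (timestamp < this.now) return; // this sketch never moves the clock backwards
    for (;;) {
      this.timers.sort((a, b) => a.due - b.due || a.seq - b.seq);
      const next = this.timers[0];
      if (next === undefined || next.due > timestamp) break;
      this.timers.shift();
      this.now = next.due;
      next.run(this.now);
    }
    this.now = timestamp;
  }
}
```

In the "30 seconds in state-b" example, entering state-b would schedule a 30000 ms timer, leaving state-b would call the returned cancel function, and every incoming event would call `advanceTo` with its own timestamp.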
It is possible to implement this capability without using an event-based clock. We could have built custom operators, corresponding to all the Rx time operators, that are aware of the timestamp property of events and use it internally to represent the advance of time. But this would essentially just be implementing another form of external clock, at the cost of far more effort and no reuse of the existing Rx clock framework.
The Rx clock abstraction is quite nice and very flexible. But it also really needs to be open to extension.
I hope I made the argument. If not, please ask for more clarification.
@memelet at that point, wouldn't an abstraction like the VirtualTimeScheduler or HistoricalScheduler that we had back in RxJS v4 work for you? Not everything was ported to the newer stuff, mind you, but it was awfully useful for the clock scenarios you are talking about.
@mattpodwysocki Yes, if I remember (but I would have to check) we extended VirtualTimeScheduler in 4.x. For 5.x it's a bit more messy. We extend QueueScheduler and have to extend QueueAction.
From beta-10 or so to now, our implementation no longer works. I'll let someone else from the team who understands this better describe the situation...
Like @memelet said, the issue is with extending QueueScheduler.
As @blesh has stated above, "It looks like the issue here is that QueueScheduler, et al, and their constructors should implement a default value for the Action constructor they're using."
The issue should be resolved when that is fixed.
IIRC the thing we've been calling the VirtualTimeScheduler was hastily implemented to support writing tests as marble diagrams, but nobody's had the time (heh) to carry it to the finish line.
I was hoping at some point @mattpodwysocki would want to collaborate on the remaining Scheduler features :-). Though seeing as how I've mostly failed to communicate the requirements and designs of the new Schedulers in detail, I'll take the blame for dropping the ball here. So in the interest of at least getting this info down somewhere, the rest of this post will be all the what and why details I can remember about the new Schedulers. @jayphelps hopefully this will also help you if you're on Scheduler maintenance duty.
A Scheduler is a sorted queue of things to do. Just like all the other versions of Rx, it schedules each thing to do as a disposable Action, so an Action extends Subscription (aka Disposable). The Scheduler constructors accept an Action class. When you call schedule(), the Scheduler creates and returns you an instance of this class.
To schedule something to happen in the future, we started with the simplest API we could (but no simpler), initially only supporting relative due times:
const action = Scheduler.schedule(
  function work(this: Action<T>, state?: any) {
    /* do some stuff now */
    state.count += 1;
    if (state.count < 10) {
      /* do this again later */
      this.schedule(
        state /* <-- next state */,
        200 /* <-- next due time (or period) */
      );
    }
  },
  100 /* <-- initial due time */,
  { count: 0 } /* <-- initial state */
);
/* some time later... */
action.unsubscribe();
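To see the recursive rescheduling pattern run end to end, here is a self-contained toy version driven by a manual clock. ToyScheduler, ToyAction and advanceTo are hypothetical names for illustration; this is not the RxJS implementation.

```typescript
// Toy stand-ins, driven by a manual clock so the run is deterministic.
type ToyWork<S> = (this: ToyAction<S>, state: S) => void;

interface ToyQueued {
  due: number;
  seq: number;
  execute(): void;
}

class ToyScheduler {
  now = 0;
  private queue: ToyQueued[] = [];
  private nextSeq = 0;

  schedule<S>(work: ToyWork<S>, delay: number, state: S): ToyAction<S> {
    return new ToyAction<S>(this, work, state).schedule(state, delay);
  }

  enqueue(item: ToyQueued): void {
    this.remove(item); // an action sits in the queue at most once
    item.seq = this.nextSeq++;
    this.queue.push(item);
  }

  remove(item: ToyQueued): void {
    const index = this.queue.indexOf(item);
    if (index !== -1) this.queue.splice(index, 1);
  }

  // Move the clock forward, running every action that falls due on the way.
  advanceTo(until: number): void {
    for (;;) {
      this.queue.sort((a, b) => a.due - b.due || a.seq - b.seq);
      const next = this.queue[0];
      if (next === undefined || next.due > until) break;
      this.queue.shift();
      this.now = next.due;
      next.execute();
    }
    this.now = until;
  }
}

class ToyAction<S> implements ToyQueued {
  due = 0;
  seq = 0;
  closed = false;

  constructor(private scheduler: ToyScheduler, private work: ToyWork<S>, private state: S) {}

  // Reschedule this same action with the next state and a relative delay.
  schedule(state: S, delay: number = 0): ToyAction<S> {
    if (this.closed) return this;
    this.state = state;
    this.due = this.scheduler.now + delay;
    this.scheduler.enqueue(this);
    return this;
  }

  execute(): void {
    if (!this.closed) this.work.call(this, this.state);
  }

  unsubscribe(): void {
    this.closed = true;
    this.scheduler.remove(this);
  }
}

const toyScheduler = new ToyScheduler();
const toyRunTimes: number[] = [];
toyScheduler.schedule(
  function (this: ToyAction<{ count: number }>, state: { count: number }) {
    toyRunTimes.push(toyScheduler.now);
    state.count += 1;
    if (state.count < 3) this.schedule(state, 200); // do this again later
  },
  100,
  { count: 0 }
);
toyScheduler.advanceTo(1000);
// toyRunTimes is now [100, 300, 500]
```

The work function reschedules the same action object it is running on, which is the shape the API above is designed around.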
We identified disposable and closure allocation, closure invocation, and async request call patterns as the major performance bottlenecks, but we wanted to keep the familiar recursive scheduling API and reentrancy behaviors. Thus, the Schedulers were factored into two separate components, for performance and extensibility:
The Action instance is responsible for updating the shared queue of the Scheduler it was created by. An Action is responsible for insertion, removal, ordering, and flushing its Scheduler's queue. This may initially seem counter-intuitive; we wouldn't typically allow a queue to be updated by the things that go in it. However, this approach allowed us to make a number of key performance improvements while keeping the core, familiar recursive scheduling API:
[...] Observable.interval(0), no matter how many periods elapse. This can dramatically reduce memory pressure/GC cost in scenarios with many concurrent periodically scheduled Actions. VirtualActions (created by the VTScheduler) are immutable, because they need to be inspected by the TestScheduler to ensure each execution of work does what it's supposed to.
[...] 0 due time, and it will be run by its original Scheduler.
[...] setInterval instead of serial setTimeout calls. This is less stressful on the VM for Actions with stable periods, but also ensures singly-executed Actions are consistently invoked in order with respect to other Actions (periodic or otherwise). The VM invokes setInterval callbacks as close as possible to the time they should execute, adjusting for time lost in long frames under load, providing a more accurate picture of "real" time.
When it's time to flush a Scheduler's queue, the Scheduler executes each Action, handling errors and reentrancy. For example, the Async and Queue Schedulers guard against recursion by pushing rescheduled actions to the end of the queue and unwinding the stack. This is identical to previous versions of Rx. By default, the production Schedulers stop executing if an Action returns an Error, and dispose the rest of the scheduled Actions. There's probably room to experiment here.
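The division of labour described above (Actions insert themselves into the queue, the Scheduler flushes it while guarding against reentrancy and errors) can be sketched as follows. GuardedAction and GuardedScheduler are hypothetical, heavily simplified stand-ins with no notion of time.

```typescript
class GuardedAction {
  closed = false;

  constructor(private scheduler: GuardedScheduler, private work: () => void) {}

  // The action inserts itself into its scheduler's queue, then asks for a flush.
  schedule(): this {
    if (!this.closed) {
      this.scheduler.actions.push(this);
      this.scheduler.flush();
    }
    return this;
  }

  // Run the work, returning (not throwing) any error so the scheduler can react.
  execute(): unknown {
    if (this.closed) return undefined;
    try {
      this.work();
      return undefined;
    } catch (err) {
      return err === undefined ? new Error("work failed") : err;
    }
  }

  unsubscribe(): void {
    this.closed = true;
  }
}

class GuardedScheduler {
  actions: GuardedAction[] = [];
  private active = false;

  flush(): void {
    // Reentrant call: the loop that is already running will reach the new action.
    if (this.active) return;
    this.active = true;
    let error: unknown;
    for (let action = this.actions.shift(); action !== undefined; action = this.actions.shift()) {
      error = action.execute();
      if (error !== undefined) break;
    }
    this.active = false;
    if (error !== undefined) {
      // Stop executing and dispose everything that was still queued.
      for (const remaining of this.actions.splice(0)) remaining.unsubscribe();
      throw error;
    }
  }
}

const guarded = new GuardedScheduler();
const guardedLog: string[] = [];
new GuardedAction(guarded, () => {
  guardedLog.push("outer start");
  new GuardedAction(guarded, () => guardedLog.push("inner")).schedule();
  guardedLog.push("outer end");
}).schedule();
// guardedLog is now ["outer start", "outer end", "inner"]
```

The inner action runs only after the outer one has returned, so work scheduled from inside other work never deepens the call stack.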
Early on we made a decision to switch the default scheduling semantics from trampolining (aka CurrentThreadScheduler) to recursive (aka RecursiveScheduler). However, instead of implementing a new RecursiveScheduler, we default to iterative loops in the absence of a Scheduler: RxJava proved that this yields identical call patterns, and is dramatically faster in most cases.
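As a rough illustration of "iterative loops in the absence of a Scheduler", the sketch below emits a range either with a plain loop or, when a scheduler is supplied, one value per scheduled step. emitRange, MinimalScheduler and TrampolineScheduler are hypothetical names, not RxJS APIs.

```typescript
interface MinimalScheduler {
  schedule(work: () => void): void;
}

function emitRange(
  start: number,
  count: number,
  next: (value: number) => void,
  scheduler?: MinimalScheduler
): void {
  if (scheduler === undefined) {
    // No scheduler: a plain loop, with no queue and no per-value allocation.
    for (let i = 0; i < count; i++) next(start + i);
    return;
  }
  const target: MinimalScheduler = scheduler;
  let emitted = 0;
  const step = (): void => {
    if (emitted < count) {
      next(start + emitted);
      emitted++;
      target.schedule(step); // recursive scheduling, one value per step
    }
  };
  target.schedule(step);
}

// A tiny trampolining scheduler, for comparison with the loop path.
class TrampolineScheduler implements MinimalScheduler {
  private queue: Array<() => void> = [];
  private draining = false;

  schedule(work: () => void): void {
    this.queue.push(work);
    if (this.draining) return;
    this.draining = true;
    for (let task = this.queue.shift(); task !== undefined; task = this.queue.shift()) task();
    this.draining = false;
  }
}
```

Both paths deliver the same values in the same order; the loop path simply skips the queue entirely.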
So to finish the VTScheduler, we need to add a public API for start, stop, sleep, sorting and comparing time, scheduling by absolute time, and advancing by relative and absolute time (instead of count). @mattpodwysocki did I miss anything?
@trxcllnt yeah, I think we should pair program on the VirtualTimeScheduler stuff, because I think there is a lot of value in having a more complete swappable clock. I think you captured the essence, in that we need the basics.
The scheduler itself should be initialized with an initial clock value, as well as a comparer for values to determine whether we need to increment the clock. We made the virtual time scheduler overly generic and I'm not sure it's super necessary to have all the ceremony we had before.
The following methods should be supported:
sleep - Advance the clock without running any of the scheduled work
advanceTo - Advance the clock to an absolute time and run all scheduled work
advanceBy - Advance the clock by a relative time and run all scheduled work
schedule - Obviously, to schedule the work
start - Starts the scheduler
stop - Stops the scheduler
Why don't we get started on this, so we could add some nicer stuff for historical scheduling as well?
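One possible shape for that API, as a sketch under stated assumptions: time is a plain number, the comparer defaults to numeric ordering, and work is a bare callback rather than an Action. SketchVirtualTimeScheduler is a hypothetical name, not the class on the branch.

```typescript
type TimeComparer = (a: number, b: number) => number;

interface VirtualItem {
  due: number;
  seq: number;
  work: () => void;
}

class SketchVirtualTimeScheduler {
  private queue: VirtualItem[] = [];
  private nextSeq = 0;
  private running = false;

  // Initialized with an initial clock value and a comparer for time values.
  constructor(public clock: number = 0, private comparer: TimeComparer = (a, b) => a - b) {}

  // schedule: queue work at a time relative to the current clock.
  schedule(work: () => void, delay: number = 0): void {
    this.queue.push({ due: this.clock + delay, seq: this.nextSeq++, work });
  }

  // sleep: advance the clock without running any of the scheduled work.
  sleep(duration: number): void {
    this.clock += duration;
  }

  // advanceTo: advance to an absolute time, running all work due on the way.
  advanceTo(time: number): void {
    if (this.comparer(time, this.clock) < 0) throw new RangeError("cannot move the clock backwards");
    this.run(time);
    this.clock = time;
  }

  // advanceBy: advance by a relative time, running all work due on the way.
  advanceBy(duration: number): void {
    this.advanceTo(this.clock + duration);
  }

  // start: run queued work in time order until the queue is empty or stop() is called.
  start(): void {
    this.run();
  }

  // stop: halt a run in progress after the current piece of work returns.
  stop(): void {
    this.running = false;
  }

  private run(limit?: number): void {
    this.running = true;
    while (this.running) {
      this.queue.sort((a, b) => this.comparer(a.due, b.due) || a.seq - b.seq);
      const next = this.queue[0];
      if (next === undefined) break;
      if (limit !== undefined && this.comparer(next.due, limit) > 0) break;
      this.queue.shift();
      // Only move forward: work overtaken by sleep() runs at the current clock.
      if (this.comparer(next.due, this.clock) > 0) this.clock = next.due;
      next.work();
    }
    this.running = false;
  }
}
```

Keeping the comparer as a constructor argument leaves room for non-numeric time values later without the fully generic ceremony of the v4 design.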
@mattpodwysocki yes! Happy to be moving forward on this. Just pushed a virtual-time-schedulers branch for us to collaborate.
Yay! I ran into this while writing RxJS in Action and was sad that VirtualTimeScheduler wasn't really usable in the same way it was in RxJS 4. @trxcllnt I'd be interested in helping out on this as well.
@trxcllnt and @mattpodwysocki would it be worth collapsing TestScheduler and VirtualTimeScheduler into a single implementation? I feel like they're unnecessarily separated, and I'm the one who implemented them.
Also I think we should be very cognizant of any size we're adding to the library, in particular any size we're adding to more commonly used Schedulers (queue or asap, for example), just as "collateral damage" from an update to a generic API. Do you know what I mean?
@blesh the ideal scenario in this case would be that the TestScheduler is still an implementation of the VirtualTimeScheduler. As noted in RxJS v4, there isn't much to the core of the TestScheduler outside of scheduling the creation, subscription and disposal, and the majority of the heavy lifting was done in the VirtualTimeScheduler.
@blesh We can collapse ASAP and AnimationFrame Schedulers into one type, as they're identical. We can also collapse the ASAP and AnimationFrame Actions into one type that takes a "requestNextFrame" method in its constructor. The only reason they're distinct is so the last major scheduler PR diff didn't scare anybody.
As @mattpodwysocki said, the VT Scheduler should be possible to implement without adding any weight to the production Schedulers. We should be able to add absolute time support to the base AsyncAction for production, and override the behavior in the VTAction.
The current TestScheduler is really just for parsing and executing marble diagram tests, so I'd recommend we keep it distinct, or maybe rename it to something like MarbleTestScheduler.
There has not been much info available on the scheduler aspects of RxJS 5 so I'm glad to learn about this new implementation. From a music processing perspective scheduling is the essence of good playback.
Music data is really just a slightly more complex version of marble diagrams so having a scheduler that is also suitable for advancing in musical time, queuing notes and in general handling transport tasks would be an important use case to consider.
The methods listed by @mattpodwysocki above already look well suited to music processing in general. Just thought this might be the right time to chime in for the musical world at large and create an API that makes musical sense as well 🎵
This appears to have been resolved a long time ago.
RxJS version: 5.0.0-beta11
Actual behavior: I tried to use beta 11 today in the project I'm working on, and the QueueScheduler schedule method raises the following error: TypeError: this.SchedulerAction is not a constructor.
Code to reproduce: Actually it seems that the SchedulerAction is not defined. The error is raised by running the schedule method of an instance we create:
Additional information: By reading the Scheduler class, I found out the constructor does not set the SchedulerAction attributes:
I may have missed something, but is the following line not missing? this.SchedulerAction = SchedulerAction;
Also, could the QueueScheduler class reuse the Scheduler constructor with QueueAction by default?
I'm also wondering if I'm doing something wrong when instantiating a QueueScheduler; maybe using only Scheduler.queue would be sufficient...
Thanks for this great repo!
++
Augustin