Open ehaskins opened 4 years ago
In case it's useful to someone, here's my stripped down version of delay which simply delays each notification.
import { async } from 'rxjs/scheduler/async';
import { Operator } from 'rxjs/Operator';
import { Subscriber } from 'rxjs/Subscriber';
import { Notification } from 'rxjs/Notification';
import { Observable } from 'rxjs/Observable';
import { SchedulerLike, MonoTypeOperatorFunction, TeardownLogic } from 'rxjs';
/**
* Delays the emission of items from the source Observable by a given timeout.
*
* <span class="informal">Time shifts each item by some specified amount of
* milliseconds.</span>
*
* ![](delay.png)
*
* If the delay argument is a Number, this operator time shifts the source
* Observable by that amount of time expressed in milliseconds. The relative
* time intervals between the values are preserved.
*
* ## Examples
* Delay each click by one second
* ```ts
* import { fromEvent } from 'rxjs';
* import { delay } from 'rxjs/operators';
*
* const clicks = fromEvent(document, 'click');
* const delayedClicks = clicks.pipe(delay(1000)); // each click emitted after 1 second
* delayedClicks.subscribe(x => console.log(x));
* ```
*
* @see {@link debounceTime}
* @see {@link delayWhen}
*
* @param {number} delay The delay duration in milliseconds (a `number`)
* @param {SchedulerLike} [scheduler=async] The {@link SchedulerLike} to use for
* managing the timers that handle the time-shift for each item.
* @return {Observable} An Observable that delays the emissions of the source
* Observable by the specified timeout or Date.
* @method delay
* @owner Observable
*/
export function delay<T>(
delay: number,
scheduler: SchedulerLike = async
): MonoTypeOperatorFunction<T> {
const delayFor = Math.abs(<number>delay);
return (source: Observable<T>) =>
source.lift(new DelayOperator(delayFor, scheduler));
}
class DelayOperator<T> implements Operator<T, T> {
constructor(private delay: number, private scheduler: SchedulerLike) {}
call(subscriber: Subscriber<T>, source: any): TeardownLogic {
return source.subscribe(
new DelaySubscriber(subscriber, this.delay, this.scheduler)
);
}
}
/**
* We need this JSDoc comment for affecting ESDoc.
* @ignore
* @extends {Ignored}
*/
class DelaySubscriber<T> extends Subscriber<T> {
private errored: boolean = false;
constructor(
destination: Subscriber<T>,
private delay: number,
private scheduler: SchedulerLike
) {
super(destination);
}
private scheduleNotification(notification: Notification<T>): void {
if (this.errored === true) {
return;
}
this.scheduler.schedule(() => {
if (this.errored) {
return;
}
notification.observe(this.destination);
}, this.delay);
}
protected _next(value: T) {
this.scheduleNotification(Notification.createNext(value));
}
protected _error(err: any) {
this.errored = true;
if (this.destination.error) {
this.destination.error(err);
}
this.unsubscribe();
}
protected _complete() {
this.scheduleNotification(Notification.createComplete());
this.unsubscribe();
}
}
The usage that this problem refers to is unclear. I don't see how the snippet under "Reproduction" would effect the error. Does a date need to be passed for the error to be effected? And your stripped down delay
just seem to have removed the Date
check. If that's the case, how does it address the issue in the "Reproduction" snippet.
I've expended my original post to include steps for the reproduction and an analysis section.
My simplified version remove all use of absolute times, and simply uses this.scheduler.schedule(()=>{...}, this.delay}
to delay the notifications, since the issue is in the calculation of subsequent delays my solution bypasses it. But, at the expense of loosing the delay until date functionality which I don't use.
I ran into this in the context of Node running on an IoT device that doesn't have a real time clock, and is prone to wild changes in system time as it connects to other devices. This is the extreme case where the computer's time was moved back in time significantly, but any time change causes smaller versions of the issue.
If the computer's time were to move forward, emissions would be clumped together.
Time Change x
Source 1 2 3 4 5 6
Dest 1 2 345 6
And, if the computer's time were to move backwards, emissions would stall for a period of time as 4
the first one stuck in the queue prevented other emissions, follow by a burst as the subsequent emissions catch up.
Time Change x
Source 1 2 3 4 5 6
Dest 1 2 3 456
I suspect the implementation is the way it is for efficiency reasons. I'd have to spend more time looking at it and thinking about it, but I don't have the time for that ATM.
I'm disinclined to label this as a bug because I think your circumstances are ... atypical. My concern is that changing the implementation to rely upon the scheduler - for delaying each notification individually - would change the behaviour and potentially break other users (e.g. notifications batched into the same turn through the event loop might end up spread across several turns).
Happy for this to stay open, though. And thanks for the clarifying information.
Bug Report
Current Behavior
If a delay is running while the system's clock is changed it affects the delay of notifications.
In the extreme case where the system's date goes back in time by more than 2^31 ms (~25 days) it gets stuck in a fast loop printing the message below ~1000 times/sec.
By overriding setInterval with:
I got the following stack trace:
Reproduction
Steps:
Expected behavior Delays between notifications should not be impacted by clock time of the system.
Environment
Possible Solution
Related to #5232.
Delay mixes logic to handle offsetting start to an absolute time, and logic to delay individual notifications. Does it make sense to separate the two features into separate operators?
Analysis
When the
of
emits the value the notification is added to DelaySubscriber'sthis.queue
. TheDelayMessage
includes the absolute time the notification is scheduled to be emitted, anddispatch
is scheduled for execution inthis.delay
. https://github.com/ReactiveX/rxjs/blob/41888efcd1f34de2ee1eda0bd778d4fe0ab4263e/src/internal/operators/delay.ts#L132)The scheduler's clock is then set backwards in time before
Dispatch
is executed.Dispatch
is called afterthis.delay
elapses. It emits any notifications scheduled to be emitted beforescheduler.now
.https://github.com/ReactiveX/rxjs/blob/41888efcd1f34de2ee1eda0bd778d4fe0ab4263e/src/internal/operators/delay.ts#L99
Since
scheduler.now()
is now significantly in the past,queue[0].time
is in the future, so nothing is emitted. Since the queue is not empty,dispatch
calculates the delay until the next notification to emit.https://github.com/ReactiveX/rxjs/blob/41888efcd1f34de2ee1eda0bd778d4fe0ab4263e/src/internal/operators/delay.ts#L104
The delay ultimately is passed to
setInterval
where the delay is validated to be less thanTIMEOUT_MAX
, which is2 ** 31 - 1
ms, which is less than 25 days.https://github.com/nodejs/node/blob/1fab8a92974ce555adca39baada1d199b4952fd7/lib/internal/timers.js#L160
Since
queue[0]
was scheduled "in the future" and not emitted, the delay was a large positive number significantly larger thanTIMEOUT_MAX
. Node's behavior in the case of a duration overflow is to set the duration to 1ms.Dispatch is then called 1ms later, and the process repeats since queue[0] since hasn't been emitted, and is still far enough in the future to exceed
TIMEOUT_MAX
.