DelSkayn / rquickjs

High level bindings to the quickjs javascript engine
MIT License

Detect when the task queue is empty? #137

Closed stevefan1999-personal closed 1 year ago

stevefan1999-personal commented 1 year ago

So Node.js shuts itself down when there is nothing left in the task queue, and I wonder how to replicate this in Rust with the futures feature on. Right now the executor task seems to run forever if I select on it, but if I don't select on it in my run loop and let the Tokio runtime die first, it panics with a SendError. I want to detect when the queue is empty so I can shut down the runtime.

katyo commented 1 year ago

@stevefan1999-personal See Runtime::idle

stevefan1999-personal commented 1 year ago

@katyo Well, the problem is that the runtime can become idle between timer tasks... that's my observation.

stevefan1999-personal commented 1 year ago

https://github.com/DelSkayn/rquickjs/blob/234cc91ff7a3d06fb5d58fcd8f0e6bed56af51d5/core/src/runtime/async_executor.rs#L69

I once attached a debugger and saw that after the timer tasks are done, this line is hit and the select loop is never triggered again for some reason.

Here's my script for testing it:

const timer = setInterval(() => {
    console.log("hello world")
}, 1000)

setTimeout(() => clearInterval(timer), 5000)

export const hello = "world"

POC implementation (well, it is actual code from my project, but it serves as concept code here anyway):

    #[quickjs(rename = "setInterval")]
    pub fn set_interval(
        func: Persistent<Function<'static>>,
        delay: Option<usize>,
        ctx: Ctx,
    ) -> CancellationTokenWrapper {
        let delay = delay.unwrap_or(0) as u64;
        let duration = Duration::from_millis(delay);
        let mut interval = time::interval(duration);
        let token = WORLD_END.child_token();

        ctx.spawn({
            let token = token.clone();
            let context = Context::from_ctx(ctx).unwrap();
            tokio::spawn(async move {
                // ignore the first tick
                let _ = interval.tick().with_cancellation(&token).await;
                while let Ok(_) = interval
                    .tick()
                    .with_cancellation(&token)
                    .await
                    .map(|_| context.with(|ctx| func.clone().restore(ctx)?.defer_call(())))
                {
                }
            })
        });

        token.into()
    }

    #[quickjs(rename = "clearInterval")]
    pub fn clear_interval(token: CancellationTokenWrapper) {
        token.cancel();
    }

    #[quickjs(rename = "setTimeout")]
    pub fn set_timeout(
        func: Persistent<Function<'static>>,
        delay: Option<usize>,
        ctx: Ctx,
    ) -> CancellationTokenWrapper {
        let delay = delay.unwrap_or(0) as u64;
        let duration = Duration::from_millis(delay);
        let token = WORLD_END.child_token();
        ctx.spawn({
            let token = token.clone();
            let context = Context::from_ctx(ctx).unwrap();
            tokio::spawn(async move {
                let _ = time::sleep(duration)
                    .with_cancellation(&token)
                    .await
                    .map(|_| context.with(|ctx| func.restore(ctx)?.defer_call(())));
            })
        });
        token.into()
    }

    pub async fn run_until_end(&mut self) {
        let cancel = WORLD_END.child_token();
        let engine = self.engine.clone();
        let rt = engine.ctx.runtime();
        let mut guard = engine.executor_handle.lock().await;
        // let mut first_task = true;
        // let mut is_empty = false;

        let mut zeroes = 0;
        let mut stoppable = false;
        let mut idle = rt.idle();
        // let _ = guard.deref_mut().await;
        'select: loop {
            select! {
                // biased;
                _ = cancel.cancelled(), if !stoppable => {
                    println!("test1");
                    stoppable = true
                },
                None = self.tasks.join_next(), if !stoppable => {
                    println!("test2");
                    stoppable = true
                },
                _ = guard.deref_mut() => {
                    stoppable = true
                },
                _ = rt.idle(), if !stoppable || rt.is_job_pending() || !rt.tasks_empty() => {}
                else => {
                    break 'select;
                }
            }
            yield_now().await;
        }

        // let _ = guard.deref_mut().await;
        // println!("wait");
        // rt.idle().await;
    }

stevefan1999-personal commented 1 year ago

Maybe the biggest problem is that the async executor itself does not cater for a proactor-based execution flow: it cannot reliably report an empty queue back to the async controller, so the tasks stay pending forever. Maybe we need some mechanism to detect that. I tried sending an empty-queue event over another channel, but I'm not sure how to do that in a poll context (or whether it is safe).

I ran into this because I see that the timer tasks are done but the executor task is still idling, which I know makes sense, but it will also make Runtime::idle block forever, right?
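
For illustration, here is a minimal sketch of the "signal over another channel" idea, using only the Rust standard library. It is not rquickjs code and does not touch the executor's poll context: plain threads stand in for timer tasks, and `run_until_all_done` is a hypothetical name. Each task holds a clone of a sender, so the channel closes on its own once the last task has ended, and that closure serves as the "nothing left" event.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Spawns `n` stand-in timer tasks and blocks until every one has ended.
/// Returns how many completion messages were received.
fn run_until_all_done(n: usize) -> usize {
    let (done_tx, done_rx) = mpsc::channel::<usize>();

    for id in 0..n {
        // Each task owns a clone of the sender for as long as it is alive.
        let tx = done_tx.clone();
        thread::spawn(move || {
            // Stand-in for awaiting a timer.
            thread::sleep(Duration::from_millis(10 * (id as u64 + 1)));
            let _ = tx.send(id);
            // `tx` is dropped here, when the task ends.
        });
    }

    // Drop the original sender so that only live tasks keep the channel open.
    drop(done_tx);

    // `recv` returns Err once every sender is gone, i.e. when no task remains.
    let mut finished = 0;
    while done_rx.recv().is_ok() {
        finished += 1;
    }
    finished
}

fn main() {
    let finished = run_until_all_done(3);
    println!("{finished} tasks finished, safe to shut down");
}
```

The point of this design is that nobody has to decide when to send an "empty" message: the receiver learns it from the channel being disconnected, so a task that is parked on a timer still counts as alive.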


stevefan1999-personal commented 1 year ago

@katyo I think I found the problem. The idle variable is initially set to true, so the first time I polled it the idle future resolved immediately, which terminated the JS engine prematurely and stopped the runtime. That is why there is a SendError in flume: the runtime indeed no longer exists.

Besides, is there a way to guarantee that all tasks have truly finished? Right now we can only guarantee that a task runnable (or task runner) is retrieved and run at some point, up to the first await inside the future it is executing (with an async timer, that means awaiting sleep), at which point the future is handed back to the executor to be polled again. At that moment the task queue seems empty, yet there are still tasks in flight, and we treat the runtime as idle, which is contradictory. I observed this when implementing my own setInterval and setTimeout with async Tokio, and I think we may need to rethink the executor a little.
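
To make the distinction concrete, here is a rough sketch of an "in-flight" counter that is separate from the run queue, again using only the standard library. `InFlight` and `TaskGuard` are hypothetical names, not rquickjs types, and threads stand in for tasks. A task is counted from the moment it is registered until its guard is dropped, so a task that is merely parked on a timer still keeps the runtime from looking idle.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

/// Hypothetical tracker: counts tasks that were started but have not finished.
#[derive(Clone, Default)]
struct InFlight {
    inner: Arc<(Mutex<usize>, Condvar)>,
}

/// Held by a task for its whole lifetime; dropping it marks the task finished.
struct TaskGuard {
    tracker: InFlight,
}

impl InFlight {
    /// Register a task before it is handed to the executor, so the
    /// counter can never read zero while work is still about to start.
    fn begin(&self) -> TaskGuard {
        *self.inner.0.lock().unwrap() += 1;
        TaskGuard { tracker: self.clone() }
    }

    /// Number of tasks currently registered.
    fn count(&self) -> usize {
        *self.inner.0.lock().unwrap()
    }

    /// Block until no task is registered.
    fn wait_idle(&self) {
        let (lock, cvar) = &*self.inner;
        let mut n = lock.lock().unwrap();
        while *n > 0 {
            n = cvar.wait(n).unwrap();
        }
    }
}

impl Drop for TaskGuard {
    fn drop(&mut self) {
        let (lock, cvar) = &*self.tracker.inner;
        *lock.lock().unwrap() -= 1;
        cvar.notify_all();
    }
}

fn main() {
    let tracker = InFlight::default();
    for i in 0..3u64 {
        let guard = tracker.begin(); // counted before the task starts running
        thread::spawn(move || {
            let _guard = guard; // released when the task body returns
            thread::sleep(Duration::from_millis(20 * (i + 1))); // parked on a "timer"
        });
    }
    tracker.wait_idle();
    println!("in flight after wait_idle: {}", tracker.count());
}
```

Registering in `begin` before the spawn also avoids the premature-idle case described above, where the first idle check fires before any task has been counted.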

DelSkayn commented 1 year ago

This issue should be resolved now that futures were redesigned. AsyncRuntime::idle should only return after all futures and promises are finished.

stevefan1999-personal commented 1 year ago

@DelSkayn I wonder if there is a tutorial for the new futures design?