mvniekerk / tokio-cron-scheduler

Schedule tasks on Tokio using cron-like annotation
Apache License 2.0
453 stars, 54 forks

Having too many jobs causes the channel to be filled and the scheduler to stop working #61

Open marioloko opened 5 months ago

marioloko commented 5 months ago

I have around 1000 jobs running simultaneously, but the default broadcast channel size is 200. As a result, the receiver side gets a RecvError::Lagged error, which causes the scheduler to stop working.

The offending lines are the following:
https://github.com/mvniekerk/tokio-cron-scheduler/blob/main/src/job/runner.rs#L29-L32
https://github.com/mvniekerk/tokio-cron-scheduler/blob/main/src/notification/runner.rs#L28-L31

loop {
    if let Err(e) = val {
        error!("Error receiving value {:?}", e);
        break;
    }
    ...
}

The problem with the code above is that when the broadcast channel is full and older messages are overwritten, the receiver gets a RecvError::Lagged with the number of dropped messages. That error then causes the loop to break, so the job/runner.rs thread stops running and never recovers.

Worse, the scheduler neither shuts down nor reports any error from the scheduler.start() entrypoint, so it keeps running forever in a bad state.
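To make the failure mode concrete, here is a minimal sketch of the loop quoted above. It is a simplified model, not the library's code: `RecvError` is a local stand-in that only mirrors the two variants of tokio's `broadcast::error::RecvError`, and `run_until_first_error` and the event sequence are hypothetical names and data invented for illustration.

```rust
// Local stand-in mirroring the two variants of tokio's
// `broadcast::error::RecvError`; this is not the real type.
#[allow(dead_code)]
#[derive(Debug, Clone, PartialEq)]
enum RecvError {
    Lagged(u64),
    Closed,
}

/// Mimics the receive loop quoted above: it stops at the first error,
/// whatever that error is. Returns the job ids handled before stopping.
fn run_until_first_error(events: &[Result<u32, RecvError>]) -> Vec<u32> {
    let mut handled = Vec::new();
    for event in events {
        match event {
            Ok(job_id) => handled.push(*job_id),
            Err(e) => {
                eprintln!("Error receiving value {:?}", e);
                break; // Lagged is treated exactly like Closed
            }
        }
    }
    handled
}

fn main() {
    // Hypothetical sequence: two jobs arrive, the receiver falls behind by
    // 800 messages, then more jobs arrive while the channel is still open.
    let events: Vec<Result<u32, RecvError>> =
        vec![Ok(1), Ok(2), Err(RecvError::Lagged(800)), Ok(803), Ok(804)];
    let handled = run_until_first_error(&events);
    println!("handled: {:?}", handled);
}
```

In this model the jobs that arrive after the Lagged error are never handled, even though the channel itself is still usable.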

There are many different things that can help to solve this problem:

1. Make channel size configurable

Make the channel size configurable, as proposed in https://github.com/mvniekerk/tokio-cron-scheduler/issues/58.

I think it is a simple first step to try to fix the issue, allowing the application user to decide the capacity of the channel.

Advantages:

  1. It does not break the public interface.
  2. It is simple to implement.
  3. It can be the final solution if you know the number of jobs you will have.

Drawbacks:

  1. If you fill the channel it will still silently exit the loop.
  2. You should know the number of jobs to run beforehand.
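A configurable capacity could look roughly like the sketch below. `ChannelConfig`, its `channel_size` field, and `with_channel_size` are hypothetical names, not part of the crate; the only value taken from this issue is the current default of 200. The zero check is an assumption added because tokio's `broadcast::channel` panics on a capacity of 0.

```rust
/// Hypothetical configuration type; the name and field are illustrative
/// and not part of the crate's API.
#[derive(Debug, Clone, Copy, PartialEq)]
struct ChannelConfig {
    channel_size: usize,
}

impl Default for ChannelConfig {
    fn default() -> Self {
        // 200 is the hard-coded size reported in this issue.
        Self { channel_size: 200 }
    }
}

impl ChannelConfig {
    /// Override the capacity. A zero capacity is rejected here instead of
    /// being passed on to the channel constructor.
    fn with_channel_size(mut self, size: usize) -> Result<Self, String> {
        if size == 0 {
            return Err("channel size must be greater than zero".to_string());
        }
        self.channel_size = size;
        Ok(self)
    }
}

fn main() {
    // An application expecting roughly 1000 jobs could ask for more room.
    let config = ChannelConfig::default()
        .with_channel_size(2_000)
        .expect("non-zero capacity");
    println!("{:?}", config);
}
```

Keeping the default at 200 would leave existing users unaffected, which matches the first advantage listed above.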

2. Not using broadcast channels, but mpsc channels instead

The scheduler does not use several receivers in any part of the library, so using mpsc is possible.

Advantages:

  1. If the channel is full, it will neither drop the messages nor send a RecvError::Lagged, but instead it will wait on the sender side until new space is available.
  2. It will keep the order of the jobs.
  3. It will only give an error if the channel is dropped, which should not occur, so the problem of exiting the loop should also be fixed.

Drawbacks:

  1. It breaks the Context public interface, as Context will not allow subscriptions to the job_activation_tx nor to the notify_tx anymore. These two channels will become part of the private interface.
  2. It can make it harder to extend the application in the future, if two receivers are required for either of these two channels.

However, I do not think that the second drawback is a big deal.

I used this approach as a solution for my personal use case of running 1000 jobs and it seems to work fairly well. You can check the changes in my fork of your repository. If you like this solution I will gladly open a pull request.
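The back-pressure behaviour described in the advantages can be sketched with the standard library alone. This is not the code from the fork: it uses `std::sync::mpsc::sync_channel` as a stand-in for a bounded `tokio::sync::mpsc` channel, where the sender would await instead of blocking a thread, and `run_bounded` is a hypothetical helper.

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

/// Push `jobs` ids through a bounded channel of size `capacity` and
/// return them in the order the receiver saw them.
fn run_bounded(jobs: u32, capacity: usize) -> Vec<u32> {
    let (tx, rx) = sync_channel::<u32>(capacity);

    let producer = thread::spawn(move || {
        for job_id in 0..jobs {
            // Blocks while the buffer is full instead of dropping messages.
            tx.send(job_id).expect("receiver dropped");
        }
        // `tx` is dropped here, which ends the receiver's iteration.
    });

    // Iteration ends only once every sender has been dropped.
    let received: Vec<u32> = rx.iter().collect();
    producer.join().expect("producer panicked");
    received
}

fn main() {
    // 1000 jobs through a buffer of 8: far more messages than capacity.
    let received = run_bounded(1_000, 8);
    println!("received {} jobs", received.len());
}
```

With a single producer, every message arrives and arrives in send order, however small the buffer is; the cost is that the sender is delayed while the receiver catches up.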

3. Using busy waiting on the sender side (alternative to 2)

As an alternative to solution 2, if we do not want to break the Context public interface and still want to use broadcast channels while avoiding the Lagged errors, the sender can check whether the channel has enough space before performing a send, and sleep if it does not.

Something like this:

while broadcast_tx.len() >= CHANNEL_CAPACITY {
  tokio::time::sleep(Duration::from_millis(100)).await;
}
if let Err(err) = broadcast_tx.send(msg) {
...
}

Advantages:

  1. Keeps the Context public interface.
  2. Keeps using broadcast channels, which make it easy to extend the application in the future.

Drawbacks:

  1. As the sender is in the Context public interface, the application user can send messages to the channels without waiting for them to have space. That can still cause RecvError::Lagged, which will kill the scheduler.
  2. The busy-waiting solution I posted above can produce race conditions; a semaphore is required to coordinate the sends to the channel.
  3. It may not keep the order of the jobs.

If you want to implement this solution, let me know; I can help to implement it.
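The semaphore idea from the second drawback can be sketched as follows. Everything here is an assumption made for illustration: `Permits` is a small counting semaphore built on std primitives (async code would more likely use `tokio::sync::Semaphore`), an unbounded `std::sync::mpsc` channel stands in for the broadcast channel, and `run_with_permits` is a hypothetical helper. The capacity must be greater than zero, otherwise no sender can ever proceed.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::channel;
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

/// Minimal counting semaphore built on std primitives.
struct Permits {
    available: Mutex<usize>,
    freed: Condvar,
}

impl Permits {
    fn new(count: usize) -> Self {
        Self { available: Mutex::new(count), freed: Condvar::new() }
    }

    /// Block until a slot is free, then take it. The check and the decrement
    /// happen under one lock, so two senders cannot claim the same slot.
    fn acquire(&self) {
        let mut available = self.available.lock().unwrap();
        while *available == 0 {
            available = self.freed.wait(available).unwrap();
        }
        *available -= 1;
    }

    /// Called by the receiver after it has consumed a message.
    fn release(&self) {
        *self.available.lock().unwrap() += 1;
        self.freed.notify_one();
    }
}

/// Run `senders` threads that each send `per_sender` messages and return
/// (messages received, largest number of unconsumed messages observed).
fn run_with_permits(senders: usize, per_sender: usize, capacity: usize) -> (usize, usize) {
    let permits = Arc::new(Permits::new(capacity));
    let in_flight = Arc::new(AtomicUsize::new(0));
    let max_in_flight = Arc::new(AtomicUsize::new(0));
    let (tx, rx) = channel::<usize>();

    let handles: Vec<_> = (0..senders)
        .map(|_| {
            let (permits, in_flight, max_in_flight, tx) =
                (permits.clone(), in_flight.clone(), max_in_flight.clone(), tx.clone());
            thread::spawn(move || {
                for msg in 0..per_sender {
                    permits.acquire(); // wait for a free slot before sending
                    let now = in_flight.fetch_add(1, Ordering::SeqCst) + 1;
                    max_in_flight.fetch_max(now, Ordering::SeqCst);
                    tx.send(msg).expect("receiver dropped");
                }
            })
        })
        .collect();
    drop(tx); // only the worker clones keep the channel open

    let mut received = 0;
    for _msg in rx {
        received += 1;
        in_flight.fetch_sub(1, Ordering::SeqCst);
        permits.release(); // hand the slot back to a waiting sender
    }
    for handle in handles {
        handle.join().expect("sender panicked");
    }
    (received, max_in_flight.load(Ordering::SeqCst))
}

fn main() {
    let (received, max_in_flight) = run_with_permits(4, 250, 8);
    println!("received {received}, at most {max_in_flight} in flight");
}
```

This only helps if every sender goes through the permits; as noted in the first drawback, a sender exposed through the public interface can bypass them.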

4. Shutting down the scheduler in case it exits any loop

Once the scheduler exits any of the actor threads, it will not behave correctly anymore, as the different actors will not be able to communicate. In those cases, I think that the best approach is to fully stop the scheduler and return an error code to the thread that called scheduler.start(). This way, the application developer can decide whether to restart the scheduler or handle the situation in some other way.

I think that point 4, shutting down the scheduler in case it exits any loop, is mandatory to implement, to let the developer decide what to do in case of error.
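One possible shape for point 4 is sketched below, under heavy assumptions: `SchedulerError`, `job_runner`, and this `start` function are hypothetical stand-ins written with std threads and channels, not the crate's actors or its real `scheduler.start()` signature. The point is only that the actor returns its failure and the entrypoint hands it to the caller.

```rust
use std::sync::mpsc::{channel, Receiver};
use std::thread;

/// Hypothetical error type; not part of the crate's API.
#[derive(Debug, PartialEq)]
enum SchedulerError {
    ActorStopped(&'static str),
}

/// Stand-in for an actor loop: runs until its channel yields an error,
/// then reports which actor stopped instead of only logging it.
fn job_runner(rx: Receiver<Result<u32, String>>) -> Result<(), SchedulerError> {
    for event in rx {
        if let Err(reason) = event {
            eprintln!("Error receiving value {:?}", reason);
            return Err(SchedulerError::ActorStopped("job runner"));
        }
    }
    Ok(()) // all senders dropped: a clean shutdown
}

/// Stand-in for the start entrypoint: spawns the actor, feeds it the given
/// events, and surfaces the actor's outcome to the caller.
fn start(events: Vec<Result<u32, String>>) -> Result<(), SchedulerError> {
    let (tx, rx) = channel();
    let actor = thread::spawn(move || job_runner(rx));
    for event in events {
        // The actor may already have stopped; ignore send failures here.
        let _ = tx.send(event);
    }
    drop(tx);
    actor.join().expect("actor panicked")
}

fn main() {
    let outcome = start(vec![Ok(1), Err("lagged by 800 messages".to_string()), Ok(2)]);
    match outcome {
        Ok(()) => println!("scheduler stopped cleanly"),
        Err(e) => println!("scheduler failed: {:?}; the caller can restart it", e),
    }
}
```

With the error returned rather than swallowed, the application can choose between restarting, recreating the scheduler with a larger capacity, or aborting.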

Clarification

The main reason I opened this thread is to discuss the best steps to solve this problem. I think that the first thing to do is to agree on the best solution and then implement it. I can help code the solution once we have decided which steps to take.

At the moment, I have implemented only solution 2 in my fork of the project and it works well for my use case, but it is probably not the best solution for every use case because it breaks the public interface.

DenuxPlays commented 3 months ago

Can't we combine/implement both? I think the "right"/"best" (or whatever you wanna call it) approach is by replacing broadcast with mpsc. But I would love to configure the channel size.

marioloko commented 3 months ago

Yes, we can combine both without problem

marioloko commented 3 months ago

@mvniekerk what do you think about the solution?

mvniekerk commented 3 months ago

Hey all, I love these comments. Nothing like real-world use cases breaking code and working around it.

What I'm gathering from this discussion is that we need a way to i) handle failures more elegantly, ii) allow some sort of back-pressure handling where the jobs are not scheduled at exactly the correct time but retried a bit later and spew out errors accordingly, and iii) allow channel capacity to be changed at runtime (maybe dynamic?).

I'll think around it. I appreciate your comments.

marioloko commented 3 months ago

Regarding iii, it can be statically defined, but it should at least be configurable; right now it is always set to 200.

Making it grow dynamically will make the solution much harder, as you are keeping an Arc<Sender> and an Arc<Receiver>, which are immutable. Making them mutable would require creating a new channel and propagating it to all threads (or, if you use a Mutex for synchronization, it will slow down the solution).

If it is possible to propagate the Lagged error outside the scheduler, the end user will be able to notice the error and take action, for example by recreating the scheduler with a new capacity.

mvniekerk commented 2 months ago

Still thinking :-)