marcoCasamento / Hangfire.Redis.StackExchange

HangFire Redis storage based on original (and now unsupported) Hangfire.Redis but using lovely StackExchange.Redis client
Other
468 stars 109 forks source link

Jobs in correct processing state are requeued after InvisibilityTimeout, which leads to stuck jobs after ungraceful shutdown #135

Closed filippetrovic closed 5 months ago

filippetrovic commented 10 months ago

Hello,

First of all, thanks for developing and maintaining this project.

We've been long users of Hangfire with PostgreSQL. Recently, we've started evaluating the switch to Redis, and naturally, this lib was our first choice. After testing it for a while, I've found an edge case that can cause jobs to be lost by being stuck in the Processing state. Even though it's not common, it has a high impact.

How to reproduce it:

  1. Set InvisibilityTimeout to TimeSpan.FromSeconds(1)
  2. Enqueue job _backgroundJobs.Enqueue(() => _testService.LongTask(TimeSpan.FromSeconds(600), CancellationToken.None));
  3. The job is started after a while, and we have Task started in the logs
  4. FetchedJobsWatcher kicks in after a while with (Hangfire.Redis.StackExchange.FetchedJobsWatcher) 1 timed out jobs were found in the 'default' queue and re-queued.
  5. The job is started once again. Task started is shown once again in the logs
  6. Kill the Hangfire server ungracefully with kill -9 PID
  7. At this point, the job is stuck in the Processing state. It won't be requeued even after waiting for InvisibilityTimeout.

Additionally, I've noticed that at some point between points 5 and 6, Redis lists for a queue are absent, i.e., keys *default* returns nothing. That's why the job won't be picked up after starting a new server, even after the killed server is removed from the list of active servers.

Here is the job I've been using for testing:

    public async Task LongTask(TimeSpan duration, CancellationToken cancellationToken)
    {
        _logger.LogInformation("Task started");
        while (duration >= TimeSpan.Zero)
        {
            cancellationToken.ThrowIfCancellationRequested();

            var waitInterval = TimeSpan.FromSeconds(10);

            _logger.LogInformation("Waiting...");
            await Task.Delay(waitInterval, CancellationToken.None);
            duration -= duration;
            _logger.LogInformation("Wait finished");
        }
        _logger.LogInformation("Task finished");
    }

My questions are: Is there a way to mitigate this? Should FetchedJobsWatcher requeue the job when the job is in the Processing state with a live server executing it? Is there some mechanism for monitoring Processing jobs for being stuck if it's okay for jobs to be removed from the queue?

Let me know if I can help by providing more testing and information.

filippetrovic commented 10 months ago

I'm aware that InvisibilityTimeout can be set to a time period greater than the job execution time, could prevent this. But would it lead to other unwanted behaviors, like introducing unwanted delays in some cases?

marcoCasamento commented 10 months ago

Hi, couple of questions:

The code in FetchedJobsWatcher should take care of this situation, marking the job as "Checked" and requeuing it at the next run of FetchedJobsWatcher.Execute by disposing a RedisFetchedJob. Of course is totally possible that something goes unexpected but I'd need you to switch to the beta and give me a repro (and some time) to try debug the problem. I know that the actual versions isn't deprecated but I have very limited time to devolve to the project and I prefer to debug the very latest version.

Aside from that, an InvisibilityTimeout of 1" is something pretty weird, I guess you set it to that value just to ease the reproduction of the error. A good value for this settings take into consideration the duration of job that take most time. You definitely want to pick a value that is sensibly greater that that.

filippetrovic commented 10 months ago

at point 3 you wite: "The job is started after a while, and we [...]" that make me think there's something wrong on redis side, maybe the pub/sub isn't working ? it should start immediately

It starts practically immediately, under a second I would say.

does the job stuck in processing shows a little triangle in the dashboard ? does re-enquieing it make the job start ?

Yes. Yes.

The code in FetchedJobsWatcher should take care of this situation, marking the job as "Checked" and requeuing it at the next run of FetchedJobsWatcher.Execute by disposing a RedisFetchedJob.

FetchedJobsWatcher is looking only at the queue. But for me, at some point between steps 5 and 6, the Redis list for the queue becomes empty. https://github.com/marcoCasamento/Hangfire.Redis.StackExchange/blob/79e7bfd2ce2613fe8978b849d9f27885f897c47e/Hangfire.Redis.StackExchange/FetchedJobsWatcher.cs#L81C29-L81C29

Of course is totally possible that something goes unexpected but I'd need you to switch to the beta and give me a repro (and some time) to try debug the problem. I know that the actual versions isn't deprecated but I have very limited time to devolve to the project and I prefer to debug the very latest version.

Sure, let me know how to switch to the beta.

Aside from that, an InvisibilityTimeout of 1" is something pretty weird, I guess you set it to that value just to ease the reproduction of the error. A good value for this settings take into consideration the duration of job that take most time. You definitely want to pick a value that is sensibly greater that that.

Yes, that value is there just to emphasize the behavior and save me from waiting while testing :)

FixRM commented 7 months ago

Hello @marcoCasamento. We hit similar issue as reported here. If job doesn't support CancelationToken and if it was killed by Hangfire itseft after InvisibilityTimeout, that causes a loss of a worker. Here we have a talk with Sergey (a creator of Hangfire). He said that this issue must be related to the storage implementation.

Please let me know if I can help somehow.

marcoCasamento commented 7 months ago

Hello and thanks for the repro. That's interesting indeed. I hope to run the repro this week and post some feedback here

Lexy2 commented 6 months ago

I think I now have a better understanding of what is happening.

at point 3 you wite: "The job is started after a while, and we [...]" that make me think there's something wrong on redis side, maybe the pub/sub isn't working ? it should start immediately

FetchedJobsWatcher successfully requeues the expired job but does not publish the requeued job id to the subscription channel. Thus, FetchNextJob is only triggered when FetchTimeout expires, which is in the next 3 minutes.

This one is quite easy to fix, I'll do it.

If job doesn't support CancelationToken and if it was killed by Hangfire itseft after InvisibilityTimeout, that causes a loss of a worker

It doesn't case a loss of a worker. The worker keeps executing that long job. Once the worker finishes executing that long job, it will be available again. Of course, if the job is hung, the worker will be lost, but it's not the fault of the storage in this case.


What happens is that when the job is requeued, it somehow gets removed from the fetched jobs after after it's requeued, and this is why further requeues or restarts don't see this job. I'll look closer.

Lexy2 commented 6 months ago

Oh my God. ServerJobCancellationWatcher is a Hangfire's background process to remove aborted jobs from workers. It frees up the original worker when the job is requeued and at the same time removes the reallocated by FetchedJobsWatcher job from fetched jobs.

It deems the job aborted if all conditions are true:

Not sure how to mitigate at this stage. One way, which kind of stupid, is to wait for 5 seconds before requeuing the job so that the cancellation watcher wouldn't terminate an already rerunning job.

Lexy2 commented 6 months ago

So, here's what happens.

  1. FetchedJobsWatcher finds that the job exceeded its invisibility timeout.
  2. It removes the job from Fetched jobs and requeues as the next job into the corresponding queue.
  3. Another worker picks up the job and puts it into Fetched jobs.
  4. ServerJobCancellationWatcher kicks in, sees that the previous job was aborted, and cancels the cancellation token causing the previous worker to stop its execution.
  5. The previous worker finishes execution and removes the job from the Fetched jobs.

At this stage we have no information that this job is fetched and thus being executed. FetchedJobsWatcher will not requeue the job anymore. If the server dies, the job in the Processing state will be aborted forever.

We'd need either to requeue the job after the ServerJobCancellationWatcher kicks in, or find a mechanism to distinguish between the same job reprocessing and this job being cancelled.

marcoCasamento commented 6 months ago

Thanks for the detailed analysis and explanation.

I've in fact encountered that behaviour in a few edge cases, the job appears to stay in the Processing state forever. I ends up with a solution that is bad enough to don't deserve sharing, I simply check the Processing queue and requeue every job found that are is "effectively" in processing state. With "effectively" I mean that I put up a sort of tracking in a static concurrent dictionary about which job every thread in each server is executing at any time.

I didn't check deep enough to understand that the problem involved ServerJobCancellationWatcherand I quickly put together this workaround, but I found it overly complex and can also impact fetching time I believe it somehow fall in the first solution you propose, but what about the second one ? did you fine any clues about how to distingush between the two cases ?

Lexy2 commented 6 months ago

did you fine any clues about how to distingush between the two cases

odinserj's SQL storage stores FetchedAt time with the IFetchedJob instance and terminates the job only if this value matches. I'll think more what way we could leverage.

Lexy2 commented 6 months ago

I have an idea. What if we store the explicit Fetched state in a property of a newly created RedisFetchedJob instance, and when the worker calls RemoveFromQueue, we compare that value with the job's Fetched hash value and don't call RemoveFromFetchedList if that value doesn't match?

That's similar to SqlServerTimeoutJob but should do the trick.

Lexy2 commented 6 months ago

I managed to make FetchedJobsWatcher work as designed, where it cancels and requeues any job that is executing beyond InvisibilityTimeout, and it does it consistently in the same session and after restarts, with transactions and without them. I still need to do some code cleanup before I can show it. Which was the original Hangfire behaviour before 1.5.0.

Now I understand that @filippetrovic wants a slightly different behaviour: so that jobs that are executing on a running server are not terminated even if the InvisibilityTimeout passed. It is possible to implement it by checking if the server executing the current job is registered and within heartbeat interval.

However, this contradicts the second requirement: monitor whether the jobs are stuck in the current execution session. InvisibilityTimeout is exactly this mechanism. If this job is executing longer than this value, it is considered stuck. There is no other mechanism.

My question to @marcoCasamento and you guys as the consumers of the library: what are we going to do with this?

I think we should

  1. Keep the existing behaviour of the InvisibilityTimeout parameter - requeue the jobs that execute longer than that, don't touch even stuck jobs if they are within the interval.
  2. Implement a mechanism of quick automatic requeuing of jobs that are deemed to be aborted (belong to a non-existent or exceeded Heartbeat server)
Lexy2 commented 6 months ago

Answering your question, @FixRM. If a job does not support cooperative cancellation with a CancellationToken, you can't instruct it to terminate. It'll be holding the worker until the server restarts.

It will requeue after InvisibilityTimeout, but it only means that both workers will be executing the same job, and if they both get stuck at a specific stage, you'll lose another worker.

marcoCasamento commented 6 months ago

I have an idea. What if we store the explicit Fetched state in a property of a newly created RedisFetchedJob instance, and when the worker calls RemoveFromQueue, we compare that value with the job's Fetched hash value and don't call RemoveFromFetchedList if that value doesn't match?

That's similar to SqlServerTimeoutJob but should do the trick.

Way better than my hack.

marcoCasamento commented 6 months ago

My question to @marcoCasamento and you guys as the consumers of the library: what are we going to do with this?

I think that's actually no way to avoid the InvisibilityTimeout, so I totally agree with your point no. 1.

I'm not sure however about the no. 2. On the servers I manage, I see that if a server restart happens, within a few minutes the jobs that the server were processing before the restart, no more appears with a warning triangle but they are quickly requeued. Does your jobs still appear in processing within a dead jobserver ?

Lexy2 commented 6 months ago

I'm not sure however about the no. 2. On the servers I manage, I see that if a server restart happens, within a few minutes the jobs that the server were processing before the restart, no more appears with a warning triangle but they are quickly requeued. Does your jobs still appear in processing within a dead jobserver ?

Yes, they do. And not requeued until the original InvisibilityTimeout expires. FetchedJobsWatcher does not see them. image image Here the InvisibilityTimeout is set to 7 minutes, and the server is terminated after ~20 seconds of running the job.

marcoCasamento commented 6 months ago

Ok i've then setup an IBackgroundProcess that do that job and simply forgot about it. I'll have a look at it

Il dom 28 apr 2024, 11:24 Lexy @.***> ha scritto:

I'm not sure however about the no. 2. On the servers I manage, I see that if a server restart happens, within a few minutes the jobs that the server were processing before the restart, no more appears with a warning triangle but they are quickly requeued. Does your jobs still appear in processing within a dead jobserver ?

Yes, they are. And not requeued until the original InvisibilityTimeout expires. FetchedJobsWatcher does not see them. image.png (view on web) https://github.com/marcoCasamento/Hangfire.Redis.StackExchange/assets/38648992/a17f7fb4-7364-4f53-a51e-9c23c2c357ed

— Reply to this email directly, view it on GitHub https://github.com/marcoCasamento/Hangfire.Redis.StackExchange/issues/135#issuecomment-2081407281, or unsubscribe https://github.com/notifications/unsubscribe-auth/AABW2Y7WH3H434ZZCVXBBLLY7S53XAVCNFSM6AAAAABB6GOC42VHI2DSMVQWIX3LMV43OSLTON2WKQ3PNVWWK3TUHMZDAOBRGQYDOMRYGE . You are receiving this because you were mentioned.Message ID: @.*** com>

filippetrovic commented 6 months ago

Hey, thanks for the thorough investigation!

I think we should

  1. Keep the existing behaviour of the InvisibilityTimeout parameter - requeue the jobs that execute longer than that, don't touch even stuck jobs if they are within the interval.
  2. Implement a mechanism of quick automatic requeuing of jobs that are deemed to be aborted (belong to a non-existent or exceeded Heartbeat server)

For my use case, number 2 interests me. My system's architecture is based on a large number of short jobs (usually under a minute, with the longest one being 5 minutes). Hangfire servers are part of a K8s cluster that we (aggressively) scale based on the number of jobs in the queue. We are also planning to migrate hangfire servers to spot instances. Shortly put, we have and expect to keep having a lot of server shutdowns and restarts :)

That being said, my biggest problem would be losing jobs (in this case, being falsely shown as Processing). I prefer at-least-once execution :)

Please let me know if I can help somehow

marcoCasamento commented 6 months ago

I've found this on my codebase:

public class RequeueOrphanedJobsBackgroundProcess : IBackgroundProcess
{
    readonly TimeSpan interval = new TimeSpan(0, 1, 0);
    public void Execute([NotNull] BackgroundProcessContext context)
    {
        try
        {
            var storageApi = context.Storage.GetMonitoringApi();
            var processingJobs = storageApi.ProcessingJobs(0, 500);
            var runningServers = storageApi.Servers();
            var orphanJobs = processingJobs.Where(x => !runningServers.Any(y => y.Name == x.Value.ServerId)).ToList();
            foreach (var orphanJob in orphanJobs)
            {
                BackgroundJob.Requeue(orphanJob.Key);
            }
        }
        catch (Exception)
        {
        }
        context.ShutdownToken.WaitOrThrow(interval);

    }

}

@filippetrovic you can of course add it to the jobserver instance by simply registering it as a singleton in the DI container used i.e. services.AddSingleton<IBackgroundProcess, RequeueOrphanedJobsBackgroundProcess>(), or by passing it to the AddHangfireServer config extension, i.e. _ = services.AddHangfireServer(storage: JobStorage.Current , additionalProcesses: new[] { new RequeueOrphanedJobsBackgroundProcess ()} );

@Lexy2 as per your point n.2

2. Implement a mechanism of quick automatic requeuing of jobs that are deemed to be aborted (belong to a non-existent or exceeded Heartbeat server)

do you think it could serve it ? Also, I think such a behaviour should be an opt-in for end-user

Lexy2 commented 6 months ago

do you think it could serve it ?

Yes, sure. I'd add one thing - requeuing of jobs that belong to reportedly active servers that haven't had a heartbeat recently. And maybe include in the FetchedJobsWatcher to reduce the number of background processes. But I'm not strongly for or against any decision.

Also, I think such a behaviour should be an opt-in for end-user

I think this should be the default behaviour. The whole point of Hangfire is reliable job execution, so people expect the jobs to be executed even after a server restart (they will be after invisibility timeout anyway, it just happens sooner if server is terminated).

Lexy2 commented 6 months ago

That being said, my biggest problem would be losing jobs (in this case, being falsely shown as Processing). I prefer at-least-once execution :)

We'll fix that. I just want to deliver requeuing for aborted servers together the way we agree it should work.

marcoCasamento commented 6 months ago

I'd add one thing - requeuing of jobs that belong to reportedly active servers that haven't had a heartbeat recently

Agree. The hangfire dashboard server page seems to use a 5 min timeout.

And maybe include in the FetchedJobsWatcher to reduce the number of background processes

Agree, if we're going to include that in the storage, I believe that the FetchedJobsWatcher can handle it. At the end of the day the component 's duty is to re-queue timed out job, so I think it's semantically acceptable to add the re-queue of jobs in server restart case.

The whole point of Hangfire is reliable job execution, so people expect the jobs to be executed even after a server restart

Fair point. Admittedly, I do the same on the servers I manage, though being forgotful about it. It's however a change in the behavior. People can potentially stop a server in a farm (thinking of multiple server/process even for scenario of different queue processing) to stop the execution of some particular jobs just to have it quickly requeued on another instance. I agree that this is a scenario that would only affects a few. To be honest I'm not event sure this is a storage duty but is nonetheless useful. Anyone knows how other storage implementation manage this situation ?

Lexy2 commented 6 months ago

People can potentially stop a server in a farm (thinking of multiple server/process even for scenario of different queue processing) to stop the execution of some particular jobs just to have it quickly requeued on another instance.

Well, that's an original way to terminate a job - shut down the whole server with multiple workers to prevent a specific job from proceeding.

I know that SQL storage automatically requeues aborted jobs, but does it by locking a specific row while the job is executing so it's invisible to other workers/servers. If the server or worker dies, the row is automatically unlocked by virtue of closing the DB connection, and the job will be immediately available for "performance", as odinserj calls it, on any available worker or server.

Don't know about other storage implementations.

Lexy2 commented 6 months ago

We probably can't requeue the jobs being executed on servers that have not yet been removed by timeout. We can't use 1 minute timeout, or 5 minute timeout. The reason is that server timeout and server heartbeat are configurable values in Hangfire, and you can have a server that times out in 24 hours, and sends a heartbeat every hour, and you can have another server that sends heartbeat every 10 minutes, and times out in 20 minutes, etc.

marcoCasamento commented 6 months ago

Well, that's an original way to terminate a job - shut down the whole server with multiple workers to prevent a specific job from proceeding.

Indeed. Giving SqlServerStorage implemtation details you shared, I believe we can set the new behavior as the default with an opt-out settings, just to give end-users the flexibility to keep the current behavior. I believe that will imply to release a new minor version.

We can't use 1 minute timeout, or 5 minute timeout. The reason is that server timeout and server heartbeat are configurable values in Hangfire

I'm not sure about this. Assuming we'll use the code posted above, Storage.GetMonitoringApi().Servers() only returns server that are considered active, thus taking into consideration server heartbeat and server timeout. Or are you maybe referring to something else than the code above ?

marcoCasamento commented 6 months ago

Sorry didn't pay attention to the PR I'll review it

Lexy2 commented 6 months ago

Assuming we'll use the code posted above, Storage.GetMonitoringApi().Servers() only returns server that are considered active, thus taking into consideration server heartbeat and server timeout. Or are you maybe referring to something else than the code above ?

Storage.GetMonitoringApi().Servers() returns all registered servers, whether they are active or inactive, working or stuck.

Each server calls the Heartbeat storage method periodically, which updates server's data in the storage. This period is configured when you declare a server, i.e.

var builder = WebApplication.CreateBuilder(args)
    .Services
    .AddHangfireServer(options =>
    {
        options.HeartbeatInterval = TimeSpan.FromHours(3);
        options.ServerTimeout = TimeSpan.FromHours(12);
    });

This means that server will report to storage only every 3 hours since it starts. The second option ServerTimeout means that ServerWatchdog - process that runs on any active server, will periodically ask storage to remove servers that have not reported within ServerTimeout interval - see ServerWatchdog.cs and RedisConnection.cs

Theoretically, you can configure timeout to be less than heartbeat interval, and servers will get removed before they have a chance to report :-)

Theoretically, you can configure different heartbeats and timeouts on different servers, and each ServerWatchdog will ask storage to remove servers based on different assumptions :-)

So, you don't know whether the server that monitoring API returns is alive or not until it sends its next heartbeat. You also don't know within the storage what heartbeat interval and timeout is configured by the hosting application. Assuming they are default 1 minute - 5 minutes is a path to troubles.

Thus, we can only rely on ServerWatchdog to have removed the server from the list of active servers already. If the server is still there, we can't just read its last heartbeat and make assumptions whether it's alive or not.

You are right that we can use monitoring API to query active servers. I'm only saying that this is our best effort to find out what servers are active.

Lexy2 commented 6 months ago

I believe we can set the new behavior as the default with an opt-out settings, just to give end-users the flexibility to keep the current behavior.

This is configurable using the server timeouts. However, if you think that's practical, we can introduce an option to not requeue the jobs assigned to dead servers, and only take into account the InvisibilityTimeout. I personally don't think we should.

FixRM commented 5 months ago

Sorry guys, I missed this conversation. We only/mostly use Hangfire to run recurring jobs so we are more or less ok if one particular instance will be lost. More than that, it's better than if stucked instance will be accidentally required in parallel with newer one. That's why we disabled automatic retries globally.

Answering you question, I prefer if job will be killed after timeout without holding a worker but OOB automatic retries behavior should be honored if possible. Personally, I consider any timeout as job failure. But if job accepts CancelationToken and it handles it correctly at the given amount of time - that was successful completion for my use case (of recurring job that can continue later).

We are refactoring our jobs to support cancellation but it's not that easy for legacy codebase. And yes, timeouts happen there.

Lexy2 commented 5 months ago

More than that, it's better than if stucked instance will be accidentally required in parallel with newer one.

I think you meant "requeued". Yet, we disable parallel running for not-so-reentrant jobs with [DisableConcurrentExecution(timeoutInSeconds: 1)] attribute.

FixRM commented 5 months ago

More than that, it's better than if stucked instance will be accidentally required in parallel with newer one.

I think you meant "requeued". Yet, we disable parallel running for not-so-reentrant jobs with [DisableConcurrentExecution(timeoutInSeconds: 1)] attribute.

Yep, it was a typo. Thanks for the advice. The problem here is that DisableConcurrentExecution only prevents "execution", not "queuering". So let's say that due to the bug all workers got stuck until server restart and no jobs are performed at all. Meanwhile timer is ticking and new instances of recurring jobs are created and queue grows. May be you know how to prevent that?

Lexy2 commented 5 months ago

May be you know how to prevent that?

There are many indirect ways to prevent that. We added a filter that moves the jobs to Deleted queue if more than 2 minutes passed between queueing and execution attempt. With this approach, if there are no free workers (or the background server is not running), the queued up jobs will not run, but the next scheduled job will.

public class MisfireHandlingAttribute : JobFilterAttribute, IElectStateFilter
{
    public void OnStateElection(ElectStateContext context)
    {
        if (context.CurrentState == "Enqueued"
            && context.CandidateState is ProcessingState
            && context.BackgroundJob.CreatedAt < DateTime.UtcNow - TimeSpan.FromMinutes(2))
        {
            context.CandidateState = new DeletedState();
        }
    }
}
FixRM commented 5 months ago

Thanks! I guess we should add a check if job was instance of recurring job but it is still a fresh look at the problem. At the moment we are testing this approach: https://gist.github.com/odinserj/a8332a3f486773baa009?permalink_comment_id=4048344#gistcomment-4048344