perrich / Hangfire.MemoryStorage

A memory storage for Hangfire.
Apache License 2.0

System.Threading.SynchronizationLockException #31

Open Peter-Optiway opened 5 years ago

Peter-Optiway commented 5 years ago

Hi, I get the following error when using MemoryStorage:

2019-09-03 12:02:23 [WARN]  (Hangfire.Server.Worker) Slow log: Hangfire.DisableConcurrentExecutionAttribute performed "OnPerforming for e1e86681-70dc-484e-b7ba-015188696f79" in 60 sec
2019-09-03 12:02:23 [ERROR] (Hangfire.AutomaticRetryAttribute) Failed to process the job 'e1e86681-70dc-484e-b7ba-015188696f79': an exception occurred.
System.Threading.SynchronizationLockException
Object synchronization method was called from an unsynchronized block of code.
   at Hangfire.MemoryStorage.Utilities.LocalLock..ctor(String resource, TimeSpan timeout)
   at Hangfire.MemoryStorage.MemoryStorageConnection.AcquireDistributedLock(String resource, TimeSpan timeout)
   at Hangfire.DisableConcurrentExecutionAttribute.OnPerforming(PerformingContext filterContext)
   at Hangfire.Profiling.ProfilerExtensions.InvokeAction[TInstance](InstanceAction`1 tuple)
   at Hangfire.Profiling.SlowLogProfiler.InvokeMeasured[TInstance,TResult](TInstance instance, Func`2 action, String message)
   at Hangfire.Profiling.ProfilerExtensions.InvokeMeasured[TInstance](IProfiler profiler, TInstance instance, Action`1 action, String message)
   at Hangfire.Server.BackgroundJobPerformer.InvokePerformFilter(IServerFilter filter, PerformingContext preContext, Func`1 continuation)

The job that is causing this error has the following attributes: [AutomaticRetry(Attempts = 0)] [DisableConcurrentExecution(timeoutInSeconds: 60)]

What could be causing this issue?

perrich commented 5 years ago

I think the lock is already held, so the exception is thrown on the next call (see the LocalLock source code):

        private LocalLock(string resource, TimeSpan timeout)
        {
            // Reuse the lock object already registered for this resource, or add a new one.
            _lock = Locks.GetOrAdd(resource, new object());
            _resource = resource;

            // If another caller still holds the lock when the timeout expires,
            // the constructor throws instead of entering the critical section.
            bool hasEntered = Monitor.TryEnter(_lock, timeout);
            if (!hasEntered)
            {
                throw new SynchronizationLockException();
            }
        }
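To see that failure mode in isolation, a standalone snippet like the following (illustrative only, not library code) produces the same exception once the lock is already held past the timeout:

using System;
using System.Threading;
using System.Threading.Tasks;

// The first caller holds the lock object, so the second caller's
// Monitor.TryEnter times out, mirroring what LocalLock does when the
// DisableConcurrentExecution timeout expires.
var gate = new object();
Monitor.Enter(gate);

try
{
    await Task.Run(() =>
    {
        if (!Monitor.TryEnter(gate, TimeSpan.FromSeconds(1)))
        {
            throw new SynchronizationLockException();
        }
    });
}
catch (SynchronizationLockException)
{
    Console.WriteLine("Lock still held elsewhere, acquisition timed out.");
}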
ghabre commented 5 years ago

Facing the same issue. Things worked fine for almost a month, then all of a sudden this error showed up in my logs and the process was no longer executing as scheduled. Also using MemoryStorage and the attributes: [DisableConcurrentExecution(0)] [AutomaticRetry(Attempts = 0, OnAttemptsExceeded = AttemptsExceededAction.Delete)]

avlcodemonkey commented 4 years ago

Has a resolution been found for this issue? I'm seeing the same error as @ghabre, using the same attributes, but I am a bit behind on NuGet updates (1.6.0 instead of 1.6.3).

ghabre commented 4 years ago

Hi @avlcodemonkey, the way I solved it was a bit hacky and manual: whenever I start execution of the scheduled event, I also start a parallel Task.Delay for 2 hours (figured my task should never exceed that), and whichever of the two tasks completes first, I cancel the other one. Below is a code snippet, hope it helps:

TaskExecutionResult results;
using (var taskCancellation = new CancellationTokenSource())
{
    var task = Task.Run(async () => await this.ExecuteTask(...), taskCancellation.Token);
    using (var timeoutCancellation = new CancellationTokenSource())
    {
        var timeout = Task.Delay(TimeSpan.FromHours(2), timeoutCancellation.Token);

        if (await Task.WhenAny(task, timeout) == task)
        {
            // The job finished first: stop the timeout timer.
            timeoutCancellation.Cancel();
        }
        else
        {
            // The timeout won: request cancellation of the job.
            // Note: the token passed to Task.Run only prevents the delegate from starting;
            // ExecuteTask has to observe it to abort work that is already running.
            taskCancellation.Cancel();
            throw new TimeoutException();
        }
    }

    // Task completed within timeout.
    // Consider that the task may have faulted or been canceled.
    // We re-await the task so that any exceptions/cancellation is rethrown.
    results = await task;
}
avlcodemonkey commented 4 years ago

Thanks for the info @ghabre. I was considering an approach using ManualResetEvent but it seemed like overkill.

It originally appeared to me that there was a lock issue preventing a job from being performed, even when there were no other instances running. I set up a test with job X set to run every minute with the DisableConcurrentExecution attribute. Then inside of X I added
if (Counter < 5) Thread.Sleep(210000); Counter++; to make sure that X would still be running and trigger the concurrency issue for the first 5 runs. My goal was to make sure that after 5 runs, jobs would start processing normally every minute, and this appeared to be the case. I still saw the SynchronizationLockException errors in my deleted jobs, but that also seems to be the correct behavior.
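For reference, a rough sketch of that test setup (the class name, recurring-job id, and hosting code are my own illustration, not taken from this thread):

using System;
using System.Threading;
using Hangfire;
using Hangfire.MemoryStorage;

// Minimal host: memory storage plus a background server so the recurring job runs.
GlobalConfiguration.Configuration.UseMemoryStorage();
RecurringJob.AddOrUpdate<JobX>("job-x", x => x.Run(), Cron.Minutely());

using (var server = new BackgroundJobServer())
{
    Console.ReadLine();
}

public class JobX
{
    private static int Counter;

    // Deliberately overruns the one-minute schedule for the first five runs,
    // so the next trigger hits the DisableConcurrentExecution lock.
    [DisableConcurrentExecution(timeoutInSeconds: 60)]
    public void Run()
    {
        if (Counter < 5)
        {
            Thread.Sleep(210000); // ~3.5 minutes
        }
        Counter++;
    }
}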

I think the original issue I saw may not have been caused by DisableConcurrentExecution. Instead, I think the problem I saw might have been caused by WorkerCount being too low and Hangfire getting backed up.
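If that's the cause, raising the worker count when the server is created should help; something along these lines (the multiplier below is just an example):

using System;
using Hangfire;
using Hangfire.MemoryStorage;

GlobalConfiguration.Configuration.UseMemoryStorage();

// More workers means long-running jobs are less likely to starve the queue.
var options = new BackgroundJobServerOptions
{
    WorkerCount = Environment.ProcessorCount * 10
};

using var server = new BackgroundJobServer(options);
Console.ReadLine();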

worldofsites commented 4 years ago

Same issue. Using MemoryStorage and the attributes:

[AutomaticRetry(Attempts = 0, DelaysInSeconds = new[] {0}, OnAttemptsExceeded = AttemptsExceededAction.Delete)]
[DisableConcurrentExecution(0)]

How to solve?

worldofsites commented 4 years ago

As a possible fix I added one more recurring job that checks the status of the other recurring jobs, with the ability to call this method through the API as well.

[AutomaticRetry(Attempts = 0, DelaysInSeconds = new[] {0}, OnAttemptsExceeded = AttemptsExceededAction.Delete)]
[DisableConcurrentExecution(0)]
public void CheckStateOfJobs()
{
    const int selection = 1000;
    var jobActivityComparisonTime = DateTime.UtcNow.AddMinutes(-30);
    var monitoringApi = JobStorage.Current.GetMonitoringApi();
    var succeededJobs = monitoringApi.SucceededJobs(0, selection);
    var processingJobs = monitoringApi.ProcessingJobs(0, selection);
    var deletedJobs = monitoringApi.DeletedJobs(0, selection);
    if (succeededJobs.Count >= 0)
    {
        string[] jobNames = {nameof(Job1), nameof(Job2), nameof(Job3)};
        foreach (var jobName in jobNames)
        {
            var succeededCount = 0;
            foreach (var succeededJob in succeededJobs)
            {
                if (succeededJob.Value.SucceededAt < jobActivityComparisonTime)
                {
                    break;
                }
                if (succeededJob.Value.Job.ToString().Contains(jobName))
                {
                    succeededCount++;
                    break;
                }
            }
            var processingCount = 0;
            foreach (var processingJob in processingJobs)
            {
                if (processingJob.Value.StartedAt < jobActivityComparisonTime)
                {
                    break;
                }
                if (processingJob.Value.Job.ToString().Contains(jobName))
                {
                    processingCount++;
                    break;
                }
            }
            // No recent success and nothing currently processing: the job looks stuck.
            if (succeededCount == 0 && processingCount == 0)
            {
                _logger.Info(
                    $"Job with name {jobName} should be restarted because something is wrong with the synchronization locks.");
                var deletedCount = 0;
                foreach (var deletedJob in deletedJobs)
                {
                    if (deletedJob.Value.DeletedAt < jobActivityComparisonTime)
                    {
                        break;
                    }
                    if (deletedJob.Value.Job.ToString().Contains(jobName))
                    {
                        deletedCount++;
                        break;
                    }
                }
                if (deletedCount != 0)
                {
                    using var connection = JobStorage.Current.GetConnection();
                    foreach (var recurringJob in connection.GetRecurringJobs())
                    {
                        if (recurringJob.Id.Contains(jobName))
                        {
                            RecurringJob.RemoveIfExists(recurringJob.Id);
                            _jobRunner.RunJobByName(jobName);
                        }
                    }
                }
            }
        }
    }
}

And to simulate the situation in which a job is not executed, I added the following code to one of the jobs:

// Deleting the recurring job's last run makes it look stuck, so the checker above restarts it.
List<RecurringJobDto> list;
using (var connection = JobStorage.Current.GetConnection())
{
    list = connection.GetRecurringJobs();
}
var job = list?.FirstOrDefault(j =>
    j.Id.Contains(nameof(UpdateStatuses)));
if (job != null && !string.IsNullOrEmpty(job.LastJobId))
{
    BackgroundJob.Delete(job.LastJobId);
    return;
}
Ruud-cb commented 3 years ago

Hi 2019, 2021 here; just checking if there is an update on this for an actual fix.

Spacefish commented 2 years ago

Running into the same issue :( I believe this only happens if you define your worker method as "async". Because of this, the lock might be released by a different thread than the one it was taken by, which confuses .NET and leads to the exception. The relevant lock is here: https://github.com/perrich/Hangfire.MemoryStorage/blob/master/src/Hangfire.MemoryStorage/Utilities/LocalLock.cs

Monitor.TryEnter() throws the exception after "Dispose()" was called on another thread for the same lock. Probably Monitor.Exit() is called on the "foreign" thread, throws an exception, and the lock object is never removed from the "Locks" ConcurrentDictionary; that exception is probably caught somewhere. As I said, this only happens at random if you use "async" worker functions, because the thread calling Monitor.Exit() may be a different one than the one that entered the lock before (which is not supported by Monitor).

We should either use a SemaphoreSlim instead of a "lock" object + Monitor, or we should just remove the lock object from the Locks dictionary if Monitor.Exit() throws an exception. I prefer the first solution, as the second can generate race conditions.

The more I think about this piece of code, the more problems I see, b.t.w. Just imagine the following case:

1. Thread 1 acquires the lock; it is added to the "Locks" dictionary.
2. Thread 2 tries to acquire the lock, gets the same object from the dictionary, and waits on Monitor.Enter().
3. Thread 1 finishes and calls Dispose(), which calls Monitor.Exit().
4. Thread 2 gets notified and Monitor.Enter() succeeds.
5. Thread 1 continues to run and removes the lock from the dictionary via Locks.TryRemove() in Dispose().
6. Thread 2 is now running and uses the resource.
7. Thread 3 is launched and calls AcquireLock() on the same resource; a new lock object is created and added to the dictionary, and Monitor.Enter() succeeds because Thread 1 deleted the old one during Dispose().
8. Thread 3 and Thread 2 are now running in parallel on the same resource, which defeats the purpose of the whole thing.

In summary: the lock object should not be removed from the Locks dictionary while someone is still using it, and that check-and-acquire process itself needs to be synchronized as well.
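A rough sketch of what the first option could look like (just to illustrate the idea; the names are mine, this is not the actual library code or the PR): one SemaphoreSlim per resource plus a reference count, so an entry is only removed from the dictionary once nobody holds or waits on it.

using System;
using System.Collections.Generic;
using System.Threading;

internal sealed class SemaphoreLocalLock : IDisposable
{
    private sealed class Entry
    {
        public readonly SemaphoreSlim Semaphore = new SemaphoreSlim(1, 1);
        public int RefCount;
    }

    private static readonly Dictionary<string, Entry> Locks = new Dictionary<string, Entry>();

    private readonly string _resource;
    private readonly Entry _entry;

    private SemaphoreLocalLock(string resource, TimeSpan timeout)
    {
        _resource = resource;

        lock (Locks)
        {
            if (!Locks.TryGetValue(resource, out _entry))
            {
                _entry = new Entry();
                Locks.Add(resource, _entry);
            }

            // Register interest before waiting, so a concurrent Dispose
            // cannot remove the entry while we are still queued on it.
            _entry.RefCount++;
        }

        // SemaphoreSlim does not track an owning thread, so it may be
        // released from a different thread (e.g. an async continuation).
        if (!_entry.Semaphore.Wait(timeout))
        {
            ReleaseEntry();
            throw new SynchronizationLockException();
        }
    }

    public static IDisposable Acquire(string resource, TimeSpan timeout)
    {
        return new SemaphoreLocalLock(resource, timeout);
    }

    public void Dispose()
    {
        _entry.Semaphore.Release();
        ReleaseEntry();
    }

    private void ReleaseEntry()
    {
        lock (Locks)
        {
            // Only drop the entry once no caller holds or waits on it.
            if (--_entry.RefCount == 0)
            {
                Locks.Remove(_resource);
            }
        }
    }
}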

I will do a merge request this weekend, when I have free time. I am using the memory storage as well, as I just use Hangfire as a .NET-based cron in my application :D

Another option would be to just use Hangfire.InMemory instead of Hangfire.MemoryStorage; Hangfire.InMemory is the in-memory storage implementation from the Hangfire team.
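Switching is basically a one-line change in the Hangfire configuration (extension method names as I remember them from each package's readme, so double-check against the versions you use):

using Hangfire;
using Hangfire.InMemory;

// Hangfire.InMemory (the storage maintained by the Hangfire team):
GlobalConfiguration.Configuration.UseInMemoryStorage();

// versus this library (Hangfire.MemoryStorage):
// GlobalConfiguration.Configuration.UseMemoryStorage();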

perrich commented 2 years ago

Thank you for the explanation of the issue and your MR proposal.

russkundev commented 2 years ago

Any update on this one, please? :) Also how can we force this exception to happen?

mentapro commented 2 years ago

@perrich @Spacefish Do you have any updates on this? It is a very critical issue.

perrich commented 2 years ago

As you can see, no PR was provided but some workarounds have been explained. Please don't hesitate to help.

Spacefish commented 2 years ago

@mentapro No, I use the Hangfire.InMemory NuGet package, which is the semi-"official" in-memory storage implementation from the Hangfire devs, and it does not have this problem.

MestreDosMagros commented 1 year ago

Any updates on this?

Spacefish commented 1 year ago

Any updates on this?

Ok, damn it, I will fix it, even if I don't use the library any more. I've been getting notifications for this thread for over a year now.

Spacefish commented 1 year ago

I did a PR here: https://github.com/perrich/Hangfire.MemoryStorage/pull/41

Spacefish commented 1 year ago

I added a unit test for this as well in #42.