HangfireIO / Hangfire.InMemory

In-memory job storage for Hangfire with an efficient implementation

Recurring Jobs don't work #14

Closed: knopa closed this issue 3 days ago

knopa commented 2 weeks ago

It seems recurring jobs don't work well. [screenshot]

odinserj commented 2 weeks ago

Hi @knopa. Unfortunately, I can't reproduce this. Are any special actions required, like changing the time zone or the system time?

knopa commented 2 weeks ago

.NET 8, Linux, UTC Timezone

I have some custom configuration that could affect this:

Producer Setup

public static void SetupHangfireProducer(this IServiceCollection services, string connectionString, IWebHostEnvironment environment)
{
    services.AddHangfire((serviceProvider, hangfire) =>
    {
        var jobsSettingsConfig = serviceProvider.GetRequiredService<IOptions<JobsSettings>>();
        hangfire
            .UseDefaultCulture(CultureInfo.GetCultureInfo("en-US"))
            .SetDataCompatibilityLevel(CompatibilityLevel.Version_180)
            .UseInMemoryStorage(new InMemoryStorageOptions())
            .UseFilter(new AutomaticRetryAttribute
            {
                // Retry generously to get past 429 (RU limit exceeded) responses from Cosmos DB
                Attempts = 10,
                DelayInSecondsByAttemptFunc = attempt => (int)attempt * 60,
                LogEvents = true
            });

        HangfireJobsScheduler.RegisterAndRunJobs(jobsSettingsConfig.Value);
    });

    services.TryAddSingleton<IBackgroundJobFactory>(serviceProvider =>
        new ApplicationInsightsBackgroundJobFactory(new BackgroundJobFactory(serviceProvider.GetRequiredService<IJobFilterProvider>()))
    );

    services.TryAddSingleton<IBackgroundJobStateChanger>(serviceProvider =>
        new ApplicationInsightsBackgroundJobStateChanger(new BackgroundJobStateChanger(serviceProvider.GetRequiredService<IJobFilterProvider>()))
    );

    services.TryAddSingleton<IBackgroundJobPerformer>(serviceProvider =>
        new ApplicationInsightsBackgroundJobPerformer(
            new BackgroundJobPerformer(
                serviceProvider.GetRequiredService<IJobFilterProvider>(),
                serviceProvider.GetRequiredService<JobActivator>(),
                TaskScheduler.Default
            ),
            serviceProvider.GetRequiredService<TelemetryClient>()
        )
    );
}

Consumer Setup

public static void SetupHangfireConsumer(this IServiceCollection services, HangfireSettings config, IWebHostEnvironment environment)
{
    JobFilterProviders.Providers.Add(new HangfireJobFilterProvider());

    var hangfireOptions = Action<BackgroundJobServerOptions> (string[] queues, string name, int workerCount) =>
    {
        var envName = environment.IsProduction()
                        ? string.Empty
                        : $"{environment.EnvironmentName}-";
        return x =>
        {
            x.Queues = queues;
            x.WorkerCount = workerCount;
            x.ServerName = $"{envName}{name}-{AppVersionService.AppVersion}";
        };
    };

    services.AddHangfireServer(hangfireOptions(new[]
    {
        HangfireSettings.DefaultQueueName,
        HangfireSettings.SettingQueueName
    }, "default", config.DefaultWorkersCount));

    services.AddHangfireServer(hangfireOptions(new[]
    {
        HangfireSettings.FullSyncListQueueName
    }, "full-sync", config.FullSyncWorkersCount));

    services.AddHangfireServer(hangfireOptions(new[]
    {
        HangfireSettings.DraftQueueName
    }, "draft", config.DraftSyncWorkersCount));

    services.AddHangfireServer(hangfireOptions(new[]
    {
        HangfireSettings.SyncMessageDetailsQueueName
    }, "message-details", config.FullSyncWorkersCount));

    services.AddHangfireServer(hangfireOptions(new[]
    {
        HangfireSettings.SyncMessageWatchDetailsQueueName
    }, "watch-details", config.WatchSyncWorkersCount));

    services.AddHangfireServer(hangfireOptions(new[]
    {
        HangfireSettings.HistoryQueueName
    }, "history-messages", config.HistorySyncWorkersCount));
}
knopa commented 2 weeks ago

If that doesn't help, I will try to set up a test project to reproduce the issue.

odinserj commented 2 weeks ago

It's unlikely that these additional configuration options cause this behavior. It's more likely that the MonotonicTime class, which is used to ensure that system clock drifts don't affect anything, doesn't handle some scenarios due to differences between platforms (either .NET or the OS). Unfortunately, I wasn't able to replicate this behavior on either Windows or Ubuntu, so I don't know what to do.

The only solution I see is to provide another option to use DateTime.UtcNow as an alternative time source for such scenarios.
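To make the two options concrete, here is a minimal C# sketch of a monotonic time source next to a plain `DateTime.UtcNow` one. `ITimeSource`, `MonotonicTimeSource` and `SystemTimeSource` are hypothetical names, not Hangfire.InMemory's API, and the monotonic variant only assumes the common approach of anchoring to the wall clock once and then advancing with `Stopwatch` (it needs .NET 7+ for `Stopwatch.GetElapsedTime`).

```csharp
using System;
using System.Diagnostics;

// Usage: both sources report "now" in UTC; only the monotonic one ignores clock changes.
ITimeSource monotonic = new MonotonicTimeSource();
ITimeSource wallClock = new SystemTimeSource();

DateTime first = monotonic.UtcNow;
DateTime second = monotonic.UtcNow;
Console.WriteLine(second >= first);       // True: Stopwatch never goes backwards
Console.WriteLine(wallClock.UtcNow.Kind); // Utc

// Hypothetical abstraction, not part of Hangfire.InMemory's public API.
public interface ITimeSource
{
    DateTime UtcNow { get; }
}

// Captures the wall clock once, then advances it with Stopwatch ticks,
// so later adjustments to the system clock do not affect the result.
public sealed class MonotonicTimeSource : ITimeSource
{
    private readonly DateTime _baseUtc = DateTime.UtcNow;
    private readonly long _baseTimestamp = Stopwatch.GetTimestamp();

    // Stopwatch.GetElapsedTime(long) is available in .NET 7 and later.
    public DateTime UtcNow => _baseUtc + Stopwatch.GetElapsedTime(_baseTimestamp);
}

// Follows the system clock directly, including manual or NTP adjustments.
public sealed class SystemTimeSource : ITimeSource
{
    public DateTime UtcNow => DateTime.UtcNow;
}
```

An option that accepts either implementation would let affected deployments fall back to the wall clock, at the cost of becoming sensitive to clock adjustments again.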

odinserj commented 2 weeks ago

It is also strange that these recurring jobs aren't triggered by the recurring job scheduler, since their scheduled times are in the past and they should have been triggered. What does stdump say about your threads?

knopa commented 2 weeks ago

As far as I can see, there is no real difference in MonotonicTime. I have rolled back to 0.9.0, where it works fine: https://github.com/HangfireIO/Hangfire.InMemory/compare/v0.9.0...v0.10.3

I will switch the test environment back to 0.10.3 and check stdump.

knopa commented 2 weeks ago

Here is what my test environment shows: I deployed 0.10.3 yesterday at 1:16 PM, and it worked fine for over 24 hours. [screenshot]

Today it seems to have broken about 4 hours ago, so proper testing requires waiting for the problem to reproduce. [screenshot]

Here is the stdump output: https://gist.github.com/knopa/ab2a0f1da8c9f8cec9e65dce9c9b67a7

odinserj commented 1 week ago

Thanks @knopa! I see the recurring job scheduler is stuck while acquiring a lock. I will investigate this and let you know once it's done.

Managed stack trace:
   - [HelperMethodFrame] (System.Threading.Thread.SleepInternal) at System.Private.CoreLib.dll
   - System.Threading.Thread.Sleep(Int32) at System.Private.CoreLib.dll
   - System.Threading.SpinWait.SpinOnceCore(Int32) at System.Private.CoreLib.dll
   - Hangfire.InMemory.State.DispatcherBase`1[[System.Guid, System.Private.CoreLib]].TryAcquireLockEntry(Hangfire.Storage.JobStorageConnection, System.String, System.TimeSpan, Hangfire.InMemory.Entities.LockEntry`1<Hangfire.Storage.JobStorageConnection> ByRef) at Hangfire.InMemory.dll
   -  at 
   - Hangfire.Server.RecurringJobScheduler.Execute(Hangfire.Server.BackgroundProcessContext) at Hangfire.Core.dll
   - Hangfire.Server.BackgroundProcessDispatcherBuilder.ExecuteProcess(System.Guid, System.Object) at Hangfire.Core.dll
   - Hangfire.Processing.BackgroundExecution.Run(System.Action`2<System.Guid,System.Object>, System.Object) at Hangfire.Core.dll
   - Hangfire.Processing.BackgroundDispatcher.DispatchLoop() at Hangfire.Core.dll
   - [DebuggerU2MCatchHandlerFrame] at 
odinserj commented 1 week ago

By the way, do you see any exceptions related to Hangfire in your logs?

knopa commented 1 week ago

Nice catch

No errors related to Hangfire in Application Insights

odinserj commented 1 week ago

I will investigate the locking code: there's a race condition, and it's pretty difficult to catch. I think the best we can do in the meantime is to enable the DEBUG logging level for Hangfire and see whether any exceptions originate from Hangfire.InMemory. May I ask you to do this?
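One way to turn on Debug-level output for Hangfire in an ASP.NET Core host is a category filter, sketched below. This assumes the app registers Hangfire through `services.AddHangfire(...)` (as in the producer setup above) so that Hangfire logs flow into `Microsoft.Extensions.Logging`, and that Hangfire's logger categories are its type names (for example `Hangfire.Server.RecurringJobScheduler`); both are assumptions to verify in your setup.

```csharp
// Program.cs: assumes an ASP.NET Core app (Microsoft.NET.Sdk.Web) with Hangfire.AspNetCore referenced.
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.Logging;

var builder = WebApplication.CreateBuilder(args);

// Filter rules match category prefixes, so this raises verbosity for every
// logger whose category starts with "Hangfire" (Hangfire.Core, Hangfire.InMemory, ...).
builder.Logging.AddFilter("Hangfire", LogLevel.Debug);

// ... existing SetupHangfireProducer(...) / SetupHangfireConsumer(...) registrations ...

var app = builder.Build();
app.Run();
```

The same rule can usually be expressed without code changes as a `Logging:LogLevel:Hangfire` entry set to `Debug` in the app's configuration.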

knopa commented 1 week ago

Yes, I will do that on the test environment.

odinserj commented 1 week ago

Reproduced! 🔥🔥🔥

knopa commented 1 week ago

Cool, it takes patience to catch it.

odinserj commented 1 week ago

Hm, what .NET version are you using for your application? I was able to create a minimal reproduction of the problem and got different results on .NET 6.0 (where everything works fine) and on .NET 8.0 (where the problem occurs in my case).

The problem goes away on every platform when we stop assuming that ConcurrentDictionary.TryRemove always removes a key that's present in the collection, and instead allow for it returning false under contention with concurrent GetOrAdd or TryAdd calls (as per this issue on GitHub or this question on SO).

I'm still thinking about how to minimize the example further, removing the Monitor.Wait and Monitor.Pulse calls to make everything more obvious. Under the debugger, there's nothing special in the "event log" exposed via the Entry.Events property: only a single thread performs updates, and the other threads just call GetOrAdd and try to enter the first lock statement.

So with the following sample, I get no exceptions on .NET 6.0, but I get a "Failed to remove key" exception on .NET 8.0, even though the key still exists and was even added from the same thread. I will continue the investigation next week.

using System.Collections.Concurrent;
using System.Diagnostics;

var dictionary = new ConcurrentDictionary<string, Entry>();
var started = Stopwatch.StartNew();
var key = "hello";

while (true)
{
    var parallelism = Environment.ProcessorCount;

    Parallel.For(0, parallelism, new ParallelOptions { MaxDegreeOfParallelism = parallelism }, _ =>
    {
        Entry entry;

        // Acquiring...
        while (true)
        {
            entry = dictionary.GetOrAdd(key, static _ => new Entry());

            lock (entry)
            {
                if (entry.Finalized)
                {
                    continue;
                }

                entry.References++;

                entry.Trace("Reference added, performing wait...");
                while (entry.Acquired)
                {
                    Monitor.Wait(entry);
                }

                entry.Trace("Wait completed");

                entry.Acquired = true;
                break;
            }
        }

        // Releasing...
        lock (entry)
        {
            entry.Trace("Releasing...");
            entry.Acquired = false;
            Monitor.Pulse(entry);

            entry.Trace("Released, decrementing references...");
            entry.References--;

            if (entry.References == 0)
            {
                entry.Finalized = true;
                entry.Trace("Finalized, removing entry...");

                // The following implementation gives problems
                var result = dictionary.TryRemove(key, out var removed);
                entry.Trace("Entry removed");

                if (!result) { throw new Exception("Failed to remove key"); }
                if (!ReferenceEquals(removed, entry)) throw new Exception("Wrong entry");

                // But the following implementation works fine
                /*var wait = new SpinWait();
                while (true)
                {
                    var result = dictionary.TryRemove(key, out var removed);
                    if (result)
                    {
                        if (!ReferenceEquals(removed, entry)) throw new Exception("Wrong entry");
                        break;
                    }

                    if (dictionary.ContainsKey(key))
                    {
                        wait.SpinOnce();
                        continue;
                    }

                    throw new Exception("Failed to remove key");
                }*/
            }
        }
    });

    if (started.Elapsed > TimeSpan.FromSeconds(5))
    {
        Console.WriteLine($"{DateTime.Now}: still alive...");
        started = Stopwatch.StartNew();
    }
}

internal class Entry
{
    public int References;
    public bool Finalized;
    public ConcurrentQueue<string> Events = new ConcurrentQueue<string>();
    public bool Acquired;

    public void Trace(string message)
    {
        Events.Enqueue($"{message} (thread {Thread.CurrentThread.ManagedThreadId})");
    }
}
odinserj commented 1 week ago

And a simpler example:

using System.Collections.Concurrent;
using System.Diagnostics;

var dictionary = new ConcurrentDictionary<string, Entry>();
var started = Stopwatch.StartNew();
var key = "hello";

while (true)
{
    var parallelism = Environment.ProcessorCount;

    Parallel.For(0, parallelism, new ParallelOptions { MaxDegreeOfParallelism = parallelism }, i =>
    {
        Entry entry;
        while (true)
        {
            entry = dictionary.GetOrAdd(key, static _ => new Entry());

            lock (entry)
            {
                if (entry.Finalized) continue;
                while (entry.Acquired) Monitor.Wait(entry);
                if (entry.Finalized) continue;

                entry.Acquired = true;
                break;
            }
        }

        lock (entry)
        {
            entry.Acquired = false;
            entry.Finalized = true;

            var result = dictionary.TryRemove(key, out _);
            if (result == false) throw new InvalidOperationException("Removal failed");

            Monitor.PulseAll(entry);
        }
    });

    if (started.Elapsed > TimeSpan.FromSeconds(5))
    {
        Console.WriteLine($"{DateTime.Now}: still alive...");
        started = Stopwatch.StartNew();
    }
}

internal class Entry
{
    public bool Finalized;
    public bool Acquired;
}
knopa commented 1 week ago

We're using .NET 8.

That version fails on both .NET 6 and 8. It definitely means the issue is just hidden by the Monitor calls: for example, if you add two Console.WriteLine calls, one after GetOrAdd and one after TryRemove, your example works on 8 too. Also, there is a case you ignore: TryRemove can fail simply because the dictionary no longer has the key.

using System.Collections.Concurrent;
using System.Diagnostics;

var dictionary = new ConcurrentDictionary<string, Entry>();
var started = Stopwatch.StartNew();
var key = "hello";

while (true)
{
    var parallelism = Environment.ProcessorCount;

    Parallel.For(0, parallelism, new ParallelOptions { MaxDegreeOfParallelism = parallelism }, i =>
    {
        Entry entry = dictionary.GetOrAdd(key, static _ => new Entry());

        var result = dictionary.TryRemove(key, out _);
        if (result == false)
        {
            // TryRemove can also fail legitimately when the key is already gone
            if (dictionary.ContainsKey(key))
            {
                throw new InvalidOperationException("Removal failed");
            }
        }
    });

    if (started.Elapsed > TimeSpan.FromSeconds(5))
    {
        Console.WriteLine($"{DateTime.Now}: still alive...");
        started = Stopwatch.StartNew();
    }
}

internal class Entry
{
    public bool Finalized;
    public bool Acquired;
}

If we want a truly safe lock, I would use a library to save effort, even for the non-async version. This code works fine on 8:

using KeyedSemaphores;
using System.Collections.Concurrent;
using System.Diagnostics;

var dictionary = new ConcurrentDictionary<string, Entry>();
var collection = new KeyedSemaphoresDictionary<string>();
var started = Stopwatch.StartNew();
var key = "hello";

while (true)
{
    var parallelism = Environment.ProcessorCount;

    Parallel.For(0, parallelism, new ParallelOptions
    {
        MaxDegreeOfParallelism = parallelism
    },
    (i, token) =>
    {
        Entry entry;

        using (collection.Lock(key))
        {
            entry = dictionary.GetOrAdd(key, static _ => new Entry());
        }

        using (collection.Lock(key))
        {
            var result = dictionary.TryRemove(key, out _);
            if (result == false)
            {
                if (dictionary.ContainsKey(key))
                {
                    throw new InvalidOperationException("Removal failed");
                }
            }
        }
    });

    if (started.Elapsed > TimeSpan.FromSeconds(5))
    {
        Console.WriteLine($"{DateTime.Now}: still alive...");
        started = Stopwatch.StartNew();
    }
}

internal class Entry
{
    public bool Finalized;
    public bool Acquired;
}

It seems .NET 8 has some performance improvements that started triggering this issue, but as I remember, ConcurrentDictionary isn't really thread-safe, so it's better to guard it with a simpler lock and then use the concurrent dictionary inside it. I hope this helps.

odinserj commented 3 days ago

Sorry for the delayed response. Yesterday I reported this problem as https://github.com/dotnet/runtime/issues/107525; it was caused by PR https://github.com/dotnet/runtime/pull/82004, released with .NET 8.0 Preview 2. Today I will add a workaround that loops on TryRemove until the problem is fixed in .NET itself.
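A minimal sketch of such a retry loop, in the spirit of the commented-out `SpinWait` variant in the first reproduction above. `TryRemoveWithRetry` is a hypothetical helper, not the code that shipped in Hangfire.InMemory; it assumes .NET 5+ for the value-matching `TryRemove(KeyValuePair<TKey, TValue>)` overload, so an entry that another thread re-added in the meantime is never removed by mistake.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

var dictionary = new ConcurrentDictionary<string, object>();
var entry = new object();
dictionary.TryAdd("hello", entry);

Console.WriteLine(dictionary.TryRemoveWithRetry("hello", entry)); // True
Console.WriteLine(dictionary.TryRemoveWithRetry("hello", entry)); // False: key is already gone

// Hypothetical helper, not Hangfire.InMemory's actual code.
public static class ConcurrentDictionaryExtensions
{
    // Removes `key` only while it still maps to `expected`, retrying when
    // TryRemove reports failure even though that entry is still present.
    public static bool TryRemoveWithRetry<TKey, TValue>(
        this ConcurrentDictionary<TKey, TValue> dictionary, TKey key, TValue expected)
        where TKey : notnull
    {
        var pair = new KeyValuePair<TKey, TValue>(key, expected);
        var spin = new SpinWait();

        while (true)
        {
            // Atomic compare-and-remove: succeeds only if the value still matches.
            if (dictionary.TryRemove(pair)) return true;

            // Genuine failure: the key is gone or now maps to a different value.
            if (!dictionary.TryGetValue(key, out var current) ||
                !EqualityComparer<TValue>.Default.Equals(current, expected))
            {
                return false;
            }

            // The expected entry is still there, so the failure was spurious; back off and retry.
            spin.SpinOnce();
        }
    }
}
```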

odinserj commented 3 days ago

> Seems like as .net 8 has some perfomance improvements and we start receive that issue but I remember ConcurrentDictionarry are not real thread safe, so better to cover lock with more simple options and then use concurent dictionary.

Actually, it's thread-safe internally, but some APIs such as GetOrAdd(string, Func<string, T>) can cause confusion, since the factory Func itself isn't called under a lock and may therefore run more than once for the same key; please see https://andrewlock.net/making-getoradd-on-concurrentdictionary-thread-safe-using-lazy/ for details. Still, the dictionary's internals are fully thread-safe and should follow the documented behavior, which is why I decided to check, re-check, and submit the possible issue to the .NET project itself.
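The `Lazy<T>` technique from the linked article can be sketched as follows: the dictionary stores `Lazy<string>` wrappers instead of values, so even if several threads race inside `GetOrAdd`, the expensive factory runs only once. The key, the upper-casing "factory" and the counter are illustrative placeholders.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

int factoryCalls = 0;
var cache = new ConcurrentDictionary<string, Lazy<string>>();

Parallel.For(0, 16, i =>
{
    // Racing threads may each build a Lazy wrapper, but GetOrAdd stores only one
    // and returns that same instance to every caller.
    _ = cache.GetOrAdd("key", k => new Lazy<string>(() =>
    {
        // Lazy<T>'s default mode (ExecutionAndPublication) runs this at most once.
        Interlocked.Increment(ref factoryCalls);
        return k.ToUpperInvariant();
    })).Value;
});

Console.WriteLine(factoryCalls);       // 1
Console.WriteLine(cache["key"].Value); // KEY
```

The discarded `Lazy` wrappers are cheap to allocate and never have `.Value` read, which is what makes the pattern safe for expensive or side-effecting factories.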

odinserj commented 3 days ago

Serhii, I have just released version 0.10.4. Please try upgrading and let me know if you have any issues. Thank you so much for your assistance!

knopa commented 3 days ago

Thanks for the fix, I will try it. Funnily enough, with debug logging enabled on the test environment, the issue hasn't reproduced yet on 0.10.3. As I mentioned before, it looks like the delay from console output hides the problem.

I meant that GetOrAdd(string, Func<string, T>) is a non-atomic method. IMemoryCache has the same issue.

odinserj commented 3 days ago

It's a pretty rare race condition; in my case it happened roughly once per million lock acquisitions. You were just "lucky" enough to hit it, and running 6 servers increased the probability of such an event. But this was a great report, since ConcurrentDictionary is a common type used in a lot of places, and it's very important for this class to be bug-free.
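As a rough illustration of why more servers and longer uptime make the failure likelier: assuming each lock acquisition independently hits the race with probability p ≈ 10^-6 (the "once per million" figure above), the chance of at least one failure over n acquisitions is shown below. The acquisition count n is a free parameter here; the actual rate per server isn't known from this thread.

```latex
\begin{aligned}
P(\text{at least one failure in } n \text{ acquisitions}) &= 1 - (1 - p)^{n} \approx 1 - e^{-np} \\
p = 10^{-6},\ n = 10^{6}: &\quad 1 - e^{-1} \approx 0.63 \\
p = 10^{-6},\ n = 5 \cdot 10^{6}: &\quad 1 - e^{-5} \approx 0.993
\end{aligned}
```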

Concurrency 🤷‍♂️

knopa commented 2 days ago

Just FYI: the Debug log level didn't show any issue before the failure on 0.10.3. This time it took 6 days to fail.