HangfireIO / Hangfire

An easy way to perform background job processing in .NET and .NET Core applications. No Windows Service or separate process required
https://www.hangfire.io
Other
9.41k stars 1.71k forks source link

Canceling BackgroundJob #1592

Open Hulkstance opened 4 years ago

Hulkstance commented 4 years ago

I'm working on a Binance bot project and I decided to use Hangfire for each running bot thread. How do I cancel a specific BackgroundJob? RecurringJobs allow me to specify a job name while BackgroundJob doesn't.

private UpdateSubscription _subscription;
private readonly CancellationTokenSource _cts = new CancellationTokenSource();

public void Run(Bot bot)
{
    BackgroundJob.Enqueue(() => Start(bot, _cts.Token));
}

public void Start(Bot bot, CancellationToken token)
{
    // heavy logic
    _subscription = _socketClient.SubscribeToKlineUpdates(bot.CryptoPair.Symbol, bot.TimeInterval.Interval /*KlineInterval.OneHour*/, async data =>
    {
        ... logic ...

        if (token.IsCancellationRequested)
        {
            await _socketClient.Unsubscribe(_subscription);
        }
    }
}

// Stop specific bot?
public void Stop(string botName)
{
    //_cts.Cancel();
}
burningice2866 commented 4 years ago

Actually, if you look closely at the signature of the Enqueue-method you'll see that it returns a string which represents the id of the job enqueued. You can use this id to later call BackgroundJob.Delete(jobId)

Hulkstance commented 4 years ago

@burningice2866, I just saw that too but it doesn't trigger the CancellationToken.

if (token.IsCancellationRequested)
{
    await _socketClient.Unsubscribe(_subscription);
}
burningice2866 commented 4 years ago

You can only delete a background job which havent yet been triggered. If its running, the only way to trigger the CancellationToken passed to the task is to stop Hangfire.

pieceofsummer commented 4 years ago

@Hulkstance the code you’re referring to is inside an async block which is probably not triggered before the cancellation occurs, so there’s no way that code is ever executed.

If SubscribeToKlineUpdates method blocks (which would be a terrible design), you need a callback mechanism to catch cancellation and abort, e.g.:

using (token.Register(() => _socketClient.Unsubscribe(_subscription)))
{
    _subscription = _socketClient.SubscribeToKlineUpdates(…, async data => {
        // …
    });
}

If it doesn’t block, then you’re exiting job method immediately so the job becomes completed before the async callback is ever called. Once the job is completed, the associated CancellationTokenSource is disposed and the token is never cancelled. In this case you need to actually wait for cancellation:

 _subscription = _socketClient.SubscribeToKlineUpdates(…, async data => {
    // …
});
token.WaitHandle.Wait(jobTimeout);
_socketClient.Unsubscribe(_subscription);

Also note that jobs are designed to be run for a finite amount of time. In your case you’d probably be better off with custom threads.

@burningice2866 running jobs are periodically checked for cancellation as well, and CancellationToken is triggered if a job was deleted.

burningice2866 commented 4 years ago

@pieceofsummer you're right, its even a long time ago it was implemented. I'm wrongly stuck in the old days before that behavior was implemented.

https://github.com/HangfireIO/Hangfire/pull/976

burningice2866 commented 4 years ago

Also note that jobs are designed to be run for a finite amount of time. In your case you’d probably be better off with custom threads.

Or let Hangfire manage a background thread by implementing IBackgroundProcess and add it to your ServiceCollection.

Hulkstance commented 4 years ago

@pieceofsummer, @burningice2866, I don't know what the cause was, so I migrated to tasks. With the following code I had an issue for a moment. The issue was that _tasks object was always null when I was calling the Stop method. Then I realized that my service was scoped, so I made the _tasks object static, so it remains as one instance.

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace Threads
{
    public interface IMyClass
    {
        void Start(string name);
        void Stop(string name);
    }

    public class MyClass : IMyClass
    {
        private readonly static List<Tuple<string, Task, CancellationTokenSource>> _tasks = new List<Tuple<string, Task, CancellationTokenSource>>();

        public void Start(string name)
        {
            CancellationTokenSource cts = new CancellationTokenSource();
            Task task = Task.Factory.StartNew(() => DoWork(name, cts.Token));
            _tasks.Add(new Tuple<string, Task, CancellationTokenSource>(name, task, cts));
        }

        public void Stop(string name)
        {
            foreach (var tuple in _tasks)
            {
                if (tuple.Item1.Contains(name))
                {
                    CancellationTokenSource cts = tuple.Item3;
                    cts.Cancel();
                }
            }
        }

        public void DoWork(string name, CancellationToken token)
        {
            try
            {
                while (true)
                {
                    Console.WriteLine($"{name} is working");
                    Thread.Sleep(1000);

                    if (token.IsCancellationRequested)
                    {
                        Console.WriteLine($"{name} canceled");
                        token.ThrowIfCancellationRequested();
                    }
                }
            }
            catch (OperationCanceledException ex)
            {
                Console.WriteLine(ex.Message);
            }
        }
    }

    class Program
    {
        static void Main(string[] args)
        {
            IMyClass mc = new MyClass();
            mc.Start("Bot 1");
            mc.Start("Bot 2");
            mc.Start("Bot 3");

            mc.Stop("Bot 2");

            Console.ReadKey();
        }
    }
}
uksreejith commented 4 years ago

Actually, if you look closely at the signature of the Enqueue-method you'll see that it returns a string which represents the id of the job enqueued. You can use this id to later call BackgroundJob.Delete(jobId)

This is what I am doing in my project. In the DB, the job is being marked as 'Deleted' but the background job does not stop :( . This is how my worker function looks like:

public async Task LiveTrackerWorker(TrackingServiceParam trackingParam, CancellationToken cancellationToken){
    //Infinite while loop with check for token cancellation
}

Could this be a bug?

pieceofsummer commented 4 years ago

@uksreejith what Hangfire version are you using? Cancelling deleted jobs is only available in 1.7.x branch.

Once again, jobs should be finite, otherwise they would fail/restart because of job invisibility timeouts. Use background processes for infinite loops.

uksreejith commented 4 years ago

Hi @pieceofsummer, I am using version 1.7.8. I replaced the CancellationToken with IJobCancellationToken and it worked. For some reason using CancellationToken was not working when deployed onto Azure App Service but was working on my local machine. In my use case, the worker process is supposed to run till it gets a signal to stop. (Usually within 45 to 90 min). Also, I could not find any documentation for Background Process. Does it have a separate wiki for that?

image