HangfireIO / Hangfire

An easy way to perform background job processing in .NET and .NET Core applications. No Windows Service or separate process required
https://www.hangfire.io
Other
9.41k stars 1.71k forks source link

Weird problem when running two methods inside the Enqueue #2017

Open luizfbicalho opened 2 years ago

luizfbicalho commented 2 years ago

I have an extension to enqueue my view models pointing to an implementation of an interface IBackgroundJob

this are my extensions methods

private static readonly ActivitySource activitySource = new("MC.Hangfire.Extensions");

public static string Enqueue<T>(this T job, IBackgroundJobClient client)
{
    return client.Enqueue<IBackgroundJob<T>>(ps => ps.AddTelemetry(null).EnqueueJob(null, job, JobCancellationToken.Null));
}

public static IBackgroundJob<T> AddTelemetry<T>(this IBackgroundJob<T> job, PerformContext context)
{
    using var activity = activitySource.StartActivity($"Start Job {typeof(T).FullName} id {context.BackgroundJob.Id}", ActivityKind.Server);
    activity?.SetTag("JobId", context.BackgroundJob.Id);
    activity?.SetTag("JobJson", Newtonsoft.Json.JsonConvert.SerializeObject(job));
    activity?.SetTag("Job", Newtonsoft.Json.JsonConvert.SerializeObject(context.BackgroundJob.Job));
    return job;
}

My problem is that the EnqueueJob method is called, but the AddTelemetry method is not called before, how can I Add the telemetry information before calling all of my jobs, but in the context of the jobs, and of course not adding this code in all of my enqueue methods?

EDIT:

I'll try to implement this with filters, not sure if it's the best way

luizfbicalho commented 2 years ago

I copied this filter and did this filter

public class TraceLogFilterAttribute : JobFilterAttribute,
IClientFilter, IServerFilter, IElectStateFilter, IApplyStateFilter
{
    public TraceLogFilterAttribute()
    {

    }

    private static readonly ILog Logger = LogProvider.GetCurrentClassLogger();

    public void OnCreating(CreatingContext context)
    {
        Logger.InfoFormat("Creating a job based on method `{0}`...", context.Job.Method.Name);
    }

    public void OnCreated(CreatedContext context)
    {
        Logger.InfoFormat(
            "Job that is based on method `{0}` has been created with id `{1}`",
            context.Job.Method.Name,
            context.BackgroundJob?.Id);
    }
    private static readonly ActivitySource activitySource = new("MC.Hangfire.Extensions");

    public void OnPerforming(PerformingContext context)
    {
        using var activity = activitySource.StartActivity($"Start Job {context.BackgroundJob.Job.Type.Name}.{context.BackgroundJob.Job.Method.Name}", ActivityKind.Server);
        activity?.SetTag("JobId", context.BackgroundJob.Id);
        activity?.SetTag("JobType", context.BackgroundJob.Job.Type.FullName);
        activity?.SetTag("JobMethod", context.BackgroundJob.Job.Method.Name);
        activity?.SetTag("JobMethod", JsonConvert.SerializeObject(context.BackgroundJob.Job.Args, new JsonSerializerSettings() { ReferenceLoopHandling = ReferenceLoopHandling.Ignore }));
        Logger.InfoFormat("Starting to perform job `{0}`", context.BackgroundJob.Id);
    }

    public void OnPerformed(PerformedContext context)
    {
        Logger.InfoFormat("Job `{0}` has been performed", context.BackgroundJob.Id);
    }

    public void OnStateElection(ElectStateContext context)
    {
        var failedState = context.CandidateState as FailedState;
        if (failedState != null)
        {
            Logger.WarnFormat(
                "Job `{0}` has been failed due to an exception `{1}`",
                context.BackgroundJob.Id,
                failedState.Exception);
        }
    }

    public void OnStateApplied(ApplyStateContext context, IWriteOnlyTransaction transaction)
    {
        Logger.InfoFormat(
            "Job `{0}` state was changed from `{1}` to `{2}`",
            context.BackgroundJob.Id,
            context.OldStateName,
            context.NewState.Name);
    }

    public void OnStateUnapplied(ApplyStateContext context, IWriteOnlyTransaction transaction)
    {
        Logger.InfoFormat(
            "Job `{0}` state `{1}` was unapplied.",
            context.BackgroundJob.Id,
            context.OldStateName);
    }
}

Now I have some issues about this filter

1) The filter should be Injected by DI, I could just create it as a singleton to the hangfire global filters. 2) The activity started doesn't flow to the method enqueued, this way I cant measure what happened in my instrumentation correctly 3) I used the LogProvider.GetCurrentClassLogger(); but my idea was to use he ASP.NET core Log here also.

sgryphon commented 4 weeks ago

using var activity = activitySource.StartActivity($"Start Job {context.BackgroundJob.Job.Type.Name}.{context.BackgroundJob.Job.Method.Name}", ActivityKind.Server);

The using will Dispose of the Activity when the handler ends, which will Stop the activity (end it, and revert to parent, which will be empty). So while the first log message will have the trace, anything inside the actual job will not.

Not that you don't need to keep a reference, as Activity.Current will keep a reference for you. (And you can't anyway, as Hangfire serialises the reference data and doesn't pass the actual object)

You can then clean up in OnPerformed by getting Activity.Current and stopping it. You should probably check that the item you are closing matches the one you opened, but if it doesn't it not clear what else to do (check if part of the same trace, and walk up the parent chain? I don't know -- at that point it would be better to be in core Hangfire and close the right reference).

Ideally, this should be part of the main Hangfire code (built in, not as a Filter). Maybe one day I will look at making it a proper pull request (too many other open source projects at the moment).

/// <summary>
/// Start a trace (Activity) for each Hangfire job. Need to configure a listener for 'Hangfire.Server', e.g. OpenTelemetry.
/// </summary>
public class StartActivityHangfireServerFilter : IServerFilter
{
    public const string ActivitySourceName = "Hangfire.Server";

    private readonly ActivitySource _activitySource = new ActivitySource(ActivitySourceName);

    public void OnPerforming(PerformingContext context)
    {
        var activity = _activitySource.StartActivity(
            $"perform {context.BackgroundJob.Job}",
            ActivityKind.Consumer
        );
        activity?.SetTag("job_name", context.BackgroundJob.Job.ToString());
        activity?.SetTag("job_id", context.BackgroundJob.Id);
        context.SetJobParameter("activity.trace_id", activity?.TraceId.ToString());
    }

    public void OnPerformed(PerformedContext context)
    {
        var activity = Activity.Current;
        if (activity?.Status == ActivityStatusCode.Unset)
        {
            activity?.SetStatus(
                context.Exception == null ? ActivityStatusCode.Ok : ActivityStatusCode.Error,
                context.Exception?.Message
            );
        }
        activity?.Stop();
    }
}
sgryphon commented 1 week ago

I've also put up a PR to include Activity creation in core Hangfier, so distributed trace context is automatically transferred from job creation to job processing, i.e. job processing will always have an activity. https://github.com/HangfireIO/Hangfire/pull/2460