temporalio / sdk-go

Temporal Go SDK
https://docs.temporal.io/application-development?lang=go
MIT License

Use interceptor logger for all logs related to a workflow or activity execution #829

Open jlegrone opened 2 years ago

jlegrone commented 2 years ago

Is your feature request related to a problem? Please describe.

Interceptors can already modify the log fields emitted by workflow and activity code. But lower-level logs emitted by the Go SDK itself don't respect the logger returned by the interceptor chain. These include debug-level logs like "ExecuteActivity" as well as warning- or error-level logs like "Task processing failed with error".

Using the logger returned by the interceptor chain would allow users to filter all logs related to their workflow executions by domain-specific fields, create more sophisticated monitors or metric generation pipelines, and correlate logs to traces on observability platforms where that is supported.

Describe the solution you'd like

Evaluate the worker's interceptor chain to get a workflow or activity logger whenever a low-level log needs to be emitted by the Go SDK.

Describe alternatives you've considered

It might be possible to at least handle all debug level logs like "ExecuteActivity" via a regular interceptor that is registered by default.

It would be interesting to look into implementing low level logs like "Task processing failed with error" via an interceptor as well, perhaps by extending the workflow/activity interceptor interfaces to include a method for recording SDK errors. This would bring some additional benefits since interceptors would not only be able to log those errors, but emit custom metrics or error trace spans as well.

Additional context

I wrote an e2e test to demonstrate. After running the test, observe that the trace_id log field is not included in the "ExecuteActivity" log line.

package sdklogtest

import (
    "context"
    "testing"
    "time"

    "github.com/DataDog/temporalite/temporaltest"
    "go.temporal.io/sdk/activity"
    "go.temporal.io/sdk/client"
    "go.temporal.io/sdk/contrib/opentracing"
    "go.temporal.io/sdk/interceptor"
    "go.temporal.io/sdk/log"
    "go.temporal.io/sdk/worker"
    "go.temporal.io/sdk/workflow"
)

func LogWorkflow(ctx workflow.Context) error {
    workflow.GetLogger(ctx).Info("hello workflow")
    return workflow.ExecuteActivity(workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
        ScheduleToCloseTimeout: time.Minute,
    }), LogActivity).Get(ctx, nil)
}

func LogActivity(ctx context.Context) error {
    activity.GetLogger(ctx).Info("hello activity")
    return nil
}

type loggingTracer struct {
    interceptor.Tracer
}

func (loggingTracer) GetLogger(logger log.Logger, ref interceptor.TracerSpanRef) log.Logger {
    return log.With(logger, "trace_id", "0112358")
}

func TestLogOutput(t *testing.T) {
    // Which tracer we use here isn't important to the test; pulling in the opentracing
    // implementation just meant I didn't have to mock a full interceptor.
    openTracer, err := opentracing.NewTracer(opentracing.TracerOptions{})
    if err != nil {
        t.Fatal(err)
    }

    // Create test Temporal server and client
    ts := temporaltest.NewServer(temporaltest.WithT(t))
    c := ts.NewClientWithOptions(client.Options{
        Interceptors: []interceptor.ClientInterceptor{
            interceptor.NewTracingInterceptor(loggingTracer{openTracer}),
        },
    })

    // Register a new worker
    w := worker.New(c, "test", worker.Options{})
    defer w.Stop()
    w.RegisterWorkflow(LogWorkflow)
    w.RegisterActivity(LogActivity)
    if err := w.Start(); err != nil {
        t.Fatal(err)
    }

    // Start a workflow that emits some logs
    wfr, err := c.ExecuteWorkflow(context.Background(), client.StartWorkflowOptions{
        TaskQueue:                "test",
        WorkflowExecutionTimeout: time.Second * 10,
    }, LogWorkflow)
    if err != nil {
        t.Fatal(err)
    }

    // Wait for workflow to complete and fail test if workflow errors.
    if err := wfr.Get(context.Background(), nil); err != nil {
        t.Fatal(err)
    }

    // There's not a way to write a test against worker log output at the moment,
    // so just fail the test so logs can be inspected by a human.
    t.Errorf("Please inspect test log output for tag `trace_id:0112358` in workflow execution logs")
}

cretz commented 2 years ago

The problem here is that the interceptor is for intercepting literal calls to activity.GetLogger and workflow.GetLogger, not just all loggers. You can provide your own logger at the client level to intercept logs originating outside those explicit user calls.
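As a minimal sketch of the client-level approach, the wrapper below adds fixed fields to every line that passes through it. The `Logger` interface is redeclared locally with the same four methods as the SDK's `log.Logger` so that the sketch compiles on its own; `lineLogger` and `fieldLogger` are hypothetical names, and the field values are made up for illustration.

```go
package main

import "fmt"

// Logger mirrors the method set of the SDK's log.Logger interface. It is
// redeclared here only so that the sketch compiles without the SDK.
type Logger interface {
	Debug(msg string, keyvals ...interface{})
	Info(msg string, keyvals ...interface{})
	Warn(msg string, keyvals ...interface{})
	Error(msg string, keyvals ...interface{})
}

// lineLogger is a hypothetical base logger that formats each entry as one
// line and hands it to emit.
type lineLogger struct {
	emit func(line string)
}

func (l lineLogger) log(level, msg string, keyvals []interface{}) {
	l.emit(fmt.Sprintf("%s %s %v", level, msg, keyvals))
}

func (l lineLogger) Debug(msg string, keyvals ...interface{}) { l.log("DEBUG", msg, keyvals) }
func (l lineLogger) Info(msg string, keyvals ...interface{})  { l.log("INFO", msg, keyvals) }
func (l lineLogger) Warn(msg string, keyvals ...interface{})  { l.log("WARN", msg, keyvals) }
func (l lineLogger) Error(msg string, keyvals ...interface{}) { l.log("ERROR", msg, keyvals) }

// fieldLogger decorates another Logger with a fixed set of key/value pairs.
type fieldLogger struct {
	next   Logger
	fields []interface{}
}

// merge returns the fixed fields followed by the per-call keyvals, copying so
// that the shared fields slice is never modified.
func (l fieldLogger) merge(keyvals []interface{}) []interface{} {
	out := make([]interface{}, 0, len(l.fields)+len(keyvals))
	out = append(out, l.fields...)
	return append(out, keyvals...)
}

func (l fieldLogger) Debug(msg string, keyvals ...interface{}) {
	l.next.Debug(msg, l.merge(keyvals)...)
}
func (l fieldLogger) Info(msg string, keyvals ...interface{}) {
	l.next.Info(msg, l.merge(keyvals)...)
}
func (l fieldLogger) Warn(msg string, keyvals ...interface{}) {
	l.next.Warn(msg, l.merge(keyvals)...)
}
func (l fieldLogger) Error(msg string, keyvals ...interface{}) {
	l.next.Error(msg, l.merge(keyvals)...)
}

func main() {
	base := lineLogger{emit: func(line string) { fmt.Println(line) }}
	var logger Logger = fieldLogger{next: base, fields: []interface{}{"service", "billing"}}
	// Stands in for a low-level log line emitted by the SDK itself.
	logger.Debug("ExecuteActivity", "ActivityType", "LogActivity")
}
```

With the real SDK, a logger like this would be set as the `Logger` field of `client.Options`. Because the fields are fixed when the logger is built, it cannot carry per-execution values such as a trace ID.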

But you may need the context. We can do what we did with data converters. Any data converter that implements the following interface:

type ContextAware interface {
    WithWorkflowContext(ctx Context) converter.DataConverter
    WithContext(ctx context.Context) converter.DataConverter
}

Will have those functions called before the converter is used. So maybe we can make a:

type ContextAwareLogger interface {
    WithWorkflowContext(ctx Context) log.Logger
    WithContext(ctx context.Context) log.Logger
}

And invoke those, whenever the logger implements them, everywhere we have a context. The overhead should be negligible. Thoughts?
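A rough sketch of how that could look, under several assumptions: `ContextAwareLogger` is only the interface proposed here and does not exist in the SDK, it is reduced to the `context.Context` variant because `workflow.Context` is not available outside the SDK, and `traceLogger`, `traceIDKey`, `loggerFor`, and `demo` are hypothetical names. `Logger` is again a local stand-in for the SDK's `log.Logger`.

```go
package main

import (
	"context"
	"fmt"
)

// Logger is a local stand-in for the SDK's log.Logger interface.
type Logger interface {
	Debug(msg string, keyvals ...interface{})
	Info(msg string, keyvals ...interface{})
	Warn(msg string, keyvals ...interface{})
	Error(msg string, keyvals ...interface{})
}

// ContextAwareLogger is the proposed (not yet existing) opt-in interface,
// reduced to the context.Context variant for this sketch.
type ContextAwareLogger interface {
	Logger
	WithContext(ctx context.Context) Logger
}

// traceIDKey is a hypothetical context key under which a trace ID is stored.
type traceIDKey struct{}

// traceLogger formats each entry as one line and passes it to emit.
type traceLogger struct {
	emit   func(line string)
	fields []interface{}
}

func (l traceLogger) log(level, msg string, keyvals []interface{}) {
	all := append(append([]interface{}{}, l.fields...), keyvals...)
	l.emit(fmt.Sprintf("%s %s %v", level, msg, all))
}

func (l traceLogger) Debug(msg string, keyvals ...interface{}) { l.log("DEBUG", msg, keyvals) }
func (l traceLogger) Info(msg string, keyvals ...interface{})  { l.log("INFO", msg, keyvals) }
func (l traceLogger) Warn(msg string, keyvals ...interface{})  { l.log("WARN", msg, keyvals) }
func (l traceLogger) Error(msg string, keyvals ...interface{}) { l.log("ERROR", msg, keyvals) }

// WithContext returns a copy of the logger tagged with the trace ID found in
// ctx, or the logger unchanged when ctx carries none.
func (l traceLogger) WithContext(ctx context.Context) Logger {
	id, ok := ctx.Value(traceIDKey{}).(string)
	if !ok {
		return l
	}
	fields := append(append([]interface{}{}, l.fields...), "trace_id", id)
	return traceLogger{emit: l.emit, fields: fields}
}

// loggerFor is what an SDK-internal call site might do before logging:
// upgrade the base logger if it opted in, otherwise use it as is.
func loggerFor(ctx context.Context, base Logger) Logger {
	if aware, ok := base.(ContextAwareLogger); ok {
		return aware.WithContext(ctx)
	}
	return base
}

// demo logs the same message once with and once without a trace ID in the context.
func demo(emit func(line string)) {
	base := traceLogger{emit: emit}
	ctx := context.WithValue(context.Background(), traceIDKey{}, "0112358")
	loggerFor(ctx, base).Debug("ExecuteActivity", "ActivityType", "LogActivity")
	loggerFor(context.Background(), base).Debug("ExecuteActivity", "ActivityType", "LogActivity")
}

func main() {
	demo(func(line string) { fmt.Println(line) })
}
```

The type assertion in `loggerFor` is the only extra work per log call for loggers that do not opt in, which is why the overhead is expected to be small.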

jlegrone commented 2 years ago

A ContextAwareLogger interface sounds like it could be useful... but when the SDK already has a workflow or activity context available to pass into the client's default logger, why not call workflow.GetLogger or activity.GetLogger instead so that the interceptor chain is invoked as well?

cretz commented 2 years ago

My assumption was that the GetLogger interceptor is invoked only as a result of a literal GetLogger call, not before every logger use to make sure the logger has the latest context info. However, it might make sense to do just that, as long as we are clear that, unlike other outbound interceptors, this one is not 1:1 with user calls: it is invoked more frequently and therefore needs to remain performant.

cretz commented 2 years ago

After some thought, I am afraid of repeatedly calling GetLogger internally instead of its original purpose of only being invoked when the user invokes it. I am worried about going from people understanding that it's only called per user call, to now dozens of internal calls since the context can change many times throughout the life of a workflow/activity and one may want such context updates before each log statement.

I am back to considering the ContextAwareLogger approach.

jlegrone commented 1 year ago

I'd love to revisit this issue. We're looking for ways to assess the scope of impact for incidents caused by workflow code panics and other unversioned or non-deterministic changes.

A common failure mode we've observed is that a small number of top-level requests produces a large number of error logs generated by the Go SDK across a broad set of child workflows. We're looking for a way to quickly identify how many root (top-level parent) workflows are impacted by aggregating across this set of logs, but we are limited by the fact that we have no way of injecting custom request-scoped attributes into Go SDK logs.

For example, aggregating workflow error logs by dd.trace_id would let us identify how many top-level requests are impacted and help operators make more informed decisions about whether to terminate these top-level workflows or roll out a new worker version.
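To illustrate the kind of aggregation meant here, the sketch below counts error-level log lines per trace ID. It assumes the logs are JSON lines with a `level` field and a `dd.trace_id` string field, which is exactly what SDK-generated logs cannot carry today; the `impactedTraces` helper and the sample lines are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// impactedTraces returns, for each trace ID, the number of error-level log
// lines that carried it. Lines that are not JSON objects, are not at error
// level, or have no string "dd.trace_id" field are skipped.
func impactedTraces(logLines []string) map[string]int {
	counts := make(map[string]int)
	for _, line := range logLines {
		var entry map[string]interface{}
		if err := json.Unmarshal([]byte(line), &entry); err != nil {
			continue
		}
		if level, _ := entry["level"].(string); level != "error" {
			continue
		}
		if id, ok := entry["dd.trace_id"].(string); ok {
			counts[id]++
		}
	}
	return counts
}

func main() {
	// Made-up sample lines in an assumed JSON log format.
	logs := []string{
		`{"level":"error","msg":"Task processing failed with error","dd.trace_id":"111"}`,
		`{"level":"error","msg":"Task processing failed with error","dd.trace_id":"111"}`,
		`{"level":"error","msg":"Task processing failed with error","dd.trace_id":"222"}`,
		`{"level":"debug","msg":"ExecuteActivity","dd.trace_id":"333"}`,
	}
	counts := impactedTraces(logs)
	fmt.Printf("%d top-level requests impacted\n", len(counts))
}
```

In practice this grouping would more likely be a query on the observability platform than a standalone program; the point is only that it needs the trace ID to be present on the SDK's own error logs.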