open-telemetry / opentelemetry-go

OpenTelemetry Go API and SDK
https://opentelemetry.io/docs/languages/go
Apache License 2.0

Metric creation slowed down by unreachable collector with gRPC #3925

Closed fracasula closed 1 year ago

fracasula commented 1 year ago

Description

I noticed that the creation of instruments while the agent/collector is down can easily take more than 10 seconds each. I'm testing this by passing an invalid host (e.g. unreachable:4317) to my application and I see these calls taking more than 10 seconds each:

switch any(m).(type) {
case map[string]instrument.Int64Counter:
    value, err = meter.Int64Counter(name, castOptions[instrument.Int64Option](opts...)...)
case map[string]instrument.Int64Histogram:
    value, err = meter.Int64Histogram(name, castOptions[instrument.Int64Option](opts...)...)
case map[string]instrument.Float64Histogram:
    value, err = meter.Float64Histogram(name, castOptions[instrument.Float64Option](opts...)...)
default:
    panic(fmt.Errorf("unknown instrument type %T", instr))
}

That is a small switch that I have in my own instruments factory that works as an adapter to your library.


To me it looks like these operations all hold the same mutex, e.clientMu.

I think Aggregation and Temporality are called when an instrument is created, but the same mutex is kept locked for the entire duration of Export as well, so maybe that is where the problem lies. I would expect Export to hold the lock only long enough to copy the data it needs to send, then unlock and take its time sending the data over the gRPC connection asynchronously. Buffered channels could be used too, and metrics could be discarded once the buffer is full.
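To make the suggestion above concrete, here is a minimal sketch of the "copy under a short lock, then hand off through a buffered channel" idea. It is not the SDK's code: asyncExporter, snapshot, Record and Export are hypothetical names, and a real implementation would also need a goroutine that drains the queue and sends over gRPC.

```go
package main

import (
    "fmt"
    "sync"
)

// snapshot is a hypothetical stand-in for a copied batch of metric data.
type snapshot []int

// asyncExporter is a hypothetical exporter that holds its lock only while
// copying data, then hands the copy off through a buffered channel.
type asyncExporter struct {
    mu      sync.Mutex
    pending []int
    queue   chan snapshot
}

func newAsyncExporter(size int) *asyncExporter {
    return &asyncExporter{queue: make(chan snapshot, size)}
}

// Record appends a value under the lock.
func (e *asyncExporter) Record(v int) {
    e.mu.Lock()
    e.pending = append(e.pending, v)
    e.mu.Unlock()
}

// Export copies the pending data under the lock, releases the lock, and then
// enqueues the copy without blocking. It reports false if the buffer was full
// and the batch was discarded.
func (e *asyncExporter) Export() bool {
    e.mu.Lock()
    batch := make(snapshot, len(e.pending))
    copy(batch, e.pending)
    e.pending = e.pending[:0]
    e.mu.Unlock()

    select {
    case e.queue <- batch:
        return true
    default:
        return false // buffer full: drop the batch instead of blocking
    }
}

func main() {
    e := newAsyncExporter(1)
    e.Record(1)
    fmt.Println(e.Export()) // true: the batch fits in the buffer
    e.Record(2)
    fmt.Println(e.Export()) // false: nothing drained the queue, batch dropped
}
```

With this shape, a slow or unreachable endpoint only affects whatever drains the queue; callers that need the lock wait for a slice copy at most.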

Environment

Steps To Reproduce

You don't even need a collector. Just spawn many goroutines that create instruments (even the same instrument over and over, since it should be cached in memory) and pass something like "unreachable:4317" as the gRPC endpoint.

This is my default retry config:

var DefaultRetryConfig = RetryConfig{
    Enabled:         true,
    InitialInterval: 5 * time.Second,
    MaxInterval:     30 * time.Second,
    MaxElapsedTime:  time.Minute,
}

And this is how I initialize the meter provider:

meterProviderOptions := []otlpmetricgrpc.Option{
    otlpmetricgrpc.WithEndpoint(c.metricsEndpoint),
    otlpmetricgrpc.WithRetry(otlpmetricgrpc.RetryConfig{
        Enabled:         c.retryConfig.Enabled,
        InitialInterval: c.retryConfig.InitialInterval,
        MaxInterval:     c.retryConfig.MaxInterval,
        MaxElapsedTime:  c.retryConfig.MaxElapsedTime,
    }),
}
if c.withInsecure {
    meterProviderOptions = append(meterProviderOptions, otlpmetricgrpc.WithInsecure())
}
if len(c.meterProviderConfig.otlpMetricGRPCOptions) > 0 {
    meterProviderOptions = append(meterProviderOptions, c.meterProviderConfig.otlpMetricGRPCOptions...)
}
exp, err := otlpmetricgrpc.New(ctx, meterProviderOptions...)
if err != nil {
    return nil, nil, fmt.Errorf("failed to create metric exporter: %w", err)
}

m.mp = sdkmetric.NewMeterProvider(
    sdkmetric.WithResource(res),
    sdkmetric.WithReader(sdkmetric.NewPeriodicReader(
        exp,
        sdkmetric.WithInterval(c.meterProviderConfig.exportsInterval),
    )),
    sdkmetric.WithView(c.meterProviderConfig.views...),
)

if c.meterProviderConfig.global {
    global.SetMeterProvider(m.mp)
}

Expected behavior

I would expect the creation of instruments not to take more than a few milliseconds at most.

fracasula commented 1 year ago

Removing the lock from Aggregation and Temporality solves the issue for me:

// Temporality returns the Temporality to use for an instrument kind.
func (e *exporter) Temporality(k metric.InstrumentKind) metricdata.Temporality {
    start := time.Now()
    defer func() {
        fmt.Println("OTEL: exporter.Temporality took", time.Since(start))
    }()
    //e.clientMu.Lock()
    //defer e.clientMu.Unlock()
    return e.client.Temporality(k)
}

// Aggregation returns the Aggregation to use for an instrument kind.
func (e *exporter) Aggregation(k metric.InstrumentKind) aggregation.Aggregation {
    start := time.Now()
    defer func() {
        fmt.Println("OTEL: exporter.Aggregation took", time.Since(start))
    }()
    //e.clientMu.Lock()
    //defer e.clientMu.Unlock()
    return e.client.Aggregation(k)
}

Don't mind the fmt.Println calls; they were just for debugging. Again, with the clientMu.Lock it takes 20-30s.

Anyway, given that the temporality and aggregation selectors aren't really using the client, is the sequentiality that you're trying to achieve via the lock needed there?

fracasula commented 1 year ago

@MrAlias can you tell me the reason why this comment was added?

// Ensure synchronous access to the client across all functionality.

Perhaps if I understand the rationale behind that I could try to fix it myself. It's a serious issue so we can't go to production with it.

The problem is that I'm not sure that removing the lock for Aggregation and Temporality is the best way to go. It might not cause issues right now, because at the moment both methods just return copies across the stack, but I don't know what your plans are for that component moving forward.

pellared commented 1 year ago

I think that synchronization is required to avoid races when doing ForceFlush and Shutdown of the exporter.

fracasula commented 1 year ago

> I think that synchronization is required to avoid races when doing ForceFlush and Shutdown of the exporter.

Right, but that doesn't affect Temporality and Aggregation though, correct? Unless there is a plan to have them communicate via gRPC as well.

At the moment I see these two being used for Temporality and Aggregation:

pellared commented 1 year ago

I see that the lock is currently needed to avoid a race as the Shutdown mutates e.client. Do you want to contribute by refining the code so that locking is not needed for Temporality and Aggregation?

EDIT: Personally I would simply use an isShutdown atomic.Bool instead of mutating the e.client 😅
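A minimal sketch of what the isShutdown idea could look like, assuming Go 1.19+ for atomic.Bool. The client and exporter types here are trimmed-down, hypothetical stand-ins rather than the SDK's actual types, and the sketch does not cover coordinating an in-flight Export or ForceFlush with Shutdown.

```go
package main

import (
    "errors"
    "fmt"
    "sync/atomic"
)

// client is a hypothetical, trimmed-down stand-in for the exporter's client.
type client struct{}

func (client) Temporality() string { return "cumulative" }

var errShutdown = errors.New("exporter is shut down")

// exporter keeps its client field immutable and tracks shutdown in an
// atomic flag, so read-only methods need no mutex.
type exporter struct {
    client     client // never reassigned after construction
    isShutdown atomic.Bool
}

// Temporality reads only immutable state, so it takes no lock.
func (e *exporter) Temporality() string {
    return e.client.Temporality()
}

// Export refuses to run once the exporter has been shut down.
func (e *exporter) Export() error {
    if e.isShutdown.Load() {
        return errShutdown
    }
    return nil // a real implementation would upload here
}

// Shutdown flips the flag exactly once; later calls report an error.
func (e *exporter) Shutdown() error {
    if !e.isShutdown.CompareAndSwap(false, true) {
        return errShutdown
    }
    return nil // a real implementation would close the connection here
}

func main() {
    e := &exporter{}
    fmt.Println(e.Export())      // <nil>
    fmt.Println(e.Shutdown())    // <nil>
    fmt.Println(e.Export())      // exporter is shut down
    fmt.Println(e.Temporality()) // cumulative
}
```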

fracasula commented 1 year ago

> I see that the lock is currently needed to avoid a race as the Shutdown mutates e.client. Do you want to contribute by refining the code so that locking is not needed for Temporality and Aggregation?

If there are no plans to get such information via the gRPC connection (for some reason) then yeah I can try to have a stab at it.

I'd like confirmation from @MrAlias before commencing any work though since apparently he wrote that bit.

MrAlias commented 1 year ago

It's been a while since I looked at this code, but, if I recall correctly, the client lock was included to ensure synchronous access to all client methods so the http and grpc clients didn't have to manage concurrency. This was before the temporality and aggregation selection was added to the reader.

As long as client implementations are updated to ensure they are concurrency safe, and the coordination with the client field changing is handled, I don't see why this couldn't be updated.

fracasula commented 1 year ago

What do you think about having a leaner client and passing a ConfigSelector along with the client to get the temporality and aggregation selectors? The ConfigSelector wouldn't need any synchronization since it only reads configuration values.

Both the aggregation and temporality selectors are already coming from Config here and here.

Logically speaking the separation is already there. Instead of Client we could have:

// Client handles the transmission of OTLP data to an OTLP receiving endpoint.
type Client interface {
    // UploadMetrics transmits metric data to an OTLP receiver.
    //
    // All retry logic must be handled by UploadMetrics alone, the Exporter
    // does not implement any retry logic. All returned errors are considered
    // unrecoverable.
    UploadMetrics(context.Context, *mpb.ResourceMetrics) error

    // ForceFlush flushes any metric data held by a Client.
    //
    // The deadline or cancellation of the passed context must be honored. An
    // appropriate error should be returned in these situations.
    ForceFlush(context.Context) error

    // Shutdown flushes all metric data held by a Client and closes any
    // connections it holds open.
    //
    // The deadline or cancellation of the passed context must be honored. An
    // appropriate error should be returned in these situations.
    //
    // Shutdown will only be called once by the Exporter. Once a return value
    // is received by the Exporter from Shutdown the Client will not be used
    // anymore. Therefore all computational resources need to be released
    // after this is called so the Client can be garbage collected.
    Shutdown(context.Context) error
}

type ConfigSelector interface {
    // Temporality returns the Temporality to use for an instrument kind.
    Temporality(metric.InstrumentKind) metricdata.Temporality

    // Aggregation returns the Aggregation to use for an instrument kind.
    Aggregation(metric.InstrumentKind) aggregation.Aggregation
}

And then in exporter.go:

// exporter exports metrics data as OTLP.
type exporter struct {
    // Ensure synchronous access to the client across all functionality.
    clientMu       sync.Mutex
    client         Client
    configSelector ConfigSelector

    shutdownOnce sync.Once
}
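As a rough illustration of how the exporter methods could then delegate, here is a self-contained sketch. InstrumentKind, Temporality, CumulativeTemporality and staticSelector are simplified stand-ins defined only for this example (they are not the SDK's types), and only the Temporality half of ConfigSelector is shown.

```go
package main

import (
    "fmt"
    "sync"
)

// InstrumentKind and Temporality are simplified stand-ins for the SDK types.
type InstrumentKind int
type Temporality int

// CumulativeTemporality is an illustrative value for this sketch.
const CumulativeTemporality Temporality = 1

// ConfigSelector mirrors the proposed interface, reduced to one method.
type ConfigSelector interface {
    Temporality(InstrumentKind) Temporality
}

// staticSelector is a hypothetical selector that only reads configuration.
type staticSelector struct{}

func (staticSelector) Temporality(InstrumentKind) Temporality {
    return CumulativeTemporality
}

// exporter keeps clientMu for client access only; the selector is immutable
// after construction and is read without locking.
type exporter struct {
    clientMu       sync.Mutex
    configSelector ConfigSelector
}

// Temporality delegates to the selector and never touches clientMu.
func (e *exporter) Temporality(k InstrumentKind) Temporality {
    return e.configSelector.Temporality(k)
}

func main() {
    e := &exporter{configSelector: staticSelector{}}

    e.clientMu.Lock()             // simulate an Export holding the client lock
    fmt.Println(e.Temporality(0)) // prints 1 without waiting for the lock
    e.clientMu.Unlock()
}
```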

Here's a draft PR.