micrometer-metrics / micrometer

An application observability facade for the most popular observability tools. Think SLF4J, but for observability.
https://micrometer.io
Apache License 2.0

ClassicHistogramBuckets creating failure #5223

Closed VladimirZaitsev21 closed 2 weeks ago

VladimirZaitsev21 commented 2 weeks ago

Describe the bug
We use io.micrometer.core.instrument.Timer and build a histogram from it. Occasionally an IllegalArgumentException is thrown with the message "Counts in ClassicHistogramBuckets cannot be negative." To capture the data that triggers the exception, we replaced PrometheusMeterRegistry with our own LoggingPrometheusMeterRegistry. The image below shows the problem.
[image]

We logged a Histogram Snapshot and got the following:

 HistogramSnapshot{
    count=575660,
    total=1.78243584339E11,
    mean=309633.43699232186,
    max=1.2855223E7,
    histogramCounts=[
        (570903.0 at 5000000.0),
        (571035.0 at 5592405.0),
        (571288.0 at 6990506.0),
        (571461.0 at 8388607.0),
        (571631.0 at 9786708.0),
        (571646.0 at 1.0E7),
        (571768.0 at 1.1184809E7),
        (571945.0 at 1.258291E7),
        (572155.0 at 1.3981011E7),
        (572335.0 at 1.5379112E7),
        (572500.0 at 1.6777216E7),
        (572796.0 at 2.2369621E7),
        (572893.0 at 2.7962026E7),
        (572918.0 at 3.0E7),
        (572954.0 at 3.3554431E7),
        (573008.0 at 3.9146836E7),
        (573902.0 at 4.4739241E7),
        (574844.0 at 5.0E7),
        (574861.0 at 5.0331646E7),
        (575269.0 at 5.5924051E7),
        (575297.0 at 6.1516456E7),
        (575324.0 at 6.7108864E7),
        (575483.0 at 8.9478485E7),
        (575569.0 at 1.0E8),
        (575616.0 at 1.11848106E8),
        (575634.0 at 1.34217727E8),
        (575642.0 at 1.56587348E8),
        (575653.0 at 1.78956969E8),
        (575660.0 at 2.0132659E8),
        (575660.0 at 2.23696211E8),
        (575660.0 at 2.46065832E8),
        (575660.0 at 2.68435456E8),
        (575660.0 at 3.57913941E8),
        (575661.0 at 4.47392426E8),
        (575661.0 at 5.36870911E8),
        (575661.0 at 6.26349396E8),
        (575661.0 at 7.15827881E8),
        (575661.0 at 8.05306366E8),
        (575661.0 at 8.94784851E8),
        (575661.0 at 9.84263336E8),
        (575661.0 at 1.073741824E9),
        (575661.0 at 1.431655765E9),
        (575661.0 at 1.789569706E9),
        (575661.0 at 2.147483647E9),
        (575661.0 at 2.505397588E9),
        (575661.0 at 2.863311529E9),
        (575661.0 at 3.22122547E9),
        (575661.0 at 3.579139411E9),
        (575661.0 at 3.937053352E9),
        (575661.0 at 4.294967296E9),
        (575661.0 at 5.0E9)
    ]
}

If you try to create a Timer using such a snapshot, the described exception will be thrown.
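
The logged numbers seem to show where the negative value comes from: the snapshot reports count=575660, while the cumulative count in the last buckets is 575661. The following is only a minimal sketch of that arithmetic, not the registry's actual code path. It uses no Micrometer or Prometheus classes, and toPerBucketCounts is a hypothetical helper that mirrors the cumulative-to-per-bucket conversion in the LoggingPrometheusMeterRegistry class below. Only the tail of the logged buckets is used, so the first per-bucket value is not meaningful.

```kotlin
// Hypothetical helper (not part of Micrometer): turns cumulative bucket counts
// into per-bucket counts and appends the implicit +Inf bucket.
fun toPerBucketCounts(cumulative: List<Double>, totalCount: Long): List<Double> {
    val counts = ArrayList<Double>()
    counts.add(cumulative[0])
    for (i in 1 until cumulative.size) {
        // Each bucket holds the difference to the previous cumulative count.
        counts.add(cumulative[i] - cumulative[i - 1])
    }
    // +Inf bucket: whatever the total count has beyond the last finite bucket.
    counts.add(totalCount - cumulative.last())
    return counts
}

fun main() {
    // Cumulative counts logged at 2.68435456E8, 3.57913941E8, 4.47392426E8 and 5.0E9.
    val tail = listOf(575660.0, 575660.0, 575661.0, 575661.0)
    // count=575660 is taken from the same logged snapshot.
    val counts = toPerBucketCounts(tail, 575660L)
    println("+Inf bucket count: ${counts.last()}")
    println("has negative count: ${counts.any { it < 0 }}")
}
```

Because the total count is one lower than the last cumulative bucket count, the +Inf bucket comes out negative, which appears to be what ClassicHistogramBuckets rejects.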

Environment

To Reproduce
The LoggingPrometheusMeterRegistry class used to reproduce the bug is shown below:

package io.micrometer.prometheusmetrics

import ClassicHistogramBucketsException
import io.micrometer.core.instrument.Clock
import io.micrometer.core.instrument.LongTaskTimer
import io.micrometer.core.instrument.Meter
import io.micrometer.core.instrument.Tag
import io.micrometer.core.instrument.Timer
import io.micrometer.core.instrument.distribution.CountAtBucket
import io.micrometer.core.instrument.distribution.DistributionStatisticConfig
import io.micrometer.core.instrument.distribution.HistogramSupport
import io.micrometer.core.instrument.distribution.pause.PauseDetector
import io.micrometer.core.instrument.internal.DefaultLongTaskTimer
import io.micrometer.core.instrument.util.TimeUtils
import io.micrometer.prometheusmetrics.MicrometerCollector.Family
import io.prometheus.metrics.model.registry.PrometheusRegistry
import io.prometheus.metrics.model.snapshots.ClassicHistogramBuckets
import io.prometheus.metrics.model.snapshots.Exemplar
import io.prometheus.metrics.model.snapshots.Exemplars
import io.prometheus.metrics.model.snapshots.GaugeSnapshot
import io.prometheus.metrics.model.snapshots.GaugeSnapshot.GaugeDataPointSnapshot
import io.prometheus.metrics.model.snapshots.HistogramSnapshot
import io.prometheus.metrics.model.snapshots.HistogramSnapshot.HistogramDataPointSnapshot
import io.prometheus.metrics.model.snapshots.Labels
import io.prometheus.metrics.model.snapshots.MetricMetadata
import io.prometheus.metrics.model.snapshots.Quantile
import io.prometheus.metrics.model.snapshots.Quantiles
import io.prometheus.metrics.model.snapshots.SummarySnapshot
import io.prometheus.metrics.model.snapshots.SummarySnapshot.SummaryDataPointSnapshot
import java.util.Optional
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ConcurrentMap
import java.util.concurrent.TimeUnit
import java.util.function.BiConsumer
import java.util.function.Consumer
import java.util.function.Supplier
import java.util.stream.Collectors
import java.util.stream.Stream
import java.util.stream.StreamSupport

class LoggingPrometheusMeterRegistry(
    private val prometheusConfig: PrometheusConfig,
    private val registry: PrometheusRegistry,
) : PrometheusMeterRegistry(prometheusConfig, registry, Clock.SYSTEM) {
    private val exemplarSamplerFactory = null
    internal val collectorMap: ConcurrentMap<String, MicrometerCollector> = ConcurrentHashMap()

    public override fun newTimer(
        id: Meter.Id,
        distributionStatisticConfig: DistributionStatisticConfig,
        pauseDetector: PauseDetector,
    ): Timer {
        val timer = PrometheusTimer(id, clock, distributionStatisticConfig, pauseDetector, exemplarSamplerFactory)
        applyToCollector(id) { collector ->
            addDistributionStatisticSamples(id, collector, timer, timer::exemplars, tagValues(id), false)
        }
        return timer
    }

    public override fun newLongTaskTimer(
        id: Meter.Id,
        distributionStatisticConfig: DistributionStatisticConfig,
    ): LongTaskTimer {
        val ltt: LongTaskTimer = DefaultLongTaskTimer(id, clock, baseTimeUnit, distributionStatisticConfig, true)
        applyToCollector(id) { collector ->
            addDistributionStatisticSamples(id, collector, ltt, { Exemplars.EMPTY }, tagValues(id), true)
        }
        return ltt
    }

    private fun tagValues(id: Meter.Id): List<String> = StreamSupport
        .stream(id.tagsAsIterable.spliterator(), false)
        .map { obj: Tag -> obj.value }
        .collect(Collectors.toList())

    private fun applyToCollector(id: Meter.Id, consumer: Consumer<MicrometerCollector>) {
        collectorMap.compute(getConventionName(id)) { name: String?, existingCollector: MicrometerCollector? ->
            if (existingCollector == null) {
                val micrometerCollector = MicrometerCollector(
                    name!!,
                    id,
                    config().namingConvention(),
                )
                consumer.accept(micrometerCollector)
                registry.register(micrometerCollector)
                return@compute micrometerCollector
            }
            val tagKeys = getConventionTags(id)
                .stream()
                .map { obj: Tag -> obj.key }
                .collect(Collectors.toList())
            if (existingCollector.tagKeys == tagKeys) {
                consumer.accept(existingCollector)
                return@compute existingCollector
            }

            meterRegistrationFailed(
                id,
                "Prometheus requires that all meters with the same name have the same" +
                    " set of tag keys. There is already an existing meter named '" + id.name +
                    "' containing tag keys [" +
                    java.lang.String.join(", ", collectorMap[getConventionName(id)]!!.tagKeys) +
                    "]. The meter you are attempting to register" + " has keys [" +
                    getConventionTags(id)
                        .stream()
                        .map { obj: Tag -> obj.key }
                        .collect(Collectors.joining(", ")) + "].",
            )
            existingCollector
        }
    }

    @Suppress("LongMethod", "TooGenericExceptionCaught")
    internal fun addDistributionStatisticSamples(
        id: Meter.Id,
        collector: MicrometerCollector,
        histogramSupport: HistogramSupport,
        exemplarsSupplier: Supplier<Exemplars>,
        tagValues: List<String>,
        forLongTaskTimer: Boolean,
    ) {
        collector.add(tagValues) { conventionName: String, tagKeys: List<String?>? ->
            val families = Stream.builder<Family<*>>()
            val histogramSnapshot = io.micrometer.core.instrument.distribution.HistogramSnapshot(
                575660,
                1.78243584339E11,
                1.2855223E7,
                null,
                arrayOf(
                   CountAtBucket(5000000.0, 570903.0),
                   CountAtBucket(5592405.0, 571035.0),
                   CountAtBucket(6990506.0, 571288.0),
                   CountAtBucket(8388607.0, 571461.0),
                   CountAtBucket(9786708.0, 571631.0),
                   CountAtBucket(1.0E7, 571646.0),
                   CountAtBucket(1.1184809E7, 571768.0),
                   CountAtBucket(1.258291E7, 571945.0),
                   CountAtBucket(1.3981011E7, 572155.0),
                   CountAtBucket(1.5379112E7, 572335.0),
                   CountAtBucket(1.6777216E7, 572500.0),
                   CountAtBucket(2.2369621E7, 572796.0),
                   CountAtBucket(2.7962026E7, 572893.0),
                   CountAtBucket(3.0E7, 572918.0),
                   CountAtBucket(3.3554431E7, 572954.0),
                   CountAtBucket(3.9146836E7, 573008.0),
                   CountAtBucket(4.4739241E7, 573902.0),
                   CountAtBucket(5.0E7, 574844.0),
                   CountAtBucket(5.0331646E7, 574861.0),
                   CountAtBucket(5.5924051E7, 575269.0),
                   CountAtBucket(6.1516456E7, 575297.0),
                   CountAtBucket(6.7108864E7, 575324.0),
                   CountAtBucket(8.9478485E7, 575483.0),
                   CountAtBucket(1.0E8, 575569.0),
                   CountAtBucket(1.11848106E8, 575616.0),
                   CountAtBucket(1.34217727E8, 575634.0),
                   CountAtBucket(1.56587348E8, 575642.0),
                   CountAtBucket(1.78956969E8, 575653.0),
                   CountAtBucket(2.0132659E8, 575660.0),
                   CountAtBucket(2.23696211E8, 575660.0),
                   CountAtBucket(2.46065832E8, 575660.0),
                   CountAtBucket(2.68435456E8, 575660.0),
                   CountAtBucket(3.57913941E8, 575660.0),
                   CountAtBucket(4.47392426E8, 575661.0),
                   CountAtBucket(5.36870911E8, 575661.0),
                   CountAtBucket(6.26349396E8, 575661.0),
                   CountAtBucket(7.15827881E8, 575661.0),
                   CountAtBucket(8.05306366E8, 575661.0),
                   CountAtBucket(8.94784851E8, 575661.0),
                   CountAtBucket(9.84263336E8, 575661.0),
                   CountAtBucket(1.073741824E9, 575661.0),
                   CountAtBucket(1.431655765E9, 575661.0),
                   CountAtBucket(1.789569706E9, 575661.0),
                   CountAtBucket(2.147483647E9, 575661.0),
                   CountAtBucket(2.505397588E9, 575661.0),
                   CountAtBucket(2.863311529E9, 575661.0),
                   CountAtBucket(3.22122547E9, 575661.0),
                   CountAtBucket(3.579139411E9, 575661.0),
                   CountAtBucket(3.937053352E9, 575661.0),
                   CountAtBucket(4.294967296E9, 575661.0),
                   CountAtBucket(5.0E9, 575661.0),
                ),
                BiConsumer { _, _ -> }
            )
            // The hard-coded snapshot above stands in for histogramSupport.takeSnapshot()
            val percentileValues = histogramSnapshot.percentileValues()
            val histogramCounts = histogramSnapshot.histogramCounts()
            val count = histogramSnapshot.count()
            val sum = histogramSnapshot.total(baseTimeUnit)

            if (histogramCounts.size == 0) {
                var quantiles =
                    Quantiles.EMPTY
                if (percentileValues.size > 0) {
                    val quantileList: MutableList<Quantile> =
                        ArrayList()
                    for (v in percentileValues) {
                        quantileList.add(
                            Quantile(
                                v.percentile(),
                                v.value(baseTimeUnit),
                            ),
                        )
                    }
                    quantiles = Quantiles.of(quantileList)
                }

                val exemplars = createExemplarsWithScaledValues(exemplarsSupplier.get())
                families.add(
                    Family(
                        conventionName,
                        { family: Family<SummaryDataPointSnapshot?> ->
                            SummarySnapshot(
                                family.metadata,
                                family.dataPointSnapshots,
                            )
                        },
                        getMetadata(id),
                        SummaryDataPointSnapshot(
                            count,
                            sum,
                            quantiles,
                            Labels.of(tagKeys, tagValues),
                            exemplars,
                            0,
                        ),
                    ),
                )
            } else {
                val buckets: MutableList<Double> = ArrayList()
                val counts: MutableList<Number> = ArrayList()
                buckets.add(histogramCounts[0].bucket(baseTimeUnit))
                counts.add(histogramCounts[0].count())
                for (i in 1 until histogramCounts.size) {
                    val countAtBucket = histogramCounts[i]
                    buckets.add(countAtBucket.bucket(baseTimeUnit))
                    counts.add(countAtBucket.count() - histogramCounts[i - 1].count())
                }
                if (java.lang.Double.isFinite(histogramCounts[histogramCounts.size - 1].bucket())) {
                    // ClassicHistogramBuckets is not cumulative, so add the implicit +Inf bucket
                    buckets.add(Double.POSITIVE_INFINITY)
                    // With the snapshot above this is 575660 - 575661 = -1, i.e. a negative count
                    counts.add(count - histogramCounts[histogramCounts.size - 1].count())
                }

                val exemplars = createExemplarsWithScaledValues(exemplarsSupplier.get())
                val classicHistogramBuckets = try {
                    ClassicHistogramBuckets.of(buckets, counts)
                } catch (e: Exception) {
                    val message = "Failed creating ClassicHistogramBuckets"
                    val exceptionMessage =
                        "$message; snapshot {\n$histogramSnapshot}"
                    throw ClassicHistogramBucketsException(
                        exceptionMessage,
                        e,
                        buckets,
                        counts,
                        tagKeys,
                        tagValues,
                        id
                    )
                }
                families.add(
                    Family(
                        conventionName,
                        { family: Family<HistogramDataPointSnapshot?> ->
                            HistogramSnapshot(
                                forLongTaskTimer,
                                family.metadata,
                                family.dataPointSnapshots,
                            )
                        },
                        getMetadata(id),
                        HistogramDataPointSnapshot(
                            classicHistogramBuckets,
                            sum,
                            Labels.of(tagKeys, tagValues),
                            exemplars,
                            0,
                        ),
                    ),
                )
            }

            families.add(
                Family(
                    conventionName + "_max",
                    { family: Family<GaugeDataPointSnapshot?> ->
                        GaugeSnapshot(
                            family.metadata,
                            family.dataPointSnapshots,
                        )
                    },
                    getMetadata(id, "_max"),
                    GaugeDataPointSnapshot(
                        histogramSnapshot.max(baseTimeUnit),
                        Labels.of(tagKeys, tagValues),
                        null,
                    ),
                ),
            )
            families.build()
        }
    }

    private fun createExemplarsWithScaledValues(exemplars: Exemplars) = Exemplars.of(
        StreamSupport
            .stream(exemplars.spliterator(), false)
            .map { exemplar: Exemplar ->
                createExemplarWithNewValue(
                    TimeUtils.convert(
                        exemplar.value,
                        TimeUnit.NANOSECONDS,
                        baseTimeUnit,
                    ),
                    exemplar,
                )
            }.collect(Collectors.toList()),
    )

    private fun createExemplarWithNewValue(newValue: Double, exemplar: Exemplar): Exemplar = Exemplar
        .builder()
        .value(newValue)
        .labels(exemplar.labels)
        .timestampMillis(exemplar.timestampMillis)
        .build()

    private fun getMetadata(id: Meter.Id): MetricMetadata = getMetadata(id, "")

    private fun getMetadata(id: Meter.Id, suffix: String): MetricMetadata {
        val name = config().namingConvention().name(id.name, id.type, id.baseUnit) + suffix
        val help = if (prometheusConfig.descriptions()) Optional.ofNullable(id.description).orElse(" ") else " "
        return MetricMetadata(name, help, null)
    }
}

Also, here is the main method to reproduce the bug.

package io.micrometer.prometheusmetrics

import io.micrometer.core.instrument.Meter
import io.micrometer.core.instrument.Tag
import io.micrometer.core.instrument.Tags
import io.micrometer.core.instrument.distribution.DistributionStatisticConfig
import io.micrometer.core.instrument.distribution.pause.ClockDriftPauseDetector
import io.prometheus.metrics.model.registry.PrometheusRegistry
import java.time.Duration

fun main() {
    val loggingPrometheusMeterRegistry = LoggingPrometheusMeterRegistry(PrometheusConfig.DEFAULT, PrometheusRegistry())
    val id = Meter.Id(
        "storage_file_download_time",
        Tags.of(Tag.of("file_source", "S3"), Tag.of("file_type", "FILTER")),
        null,
        null,
        Meter.Type.TIMER
    )
    val newTimer = loggingPrometheusMeterRegistry.newTimer(
        id,
        DistributionStatisticConfig.DEFAULT,
        ClockDriftPauseDetector(Duration.ofMinutes(1), Duration.ofMinutes(5))
    )
    // Collecting runs the lambda registered in addDistributionStatisticSamples, which throws
    loggingPrometheusMeterRegistry.collectorMap.values.forEach { it.collect() }
}

Expected behavior
The histogram should be created normally in this case.

Additional context
The changes made to PrometheusMeterRegistry.addDistributionStatisticSamples between versions 1.12.6 and 1.13.0 may be what affects this scenario.

jonatan-ivanov commented 2 weeks ago

Thank you for the issue! Could you provide a minimal Java sample that reproduces it, so we can investigate more easily and make sure any fix works for your use case? (Right now the reproducer is neither simple nor Java.)

I think this is a duplicate of https://github.com/micrometer-metrics/micrometer/issues/5193, so let me close this and continue the discussion there. Please let us know if you disagree and we can reopen.

jonatan-ivanov commented 2 weeks ago

Duplicate of https://github.com/micrometer-metrics/micrometer/issues/5193