segmentio / stats

Go package for abstracting stats collection
https://godoc.org/github.com/segmentio/stats
MIT License
208 stars 32 forks source link

Prometheus stats are incompatible with the prometheus sdk #131

Open ghost opened 3 years ago

ghost commented 3 years ago

Hi!

We are encountering a problem with integrating segmentio stats for our kafka consumers into the rest of our prometheus metrics. The issue arises due to the stats lib from segmentio not using the sdk that prometheus provides and instead rolling its own collector and publisher. This means that the two libraries are fundamentally incompatible when it comes to serving them under the same '/metrics' path.

Is there a way that someone has worked around this? If not, could some form of adapter be added to the lib?

I don't want to be the guy that asks for a large scale rewrite, but it would be nice to see this awesome lib use the sdk provided by prometheus for golang.

abraithwaite commented 3 years ago

Hey @jdeal-mediamath, I think it's unlikely for us to use the prometheus library in this module for various reasons. However, we don't register the metrics URL automatically so you should be able to host the metrics exporter at a separate path from the Prometheus package. It would mean that you would have to scrape 2 endpoints from the same service, but I don't really see a way around that.

What are you looking to integrate with? If it's an internal package, why not use segmentio/stats? If it's a third party library, perhaps you could suggest they add an interface for exporting stats versus using prometheus directly?

mmorlonDeezer commented 2 years ago

Here is the WIP I'm currently working on in my consumer app project:

Usage:

var defaultKafkaCollector = newKafkaCollector(namespace)

func ObserveKafkaReader(reader *kafka.Reader) {
    defaultKafkaCollector.InstrumentReader(reader)
}

Glue code:

package metrics

import (
    "sync"

    "github.com/prometheus/client_golang/prometheus"
    "github.com/segmentio/kafka-go"
)

// labelNames are the Prometheus labels attached to every kafka reader
// metric; values come from kafka.ReaderStats.Topic and .Partition.
var labelNames = []string{"topic", "partition"}

// counterAdder pairs a Prometheus counter vector with a getter that
// extracts the matching value from a kafka.ReaderStats snapshot.
type counterAdder struct {
	counter *prometheus.CounterVec
	getter  func(*kafka.ReaderStats) float64
}

// Add reads the stat value out of r via the getter and adds it to the
// counter child labeled with r's topic and partition.
func (a *counterAdder) Add(r *kafka.ReaderStats) {
	delta := a.getter(r)
	child := a.counter.WithLabelValues(r.Topic, r.Partition)
	child.Add(delta)
}

// gaugeSetter pairs a Prometheus gauge vector with a getter that
// extracts the matching value from a kafka.ReaderStats snapshot.
type gaugeSetter struct {
	gauge  *prometheus.GaugeVec
	getter func(*kafka.ReaderStats) float64
}

// Set reads the stat value out of r via the getter and overwrites the
// gauge child labeled with r's topic and partition.
func (s *gaugeSetter) Set(r *kafka.ReaderStats) {
	value := s.getter(r)
	child := s.gauge.WithLabelValues(r.Topic, r.Partition)
	child.Set(value)
}

// kafkaCollector implements prometheus.Collector for a set of kafka-go
// readers: on every Collect it pulls fresh reader stats into the counter
// and gauge vectors below, then forwards those vectors to Prometheus.
// Contains a mutex; must not be copied — use *kafkaCollector.
type kafkaCollector struct {
	readers     []*kafka.Reader // readers registered via InstrumentReader
	readersLock sync.RWMutex    // guards readers

	counters []*counterAdder // monotonically increasing reader stats
	gauges   []*gaugeSetter  // point-in-time reader stats
}

// newKafkaCollector builds a collector exposing kafka reader stats under
// the "<namespace>_kafka_*" metric names and registers it with the default
// Prometheus registry (panicking on duplicate registration, as
// MustRegister does).
func newKafkaCollector(namespace string) *kafkaCollector {
	// statGetter extracts one numeric stat from a reader snapshot.
	type statGetter func(*kafka.ReaderStats) float64

	// Cumulative stats exposed as counters, in declaration order.
	counterDefs := []struct {
		name string
		get  statGetter
	}{
		{"dials_total", func(s *kafka.ReaderStats) float64 { return float64(s.Dials) }},
		{"fetches_total", func(s *kafka.ReaderStats) float64 { return float64(s.Fetches) }},
		{"messages_total", func(s *kafka.ReaderStats) float64 { return float64(s.Messages) }},
		{"bytes_total", func(s *kafka.ReaderStats) float64 { return float64(s.Bytes) }},
		{"rebalances_total", func(s *kafka.ReaderStats) float64 { return float64(s.Rebalances) }},
		{"timeouts_total", func(s *kafka.ReaderStats) float64 { return float64(s.Timeouts) }},
		{"errors_total", func(s *kafka.ReaderStats) float64 { return float64(s.Errors) }},
	}

	// Point-in-time stats exposed as gauges, in declaration order.
	gaugeDefs := []struct {
		name string
		get  statGetter
	}{
		{"offset", func(s *kafka.ReaderStats) float64 { return float64(s.Offset) }},
		{"lag", func(s *kafka.ReaderStats) float64 { return float64(s.Lag) }},
		{"queue_length", func(s *kafka.ReaderStats) float64 { return float64(s.QueueLength) }},
		{"queue_capacity", func(s *kafka.ReaderStats) float64 { return float64(s.QueueCapacity) }},
	}

	collector := new(kafkaCollector)

	for _, d := range counterDefs {
		vec := prometheus.NewCounterVec(
			prometheus.CounterOpts{Namespace: namespace, Subsystem: "kafka", Name: d.name},
			labelNames,
		)
		collector.counters = append(collector.counters, &counterAdder{counter: vec, getter: d.get})
	}

	for _, d := range gaugeDefs {
		vec := prometheus.NewGaugeVec(
			prometheus.GaugeOpts{Namespace: namespace, Subsystem: "kafka", Name: d.name},
			labelNames,
		)
		collector.gauges = append(collector.gauges, &gaugeSetter{gauge: vec, getter: d.get})
	}

	prometheus.MustRegister(collector)

	return collector
}

// Collect implements prometheus.Collector. It first refreshes every
// counter and gauge vector from the current stats of each instrumented
// reader, then forwards the vectors' metrics onto ch.
//
// NOTE(review): kafka.Reader.Stats() appears to return a fresh snapshot
// per call — confirm against the kafka-go docs if scrapes ever overlap.
func (c *kafkaCollector) Collect(ch chan<- prometheus.Metric) {
	c.readersLock.RLock()
	defer c.readersLock.RUnlock()

	// Pull the latest stats from each reader into the metric vectors.
	for _, reader := range c.readers {
		snapshot := reader.Stats()
		for _, adder := range c.counters {
			adder.Add(&snapshot)
		}
		for _, setter := range c.gauges {
			setter.Set(&snapshot)
		}
	}

	// Hand the refreshed vectors over to Prometheus.
	for _, adder := range c.counters {
		adder.counter.Collect(ch)
	}
	for _, setter := range c.gauges {
		setter.gauge.Collect(ch)
	}
}

// Describe implements prometheus.Collector by forwarding the descriptors
// of every underlying counter and gauge vector onto ch.
func (c *kafkaCollector) Describe(ch chan<- *prometheus.Desc) {
	for _, adder := range c.counters {
		adder.counter.Describe(ch)
	}
	for _, setter := range c.gauges {
		setter.gauge.Describe(ch)
	}
}

// InstrumentReader adds r to the set of readers whose stats are exported
// on every scrape. Safe for concurrent use with Collect.
func (c *kafkaCollector) InstrumentReader(r *kafka.Reader) {
	c.readersLock.Lock()
	c.readers = append(c.readers, r)
	c.readersLock.Unlock()
}