elastic / opentelemetry-lib

Apache License 2.0
0 stars 6 forks source link

Add support for disk scraper #17

Closed ishleenk17 closed 3 weeks ago

ishleenk17 commented 1 month ago

Relates: https://github.com/elastic/opentelemetry-dev/issues/181

ishleenk17 commented 1 month ago

Will refactor the code slightly.

ishleenk17 commented 3 weeks ago

Will refactor the code slightly.

Done!

lahsivjar commented 3 weeks ago

Suggestion for simplifying the code:

diff --git a/remappers/hostmetrics/disk.go b/remappers/hostmetrics/disk.go
index fe59843..6e9abf8 100644
--- a/remappers/hostmetrics/disk.go
+++ b/remappers/hostmetrics/disk.go
@@ -18,12 +18,12 @@
 package hostmetrics

 import (
+   "errors"
    "fmt"

    remappers "github.com/elastic/opentelemetry-lib/remappers/internal"
    "go.opentelemetry.io/collector/pdata/pcommon"
    "go.opentelemetry.io/collector/pdata/pmetric"
-   "golang.org/x/exp/constraints"
 )

 var metricsToAdd = map[string]string{
@@ -36,85 +36,53 @@ var metricsToAdd = map[string]string{

 // remapDiskMetrics remaps disk-related metrics from the source to the output metric slice.
 func remapDiskMetrics(src, out pmetric.MetricSlice, _ pcommon.Resource, dataset string) error {
+   var errs []error
    for i := 0; i < src.Len(); i++ {
+       var err error
        metric := src.At(i)
        switch metric.Name() {
        case "system.disk.io", "system.disk.operations", "system.disk.pending_operations":
-           remapDiskIntMetrics(metric, out, dataset, 1)
+           err = addDiskMetric(metric, out, dataset, 1)
        case "system.disk.operation_time", "system.disk.io_time":
-           remapDiskFloatMetrics(metric, out, dataset, 1000)
+           err = addDiskMetric(metric, out, dataset, 1000)
+       }
+       if err != nil {
+           errs = append(errs, err)
        }
    }
-   return nil
+   return errors.Join(errs...)
 }

-// remapDiskIntMetrics processes integer-based disk metrics.
-func remapDiskIntMetrics(metric pmetric.Metric, out pmetric.MetricSlice, dataset string, multiplier int64) {
-   dataPoints := metric.Sum().DataPoints()
-   for j := 0; j < dataPoints.Len(); j++ {
-       dp := dataPoints.At(j)
-       if device, ok := dp.Attributes().Get("device"); ok {
-           timestamp := dp.Timestamp()
-           value := dp.IntValue()
-           direction, _ := dp.Attributes().Get("direction")
-           addDiskMetric(out, dataset, metric.Name(), device.Str(), direction.Str(), timestamp, value, multiplier)
-       } else {
-           fmt.Printf("Missing 'device' attribute for metric: %s\n", metric.Name())
-       }
+func addDiskMetric(metric pmetric.Metric, out pmetric.MetricSlice, dataset string, multiplier int64) error {
+   metricNetworkES, ok := metricsToAdd[metric.Name()]
+   if !ok {
+       // This condition should never satisfy, if it is true then the code is
+       // missing handling a metric name.
+       return fmt.Errorf("unexpected metric name: %s", metric.Name())
    }
-}

-// processFloatMetrics processes float-based disk metrics.
-func remapDiskFloatMetrics(metric pmetric.Metric, out pmetric.MetricSlice, dataset string, multiplier float64) {
-   dataPoints := metric.Sum().DataPoints()
-   for j := 0; j < dataPoints.Len(); j++ {
-       dp := dataPoints.At(j)
+   dps := metric.Sum().DataPoints()
+   for i := 0; i < dps.Len(); i++ {
+       dp := dps.At(i)
        if device, ok := dp.Attributes().Get("device"); ok {
-           timestamp := dp.Timestamp()
-           value := dp.DoubleValue()
            direction, _ := dp.Attributes().Get("direction")
-           addDiskMetric(out, dataset, metric.Name(), device.Str(), direction.Str(), timestamp, value, multiplier)
-       } else {
-           fmt.Printf("Missing 'device' attribute for metric: %s\n", metric.Name())
+           remappedMetric := remappers.Metric{
+               DataType:  pmetric.MetricTypeSum,
+               Name:      fmt.Sprintf(metricNetworkES, direction.Str()),
+               Timestamp: dp.Timestamp(),
+           }
+           switch dp.ValueType() {
+           case pmetric.NumberDataPointValueTypeInt:
+               v := dp.IntValue() * multiplier
+               remappedMetric.IntValue = &v
+           case pmetric.NumberDataPointValueTypeDouble:
+               v := dp.DoubleValue() * float64(multiplier)
+               remappedMetric.DoubleValue = &v
+           }
+           remappers.AddMetrics(out, dataset, func(dp pmetric.NumberDataPoint) {
+               dp.Attributes().PutStr("system.diskio.name", device.Str())
+           }, remappedMetric)
        }
    }
-}
-
-// addDiskMetric adds a remapped disk metric to the output slice.
-func addDiskMetric[T constraints.Integer | constraints.Float](
-   out pmetric.MetricSlice,
-   dataset, name, device, direction string,
-   timestamp pcommon.Timestamp,
-   value T, multiplier T,
-) {
-
-   metricNetworkES, ok := metricsToAdd[name]
-   if !ok {
-       fmt.Printf("Unknown metric name: %s\n", name)
-       return
-   }
-
-   scaledValue := value * multiplier
-   var intValue *int64
-   var doubleValue *float64
-   switch v := any(scaledValue).(type) {
-   case int64:
-       intValue = &v
-   case float64:
-       doubleValue = &v
-   default:
-       fmt.Printf("Unsupported value type for metric: %s\n", name)
-       return
-   }
-
-   remappers.AddMetrics(out, dataset, func(dp pmetric.NumberDataPoint) {
-       dp.Attributes().PutStr("system.diskio.name", device)
-   },
-       remappers.Metric{
-           DataType:    pmetric.MetricTypeSum,
-           Name:        fmt.Sprintf(metricNetworkES, direction),
-           Timestamp:   timestamp,
-           IntValue:    intValue,
-           DoubleValue: doubleValue,
-       })
+   return nil
 }
shmsr commented 3 weeks ago

This can be even further simplified:

diff --git a/remappers/hostmetrics/disk.go b/remappers/hostmetrics/disk.go
index 5629a84..0f50ef5 100644
--- a/remappers/hostmetrics/disk.go
+++ b/remappers/hostmetrics/disk.go
@@ -26,39 +26,33 @@ import (
    "go.opentelemetry.io/collector/pdata/pmetric"
 )

-var metricsToAdd = map[string]string{
-   "system.disk.io":                 "system.diskio.%s.bytes",
-   "system.disk.operations":         "system.diskio.%s.count",
-   "system.disk.pending_operations": "system.diskio.io.%sops",
-   "system.disk.operation_time":     "system.diskio.%s.time",
-   "system.disk.io_time":            "system.diskio.io.%stime",
+var metricsToAdd = map[string]struct {
+   metricDiskESPattern string
+   multiplier          int64
+}{
+   "system.disk.io":                 {"system.diskio.%s.bytes", 1},
+   "system.disk.operations":         {"system.diskio.%s.count", 1},
+   "system.disk.pending_operations": {"system.diskio.io.%sops", 1},
+   "system.disk.operation_time":     {"system.diskio.%s.time", 1000},
+   "system.disk.io_time":            {"system.diskio.io.%stime", 1000},
 }

 // remapDiskMetrics remaps disk-related metrics from the source to the output metric slice.
 func remapDiskMetrics(src, out pmetric.MetricSlice, _ pcommon.Resource, dataset string) error {
    var errs []error
    for i := 0; i < src.Len(); i++ {
-       var err error
        metric := src.At(i)
-       switch metric.Name() {
-       case "system.disk.io", "system.disk.operations", "system.disk.pending_operations":
-           err = addDiskMetric(metric, out, dataset, 1)
-       case "system.disk.operation_time", "system.disk.io_time":
-           err = addDiskMetric(metric, out, dataset, 1000)
-       }
-       if err != nil {
-           errs = append(errs, err)
+       if remapFunc, ok := metricsToAdd[metric.Name()]; ok {
+           addDiskMetric(metric, out, dataset, remapFunc.metricDiskESPattern, remapFunc.multiplier)
+       } else {
+           errs = append(errs, fmt.Errorf("unexpected metric name: %s", metric.Name()))
        }
    }
+
    return errors.Join(errs...)
 }

-func addDiskMetric(metric pmetric.Metric, out pmetric.MetricSlice, dataset string, multiplier int64) error {
-   metricNetworkES, ok := metricsToAdd[metric.Name()]
-   if !ok {
-       return fmt.Errorf("unexpected metric name: %s", metric.Name())
-   }
-
+func addDiskMetric(metric pmetric.Metric, out pmetric.MetricSlice, dataset, metricDiskESPattern string, multiplier int64) {
    dps := metric.Sum().DataPoints()
    for i := 0; i < dps.Len(); i++ {
        dp := dps.At(i)
@@ -66,7 +60,7 @@ func addDiskMetric(metric pmetric.Metric, out pmetric.MetricSlice, dataset strin
            direction, _ := dp.Attributes().Get("direction")
            remappedMetric := remappers.Metric{
                DataType:  pmetric.MetricTypeSum,
-               Name:      fmt.Sprintf(metricNetworkES, direction.Str()),
+               Name:      fmt.Sprintf(metricDiskESPattern, direction.Str()),
                Timestamp: dp.Timestamp(),
            }
            switch dp.ValueType() {
@@ -82,5 +76,4 @@ func addDiskMetric(metric pmetric.Metric, out pmetric.MetricSlice, dataset strin
            }, remappedMetric)
        }
    }
-   return nil
 }