logstash-plugins / logstash-codec-collectd

Apache License 2.0

plugin mangles collectd metrics under some conditions #15

Closed frapex closed 9 years ago

frapex commented 9 years ago

I found that the plugin mangles collectd metrics under some conditions. The bug is triggered when, for example, several percent metrics are sent in rapid succession, one after another.

These metrics:

Become like this:

The type is lost. A new PLUGIN_TYPE should not cause the 'collectd_type' field to be removed from the event.
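The failure mode can be sketched in a few lines of Ruby. This is a simplified model, not the codec itself: `decode_parts` and the part stream are hypothetical, and the sketch assumes that the type part is not resent when consecutive metrics share the same type (here `percent`), so only the plugin part changes between them. The whitelist is the one from the unpatched `collectd.rb`.

```ruby
# Whitelist applied when a new plugin part arrives (unpatched codec).
PLUGIN_TYPE_FIELDS = {
  'host' => true,
  '@timestamp' => true,
  'type_instance' => true,
  'severity' => true,
}.freeze

# Hypothetical, simplified decoder: walks [field, value] parts and emits
# a copy of the accumulated event whenever a 'values' part is seen.
def decode_parts(parts)
  event = {}
  events = []
  parts.each do |field, value|
    # Same cleanup rule as the codec: on a new plugin part, drop every
    # field that is not whitelisted. 'collectd_type' is not whitelisted.
    event.keep_if { |k, _| PLUGIN_TYPE_FIELDS.key?(k) } if field == 'plugin'
    event[field] = value
    events << event.dup if field == 'values'
  end
  events
end

# Made-up part stream: two metrics of the same type from different
# plugins. The second metric carries no type part of its own (assumption).
PARTS = [
  ['host', 'localhost'],
  ['plugin', 'cpu'],
  ['collectd_type', 'percent'],
  ['values', [1.5]],
  ['plugin', 'swap'],
  ['values', [0.0]],
].freeze

EVENTS = decode_parts(PARTS)
EVENTS.each { |e| puts e.inspect }
```

In this model the first event keeps `collectd_type`, while the second one (plugin `swap`) is emitted without it, which matches the symptom described above.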

References to somewhat similar issues:

Testing was done using collectd-5.5.0-2.el7.x86_64 and logstash-1.5.2-1.noarch. The bug can be reproduced easily using the configuration files below:

# /tmp/collectd_test.conf
LoadPlugin cpu
LoadPlugin swap
LoadPlugin memory
LoadPlugin network
LoadPlugin match_regex
LoadPlugin target_replace

<Plugin cpu>
  ReportByCpu false
  ReportByState false
  ValuesPercentage true
</Plugin>

<Plugin swap>
  ReportBytes true
  ValuesAbsolute false
  ValuesPercentage true
</Plugin>

<Plugin memory>
  ValuesPercentage true
</Plugin>

<Plugin network>
    <Server "localhost">
    </Server>
</Plugin>

# /tmp/logstash_test.conf
input {
  udp {
    port => 25826
    buffer_size => 1452
    codec => collectd { }
  }
}

output {
  if ![collectd_type] {
    stdout { codec => "rubydebug" }
  }
}

# cmds
/usr/sbin/collectd -C /tmp/collectd_test.conf -f
sudo -u logstash /opt/logstash/bin/logstash -f /tmp/logstash_test.conf

frapex commented 9 years ago

I crafted two patches. Both include some other improvements as well.

PATCH 1 (just make it work):

--- a/collectd.rb  2015-08-05 19:22:03.486795136 +0200
+++ b/collectd.rb    2015-08-06 08:47:46.864839543 +0200
@@ -49,6 +49,7 @@

   PLUGIN_TYPE = 2
   COLLECTD_TYPE = 4
+  COLLECTD_VALUES = 6
   SIGNATURE_TYPE = 512
   ENCRYPTION_TYPE = 528

@@ -59,7 +60,7 @@
     3               => "plugin_instance",
     COLLECTD_TYPE   => "collectd_type",
     5               => "type_instance",
-    6               => "values",
+    COLLECTD_VALUES => "values",
     7               => "interval",
     8               => "@timestamp",
     9               => "interval",
@@ -72,6 +73,7 @@
   PLUGIN_TYPE_FIELDS = {
     'host' => true,
     '@timestamp' => true,
+    'collectd_type' => true,
     'type_instance' => true,
     'severity' => true,
   }
@@ -93,10 +95,10 @@

   INTERVAL_BASE_FIELDS = {
     'host' => true,
-    'collectd_type' => true,
+    '@timestamp' => true,
     'plugin' => true,
     'plugin_instance' => true,
-    '@timestamp' => true,
+    'collectd_type' => true,
     'type_instance' => true,
   }

@@ -207,7 +209,7 @@
       byte1, byte2 = body.pack("C*").unpack("NN")
       Time.at(( ((byte1 << 32) + byte2) * (2**-30) )).utc
     end
-    # Hi resolution intervals
+    # Hi-Resolution intervals
     hiresinterval_decoder = lambda do |body|
       byte1, byte2 = body.pack("C*").unpack("NN")
       Time.at(( ((byte1 << 32) + byte2) * (2**-30) )).to_i
@@ -297,7 +299,7 @@
   def get_values(id, body)
     drop = false
     add_tag = false
-    if id == 6
+    if id == COLLECTD_VALUES
       retval, drop, add_nan_tag = @id_decoder[id].call(body)
     # Use hash + closure/lambda to speed operations
     else
@@ -421,15 +423,14 @@
         was_encrypted = true
         next
       when PLUGIN_TYPE
-        # We've reached a new plugin, delete everything except for the the host
-        # field, because there's only one per packet and the timestamp field,
-        # because that one goes in front of the plugin
+        # Reached a new plugin, delete all fields that might
+        # be related to the previous plugin (if any).
         collectd.each_key do |k|
           collectd.delete(k) unless PLUGIN_TYPE_FIELDS.has_key?(k)
         end
       when COLLECTD_TYPE
-        # We've reached a new type within the plugin section, delete all fields
-        # that could have something to do with the previous type (if any)
+        # Reached a new type, delete all fields that might
+        # be related to the previous type (if any).
         collectd.each_key do |k|
           collectd.delete(k) unless COLLECTD_TYPE_FIELDS.has_key?(k)
         end
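
As a rough illustration of what the whitelist change in PATCH 1 does, the sketch below applies the old and the new `PLUGIN_TYPE_FIELDS` to the same accumulated event. The helper `reset_for_new_plugin`, the constant names, and the sample event are hypothetical; only the whitelist contents are taken from the patch.

```ruby
# Whitelist before PATCH 1.
UNPATCHED_FIELDS = {
  'host' => true,
  '@timestamp' => true,
  'type_instance' => true,
  'severity' => true,
}.freeze

# Whitelist after PATCH 1: 'collectd_type' now survives a new plugin part.
PATCHED_FIELDS = {
  'host' => true,
  '@timestamp' => true,
  'collectd_type' => true,
  'type_instance' => true,
  'severity' => true,
}.freeze

# Hypothetical helper mirroring the cleanup step: returns only the fields
# that are allowed to carry over into the next plugin section.
def reset_for_new_plugin(event, keep)
  event.select { |k, _| keep.key?(k) }
end

# Made-up state of the event right before a new plugin part arrives.
PREVIOUS = {
  'host' => 'localhost',
  'plugin' => 'cpu',
  'collectd_type' => 'percent',
  'values' => [1.5],
}.freeze

BEFORE_PATCH = reset_for_new_plugin(PREVIOUS, UNPATCHED_FIELDS)
AFTER_PATCH  = reset_for_new_plugin(PREVIOUS, PATCHED_FIELDS)

puts BEFORE_PATCH.inspect
puts AFTER_PATCH.inspect
```

With the old whitelist only `host` carries over; with the patched one `collectd_type` carries over as well, so a following metric of the same type still has its type field.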

PATCH 2 (removed the field cleanup code entirely):

--- a/collectd.rb  2015-08-05 19:22:03.486795136 +0200
+++ b/collectd.rb    2015-08-06 09:03:00.393084304 +0200
@@ -49,6 +49,7 @@

   PLUGIN_TYPE = 2
   COLLECTD_TYPE = 4
+  COLLECTD_VALUES = 6
   SIGNATURE_TYPE = 512
   ENCRYPTION_TYPE = 528

@@ -59,7 +60,7 @@
     3               => "plugin_instance",
     COLLECTD_TYPE   => "collectd_type",
     5               => "type_instance",
-    6               => "values",
+    COLLECTD_VALUES => "values",
     7               => "interval",
     8               => "@timestamp",
     9               => "interval",
@@ -69,22 +70,6 @@
     ENCRYPTION_TYPE => "encryption"
   }

-  PLUGIN_TYPE_FIELDS = {
-    'host' => true,
-    '@timestamp' => true,
-    'type_instance' => true,
-    'severity' => true,
-  }
-
-  COLLECTD_TYPE_FIELDS = {
-    'host' => true,
-    '@timestamp' => true,
-    'plugin' => true,
-    'plugin_instance' => true,
-    'type_instance' => true,
-    'severity' => true,
-  }
-
   INTERVAL_VALUES_FIELDS = {
     "interval" => true,
     "values" => true,
@@ -93,10 +78,10 @@

   INTERVAL_BASE_FIELDS = {
     'host' => true,
-    'collectd_type' => true,
+    '@timestamp' => true,
     'plugin' => true,
     'plugin_instance' => true,
-    '@timestamp' => true,
+    'collectd_type' => true,
     'type_instance' => true,
   }

@@ -207,7 +192,7 @@
       byte1, byte2 = body.pack("C*").unpack("NN")
       Time.at(( ((byte1 << 32) + byte2) * (2**-30) )).utc
     end
-    # Hi resolution intervals
+    # Hi-Resolution intervals
     hiresinterval_decoder = lambda do |body|
       byte1, byte2 = body.pack("C*").unpack("NN")
       Time.at(( ((byte1 << 32) + byte2) * (2**-30) )).to_i
@@ -297,7 +282,7 @@
   def get_values(id, body)
     drop = false
     add_tag = false
-    if id == 6
+    if id == COLLECTD_VALUES
       retval, drop, add_nan_tag = @id_decoder[id].call(body)
     # Use hash + closure/lambda to speed operations
     else
@@ -420,19 +405,6 @@
         raise(EncryptionError) if payload.empty?
         was_encrypted = true
         next
-      when PLUGIN_TYPE
-        # We've reached a new plugin, delete everything except for the the host
-        # field, because there's only one per packet and the timestamp field,
-        # because that one goes in front of the plugin
-        collectd.each_key do |k|
-          collectd.delete(k) unless PLUGIN_TYPE_FIELDS.has_key?(k)
-        end
-      when COLLECTD_TYPE
-        # We've reached a new type within the plugin section, delete all fields
-        # that could have something to do with the previous type (if any)
-        collectd.each_key do |k|
-          collectd.delete(k) unless COLLECTD_TYPE_FIELDS.has_key?(k)
-        end
       end

       raise(EncryptionError) if !was_encrypted and @security_level == SECURITY_ENCR

untergeek commented 9 years ago

Fixed by #16