influxdata / influxdb-relay

Service to replicate InfluxDB data for high availability
MIT License
853 stars 350 forks source link

Support relay of collectd UDP #83

Open bhakta0007 opened 3 years ago

bhakta0007 commented 3 years ago

I looked at https://github.com/influxdata/influxdb-relay/issues/54 and https://github.com/influxdata/influxdb-relay/issues/29 this seems to be couple of years old.

Is there no way to use HA-based deployment for collectd based ingest traffic?

What is the problem that one type of UDP payload (line protocol) can be relayed but not the other (Collectd). Is there any technical problem that prevents relay of collectd based ingest data?

bhakta0007 commented 3 years ago

Just to prove the concept of forwarding the UDP payload works, I made this change and tried it out..

(venv) bhakta@blr:~/dev/influxdb-relay$ git diff .
diff --git a/relay/udp.go b/relay/udp.go
index 081b94d..9a2552e 100644
--- a/relay/udp.go
+++ b/relay/udp.go
@@ -8,8 +8,6 @@ import (
        "sync"
        "sync/atomic"
        "time"
-
-       "github.com/influxdata/influxdb1-client/models"
 )

 const (
@@ -160,38 +158,13 @@ func (u *UDP) Stop() error {
 }

 func (u *UDP) post(p *packet) {
-       points, err := models.ParsePointsWithPrecision(p.data.Bytes(), p.timestamp, u.precision)
-       if err != nil {
-               log.Printf("Error parsing packet in relay %q from %v: %v", u.Name(), p.from, err)
-               putUDPBuf(p.data)
-               return
-       }
-
-       out := getUDPBuf()
-       for _, pt := range points {
-               if _, err = out.WriteString(pt.PrecisionString(u.precision)); err != nil {
-                       break
-               }
-               if err = out.WriteByte('\n'); err != nil {
-                       break
-               }
-       }
-
-       putUDPBuf(p.data)
-
-       if err != nil {
-               putUDPBuf(out)
-               log.Printf("Error writing points in relay %q: %v", u.Name(), err)
-               return
-       }
-
+       // log.Printf("Got packet relay %q from %v: length %v", u.Name(), p.from, len(p.data.Bytes()))
        for _, b := range u.backends {
-               if err := b.post(out.Bytes()); err != nil {
-                       log.Printf("Error writing points in relay %q to backend %q: %v", u.Name(), b.name, err)
+               // log.Printf("Forward data from %v to %v, lengh %v", p.from, b.addr, len(p.data.Bytes()))
+               if err := b.post(p.data.Bytes()); err != nil {
+                       log.Printf("Error forwarding data from %v to %v, lengh %v: %v", p.from, b.addr, len(p.data.Bytes()), err)
                }
        }
-
-       putUDPBuf(out)
 }

What this is doing Is taking the incoming UDP buffer (without trying to parse for the line protocol) and simply writing it out to the list of backends. I am new to golang & influxdb-relay code - I am not completely sure I have written the code in a way there are no leaks, etc).

If you guys think this should work - I can work on a patch that introduces a "collectd" relay (in addition to existing HTTP and udp relays).

Do you guys foresee any issues?