github / brubeck

A Statsd-compatible metrics aggregator
MIT License

Lone metrics stuck in recvmmsg call #38

Closed: deanbittner closed this issue 8 years ago

deanbittner commented 8 years ago

In the brubeck source, at /brubeck-master/src/samplers/statsd.c, line 37, the recvmmsg call should set the MSG_WAITFORONE flag in its 4th argument (flags), which is currently zero. Without this flag, a single UDP message is not returned on its own: with a NULL timeout, the blocking call keeps waiting until SIM_PACKETS messages have been received. This can strand lone messages, such as sparse sends of a single metric.

Most of the time the metrics flow is heavy enough that a lone message is picked up together with the messages that follow it, and recvmmsg returns normally. If, however, only a single metric is sent, the call blocks indefinitely and the message stays stranded.
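A minimal, Linux-only sketch of the behaviour described above (not brubeck's code): it queues one datagram on a loopback UDP socket and reads it with a single recvmmsg call that passes MSG_WAITFORONE. The helper name `read_lone_datagram`, the `BATCH` value standing in for SIM_PACKETS, and the 2-second SO_RCVTIMEO safety net are assumptions made for this illustration.

```c
#define _GNU_SOURCE /* recvmmsg() and MSG_WAITFORONE are GNU/Linux extensions */
#include <arpa/inet.h>
#include <netinet/in.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <sys/uio.h>
#include <unistd.h>

#define BATCH 8 /* stands in for SIM_PACKETS; the value is an assumption */

/* Queue one datagram on a loopback UDP socket, then read it with a single
 * recvmmsg() call that passes MSG_WAITFORONE. Returns the number of datagrams
 * read (1 when it works) or -1 on error. Hypothetical demo, not brubeck code. */
int read_lone_datagram(const char *payload)
{
    struct sockaddr_in addr;
    socklen_t addrlen = sizeof(addr);
    struct timeval tv = { .tv_sec = 2, .tv_usec = 0 };
    char bufs[BATCH][512];
    struct iovec iovs[BATCH];
    struct mmsghdr msgs[BATCH];
    int res = -1;
    int rx = socket(AF_INET, SOCK_DGRAM, 0);
    int tx = socket(AF_INET, SOCK_DGRAM, 0);

    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
    addr.sin_port = 0; /* let the kernel pick a free port */

    /* one buffer per slot; msg_name and msg_control stay NULL */
    memset(msgs, 0, sizeof(msgs));
    for (int i = 0; i < BATCH; ++i) {
        iovs[i].iov_base = bufs[i];
        iovs[i].iov_len = sizeof(bufs[i]);
        msgs[i].msg_hdr.msg_iov = &iovs[i];
        msgs[i].msg_hdr.msg_iovlen = 1;
    }

    if (rx >= 0 && tx >= 0 &&
        bind(rx, (struct sockaddr *)&addr, sizeof(addr)) == 0 &&
        getsockname(rx, (struct sockaddr *)&addr, &addrlen) == 0 &&
        /* safety net for the demo only: give up after 2 seconds */
        setsockopt(rx, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)) == 0 &&
        sendto(tx, payload, strlen(payload), 0,
               (struct sockaddr *)&addr, sizeof(addr)) >= 0) {
        /* Waits for the first datagram only; MSG_WAITFORONE then turns on
         * MSG_DONTWAIT, so the other BATCH - 1 slots are filled only if
         * datagrams are already queued. */
        res = recvmmsg(rx, msgs, BATCH, MSG_WAITFORONE, NULL);
    }

    if (rx >= 0)
        close(rx);
    if (tx >= 0)
        close(tx);
    return res;
}
```

Here `read_lone_datagram("test.lone_metric:1|c")` returns 1 even though eight slots were offered. The recvmmsg(2) man page also notes that the timeout argument is checked only after each datagram is received, so passing a timeout instead of MSG_WAITFORONE would not reliably release a lone message.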

This can be reproduced by sending a single metric to a running brubeck instance: the metric is never emitted to Graphite, with either the line or the pickle protocol. With the MSG_WAITFORONE flag added, the metric flows through as expected.
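To reproduce this, a single statsd line can be sent as one UDP datagram. A sketch in C, assuming an IPv4 listener; the function name `send_statsd_line`, the metric name, and the port below are illustrative only (use whatever port the statsd sampler is configured with).

```c
#include <arpa/inet.h>
#include <netinet/in.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

/* Send one statsd line (e.g. "name:1|c") as a single UDP datagram to an
 * IPv4 host:port. Returns the number of bytes sent, or -1 on error.
 * Hypothetical helper for reproducing the issue, not part of brubeck. */
ssize_t send_statsd_line(const char *ipv4, unsigned short port, const char *line)
{
    struct sockaddr_in dst;
    ssize_t sent = -1;
    int fd = socket(AF_INET, SOCK_DGRAM, 0);

    if (fd < 0)
        return -1;

    memset(&dst, 0, sizeof(dst));
    dst.sin_family = AF_INET;
    dst.sin_port = htons(port);
    /* inet_pton() returns 1 only for a valid dotted-quad address */
    if (inet_pton(AF_INET, ipv4, &dst.sin_addr) == 1)
        sent = sendto(fd, line, strlen(line), 0,
                      (struct sockaddr *)&dst, sizeof(dst));

    close(fd);
    return sent;
}
```

For example, `send_statsd_line("127.0.0.1", 8126, "test.lone_metric:1|c")`, where 8126 is a placeholder for the sampler's port. Against a build without MSG_WAITFORONE and an otherwise idle instance, that metric would be expected to stay in recvmmsg until enough further datagrams arrive.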

The workaround is to set "multimsg" to 1 in /etc/brubeck/config.json and restart brubeck. This disables the recvmmsg call described above and falls back to recvmsg. It's not clear whether this has a measurable performance cost with multiple worker threads in the receive pool, but some impact on a busy metrics flow seems likely.
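With multimsg set to 1, each datagram costs its own system call. The sketch below illustrates that fallback pattern with recvmsg on a loopback socket; it is not brubeck's implementation, and `receive_one`, `drain_with_recvmsg`, and the "m:1|c" payload are hypothetical.

```c
#include <arpa/inet.h>
#include <netinet/in.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/uio.h>
#include <unistd.h>

/* Fallback pattern: each recvmsg() call returns exactly one datagram. */
static ssize_t receive_one(int sock, char *buf, size_t len)
{
    struct iovec iov = { .iov_base = buf, .iov_len = len };
    struct msghdr hdr;

    memset(&hdr, 0, sizeof(hdr));
    hdr.msg_iov = &iov;
    hdr.msg_iovlen = 1;
    return recvmsg(sock, &hdr, 0);
}

/* Queue `count` datagrams on a loopback UDP socket, then drain them one per
 * recvmsg() call. Returns how many calls it took, or -1 on setup failure. */
int drain_with_recvmsg(int count)
{
    struct sockaddr_in addr;
    socklen_t addrlen = sizeof(addr);
    struct timeval tv = { .tv_sec = 2, .tv_usec = 0 };
    char buf[512];
    int calls = -1;
    int rx = socket(AF_INET, SOCK_DGRAM, 0);
    int tx = socket(AF_INET, SOCK_DGRAM, 0);

    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);

    if (rx < 0 || tx < 0 ||
        bind(rx, (struct sockaddr *)&addr, sizeof(addr)) != 0 ||
        getsockname(rx, (struct sockaddr *)&addr, &addrlen) != 0 ||
        setsockopt(rx, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)) != 0)
        goto done;

    for (int i = 0; i < count; ++i)
        if (sendto(tx, "m:1|c", 5, 0,
                   (struct sockaddr *)&addr, sizeof(addr)) != 5)
            goto done;

    /* one system call per datagram, where recvmmsg() could batch them */
    calls = 0;
    while (calls < count && receive_one(rx, buf, sizeof(buf)) >= 0)
        ++calls;

done:
    if (rx >= 0)
        close(rx);
    if (tx >= 0)
        close(tx);
    return calls;
}
```

`drain_with_recvmsg(3)` needs three recvmsg calls for three queued datagrams, whereas one recvmmsg call can return all three at once. That per-datagram syscall cost is the likely source of any slowdown on a busy flow.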

I have built and tested brubeck with the flag added, and it works fine. (I also have a Mac build for anyone who wants it.)

vmg commented 8 years ago

Thanks for the report! I've cherry-picked and pushed your proposed change. :)

deanbittner commented 8 years ago

Since the #38 change, I'm getting entries like these in the brubeck log:

instance=orwell.sls.inf.brubeck.log01 sampler=statsd event=packet_drop
instance=orwell.sls.inf.brubeck.log01 sampler=statsd event=packet_drop
instance=orwell.sls.inf.brubeck.log01 sampler=statsd event=packet_drop

On May 4, 2016, at 3:06 AM, Vicent Marti notifications@github.com wrote:

Closed #38 https://github.com/github/brubeck/issues/38.

— You are receiving this because you authored the thread. Reply to this email directly or view it on GitHub https://github.com/github/brubeck/issues/38#event-649911003

deanbittner commented 8 years ago

The recvmmsg call returns the number of messages read, which will often be less than SIM_PACKETS. The loop, however, still parses all SIM_PACKETS entries of msgs, including the ones this call did not fill, and it looks like those extra entries are what get logged as packet_drop. I think the fix is to use the positive return value of recvmmsg instead of SIM_PACKETS, both for the inflow count and as the loop bound. I will test that.
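The key point is that only the first `res` entries of msgs are written by a given recvmmsg call. A small sketch of the corrected loop bound; `bytes_received` is a hypothetical helper, not part of brubeck.

```c
#define _GNU_SOURCE /* struct mmsghdr is a GNU/Linux extension */
#include <stddef.h>
#include <sys/socket.h>

/* Total payload bytes of the datagrams one recvmmsg() call filled in.
 * `res` is that call's positive return value: only msgs[0] .. msgs[res - 1]
 * were written, and later entries may still hold stale data. */
size_t bytes_received(const struct mmsghdr *msgs, int res)
{
    size_t total = 0;

    for (int i = 0; i < res; ++i) /* bound by res, not SIM_PACKETS */
        total += msgs[i].msg_len;
    return total;
}
```

With msg_len values {5, 7, 99, 99} and `res == 2`, it returns 12: the two trailing entries are ignored even though they still contain data.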

deanbittner commented 8 years ago
diff --git a/src/samplers/statsd.c b/src/samplers/statsd.c
index 7f828f0..f1f49de 100644
--- a/src/samplers/statsd.c
+++ b/src/samplers/statsd.c
@@ -4,6 +4,7 @@
 #include <sys/socket.h>
 #include "brubeck.h"

+
 #ifdef __GLIBC__
 #      if ((__GLIBC__ > 2) || ((__GLIBC__ == 2) && (__GLIBC_MINOR__ >= 12)))
 #              define HAVE_RECVMMSG 1
@@ -34,7 +35,10 @@ static void statsd_run_recvmmsg(struct brubeck_statsd *statsd, int sock)
        log_splunk("sampler=statsd event=worker_online syscall=recvmmsg socket=%d", sock);

        for (;;) {
-               int res = recvmmsg(sock, msgs, SIM_PACKETS, 0, NULL);
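+               /* MSG_WAITFORONE: block only until the first datagram arrives,
+                * then also return any others already queued (up to SIM_PACKETS)
+                * instead of waiting for SIM_PACKETS datagrams. */
+               int res = recvmmsg(sock, msgs, SIM_PACKETS, MSG_WAITFORONE, NULL);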

                if (res < 0) {
                        if (errno == EAGAIN || errno == EINTR)
@@ -44,11 +48,18 @@ static void statsd_run_recvmmsg(struct brubeck_statsd *statsd, int sock)
                        brubeck_stats_inc(server, errors);
                        continue;
                }
+               if (res == 0) {
+                       /* nothing was read, so errno is not meaningful here */
+                       log_splunk("sampler=statsd event=no_read");
+                       brubeck_stats_inc(server, errors);
+                       continue;
+               }

                /* store stats */
-               brubeck_atomic_add(&statsd->sampler.inflow, SIM_PACKETS);
+               /* count only the datagrams this call actually received */
+               brubeck_atomic_add(&statsd->sampler.inflow, res);

-               for (i = 0; i < SIM_PACKETS; ++i) {
+               for (i = 0; i < res; ++i) {
                        char *buf = msgs[i].msg_hdr.msg_iov->iov_base;
                        char *end = buf + msgs[i].msg_len;
                        brubeck_statsd_packet_parse(server, buf, end);