riemann / riemann

A network event stream processing system, in Clojure.
https://riemann.io
Eclipse Public License 1.0

Events' reinjection and expiration not working correctly #538

Closed blkt closed 9 years ago

blkt commented 9 years ago

I'm having an issue with events reinjected by a first stream not being handled as expected by a second stream.

More specifically, I have a few machines sending events to Riemann. Each machine sends an event with service "incoming service", which I monitor to check whether that host is alive (the event is sent every 5 seconds or so, with a ttl of 10 seconds).

Informally, the first streams definition should:

  1. get the event
  2. log the received event
  3. rewrite the event's :service to "heartbeat" and :state to "alive"
  4. log the modified event's reinjection
  5. reinject it

The second streams definition should:

  1. wait for events with service "heartbeat"
  2. fork the incoming events by host
  3. check for changes on the events' :state field
  4. on changes, log the event
  5. send the event to slack (mocked with an invocation to info)

My problem is that the first host to pass through the streams prevents the others from being processed until it expires. Is there something I'm clearly doing wrong?

Riemann config file

;; -*- mode: clojure; -*-
;; vi: filetype=clojure :

(logging/init {:file "/var/log/riemann/riemann.log"})

(tcp-server {:host "0.0.0.0" :port 5555})
(udp-server {:host "0.0.0.0" :port 5555})
(ws-server {:host "0.0.0.0" :port 5556})
(repl-server {:host "127.0.0.1" :port 5557})

(periodically-expire 5 {:keep-keys [:host :service :tags :application :customer]})

(def chat-expired #(info "SLACKED" (pr-str %)))

;; Streams definitions
(let [index (default {:ttl 10} (tap :index (index)))]
  ;; index everything first
  (streams
   index)

  ;; heartbeat reinjection stream
  (streams
   (where
    (and (not (expired? event))
         (tagged "application:riskapi")
         (service #{"incoming service"}))
    #(warn "**********************" (pr-str %))
    (with {:service "heartbeat" :state "alive"}
          (io #(info "reinjection" (pr-str %)))
          (tap :heartbeat-reinjection reinject))))

  ;; heartbeat stream
  (streams
   (where
    (and (tagged "application:riskapi")
         (service "heartbeat"))
    (pipe e
          (by [:host] e)
          (changed :state e)
          (sdo (io #(info "heartbeat" (pr-str %)))
               chat-expired)))))

Simple (Ruby) client to test the configuration

#!/usr/bin/ruby
require 'riemann/client'

if ARGV[0]
  host = ARGV[0]
else
  host = "127.0.0.1"
end

puts "Sending to #{host}"
r = Riemann::Client.new host: host

evts = [{host: "one", service: "incoming service", tags: ["application:riskapi"]},
        {host: "two", service: "incoming service", tags: ["application:riskapi"]}]

evts.each do |e|
  r << e
end

puts "Events sent"

Log I get on my machine

WARN [2015-03-25 12:32:56,454] defaultEventExecutorGroup-2-1 - riemann.config - ********************** #riemann.codec.Event{:host "one", :service "incoming service", :state nil, :description nil, :metric nil, :tags ["application:riskapi"], :time 1427286776, :ttl nil, :value "true"}
INFO [2015-03-25 12:32:56,455] defaultEventExecutorGroup-2-1 - riemann.config - reinjection #riemann.codec.Event{:host "one", :service "heartbeat", :state "alive", :description nil, :metric nil, :tags ["application:riskapi"], :time 1427286776, :ttl nil, :value "true"}
INFO [2015-03-25 12:32:56,457] defaultEventExecutorGroup-2-1 - riemann.config - heartbeat #riemann.codec.Event{:host "one", :service "heartbeat", :state "alive", :description nil, :metric nil, :tags ["application:riskapi"], :time 1427286776, :ttl nil, :value "true"}
INFO [2015-03-25 12:32:56,460] defaultEventExecutorGroup-2-1 - riemann.config - SLACKED #riemann.codec.Event{:host "one", :service "heartbeat", :state "alive", :description nil, :metric nil, :tags ["application:riskapi"], :time 1427286776, :ttl nil, :value "true"}
WARN [2015-03-25 12:32:56,461] defaultEventExecutorGroup-2-1 - riemann.config - ********************** #riemann.codec.Event{:host "two", :service "incoming service", :state nil, :description nil, :metric nil, :tags ["application:riskapi"], :time 1427286776, :ttl nil, :value "true"}
INFO [2015-03-25 12:32:56,462] defaultEventExecutorGroup-2-1 - riemann.config - reinjection #riemann.codec.Event{:host "two", :service "heartbeat", :state "alive", :description nil, :metric nil, :tags ["application:riskapi"], :time 1427286776, :ttl nil, :value "true"}

INFO [2015-03-25 12:33:10,111] Thread-7 - riemann.config - heartbeat {:time 142728679011/100, :state "expired", :tags ["application:riskapi"], :service "heartbeat", :host "two"}
INFO [2015-03-25 12:33:10,112] Thread-7 - riemann.config - SLACKED {:time 142728679011/100, :state "expired", :tags ["application:riskapi"], :service "heartbeat", :host "two"}

aphyr commented 9 years ago

(by) works lexically: it can only capture the expressions inside (by ...). In this case, pipe has bound a single (changed :state) stream to the symbol e. by re-evaluates its body, which is just e, for each separate host, and because e always points to the same changed stream, every event is routed there. You probably didn't mean to use pipe here. I suspect you just wanted the regular (by :host (changed :state ...)) style.
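
For the heartbeat stream in the report, the regular style might look like the sketch below. It reuses chat-expired and the where predicate from the config above, and it has not been run against the reporter's full configuration, so treat it as illustrative.

```clojure
;; heartbeat stream, written without pipe
(streams
 (where
  (and (tagged "application:riskapi")
       (service "heartbeat"))
  ;; by re-evaluates its children for every distinct :host,
  ;; so each host gets its own (changed :state ...) stream
  (by [:host]
      (changed :state
               #(info "heartbeat" (pr-str %))
               chat-expired))))
```

Because (changed :state ...) now sits lexically inside (by ...), host "two" is no longer compared against the state last seen for host "one".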

blkt commented 9 years ago

Well, the real configuration I have is more complex than this. This one was only meant to reproduce the issue, hence the need for pipe. But I get it now.

Thank you.

aphyr commented 9 years ago

So the reason pipe exists is specifically to cause this stream-unifying behavior. If you need pipe, I'm guessing you wanted this to happen. :)
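
The difference between one shared child stream and one child stream per host can be modelled in plain Clojure, without Riemann. In the sketch below, changed-state, by-host, and hosts-passed are hypothetical stand-ins written for illustration; they are simplified and are not Riemann's actual implementation of changed and by.

```clojure
;; Hypothetical stand-in for `changed`: passes an event to `child` only
;; when its :state differs from the last :state this instance saw.
(defn changed-state [child]
  (let [prev (atom ::none)]
    (fn [event]
      (let [old   @prev
            state (:state event)]
        (reset! prev state)
        (when (not= old state)
          (child event))))))

;; Hypothetical stand-in for `by`: calls `make-child` once per distinct
;; :host and routes each event to that host's child.
(defn by-host [make-child]
  (let [children (atom {})]
    (fn [event]
      (let [host (:host event)]
        (when-not (contains? @children host)
          (swap! children assoc host (make-child)))
        ((get @children host) event)))))

(def events [{:host "one" :state "alive"}
             {:host "two" :state "alive"}])

(defn hosts-passed
  "Feeds `events` through the stream returned by (build record) and
  returns the hosts whose events reached `record`."
  [build]
  (let [seen   (atom [])
        record #(swap! seen conj (:host %))
        stream (build record)]
    (doseq [e events]
      (stream e))
    @seen))

;; Shared: a single changed-state instance reused for every host,
;; which mirrors what pipe's binding of e produced in the report.
(hosts-passed (fn [record]
                (let [shared (changed-state record)]
                  (by-host (fn [] shared)))))
;; => ["one"]

;; Per host: a fresh changed-state instance for each host,
;; which mirrors (by [:host] (changed :state ...)).
(hosts-passed (fn [record]
                (by-host #(changed-state record))))
;; => ["one" "two"]
```

The shared case matches the log in the report: host "one" passes, while the "alive" event for host "two" is dropped because the single changed stream has already seen that state.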

blkt commented 9 years ago

I thought it could be used a la -> or ->> :) I'm closing this issue.