pyr / riemann-kafka

kafka producer and consumer support in riemann

Kafka consumer support customized message parser #5

Closed liuweichu closed 8 years ago

liuweichu commented 9 years ago

When riemann-kafka reads messages from Kafka, the payload is not always protobuf binary; it can also be a string (for example, when collectd sends metrics in JSON or Graphite format through write_kafka).

It would be great to have an option to pass a :parser-fn (the same way riemann.transport.graphite does) to handle the payload as an alternative to protobuf decoding. The :parser-fn should return a sequence of events, so that start-kafka-thread can iterate over it and inject each event into the Riemann core.

This can be done by decoupling the safe-decode function. If the producers send strings, the consumer receives them as Java byte arrays, so (String. value) gives :parser-fn the same string that was sent from the other side.
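
To make the proposal concrete, here is a minimal sketch of what a decoupled decode step could look like. It is not riemann-kafka's actual code: `decode-with` and `graphite-lines->events` are hypothetical names, and the example parser assumes Graphite plaintext lines of the form `path value timestamp`.

```clojure
(require '[clojure.string :as str])

(defn decode-with
  "Hypothetical helper: turns the raw bytes of a Kafka message value into
  a string, hands it to parser-fn, and wraps the resulting events.
  Returns nil if parsing throws."
  [parser-fn ^bytes value]
  (try
    ;; doall forces the lazy seq here so parse errors are caught below
    {:events (doall (parser-fn (String. value "UTF-8")))}
    (catch Exception e
      (binding [*out* *err*]
        (println "Could not decode message:" (.getMessage e)))
      nil)))

(defn graphite-lines->events
  "Example parser-fn (assumed format: one 'path value timestamp' per line).
  Returns a sequence of event maps; blank or short lines are skipped."
  [s]
  (for [line (str/split-lines s)
        :let [[service metric ts] (str/split (str/trim line) #"\s+")]
        :when (and service metric ts)]
    {:service service
     :metric  (Double/parseDouble metric)
     :time    (Long/parseLong ts)}))

;; Usage: the consumer thread would call something like
;; (decode-with graphite-lines->events (.getBytes "cpu.load 0.5 1400000000\n" "UTF-8"))
```

With this shape, the protobuf decoder becomes just one more parser function, and the consumer loop only needs to iterate over `:events`.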

panda87 commented 9 years ago

@liuweichu - Did you find a way to decode the Kafka byte array into a Riemann event?

liuweichu commented 9 years ago

@panda87 Here's my solution: simply override a couple of functions in the plugin namespace.

(ns riemann.plugin.kafka
  (:require [clojure.data.json :as json]))

(defn msgs-to-events
  "Converts decoded collectd JSON messages to Riemann events.
  One message may yield several events:
  [{msg1} {msg2}] -> [{msg1:event1} {msg1:event2} {msg2:event1} ...]"
  [msgs]
  ; YOUR OWN EVENTS PROCESSING FUNCTION
  )

(defn safe-decode
  "Overrides riemann-kafka's safe-decode: decodes messages from Kafka
  as JSON instead of protobuf."
  [input]
  (try
    (let [{:keys [value]} (to-clojure input)
          ;; value is a byte array; decode it explicitly as UTF-8
          decode-msg (json/read-str (String. ^bytes value "UTF-8") :key-fn keyword)
          events (msgs-to-events decode-msg)]
      {:events events})
    (catch Exception e
      (error e "Could not decode json msgs"))))
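
The events processing function is left open above. As one possible starting point, here is a sketch that assumes the usual collectd JSON layout (each message carrying `host`, `plugin`, `plugin_instance`, `type`, `type_instance`, `time`, and parallel `dsnames` / `values` arrays) already parsed with keyword keys. The name `collectd-msgs->events` and the service naming scheme are illustrative choices, not part of riemann-kafka or collectd.

```clojure
(require '[clojure.string :as str])

(defn collectd-msgs->events
  "Hypothetical example: emits one Riemann-style event map per data source
  in each collectd message. Non-numeric values (e.g. nil) are skipped."
  [msgs]
  (for [{:keys [host plugin plugin_instance type type_instance
                time values dsnames]} msgs
        ;; pair each data source name with its value
        [dsname value] (map vector dsnames values)
        :when (number? value)]
    {:host    host
     ;; join the non-blank identifier parts, e.g. "load/load/shortterm"
     :service (str/join "/" (remove str/blank?
                                    [plugin plugin_instance
                                     type type_instance dsname]))
     :metric  value
     ;; collectd sends fractional epoch seconds; truncate to whole seconds
     :time    (some-> time long)}))
```

A function like this could be dropped in as the body of `msgs-to-events`, adjusting the `:service` format to whatever your Riemann streams expect.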

panda87 commented 9 years ago

Thanks @liuweichu, I'll try that!

higebu commented 9 years ago

+1