pyr / riemann-kafka

kafka producer and consumer support in riemann

Can't decode the messages from Kafka #11

Closed NxwJfm closed 8 years ago

NxwJfm commented 8 years ago

Hi.

I'm not sure whether this is an issue with the readme.md or with my brain (more than likely the latter), but I'm trying to set up a simple Kafka consumer so I can hook into the Kafka logstash topic (JSON) and use it to generate alerts and metrics. I have everything set up and can confirm that the Kafka consumer connects and starts consuming data, so as far as I can tell I've installed Riemann and the Kafka plugin correctly. Both are the latest versions (0.2.10 and 0.1.4). If I use no decoder in the config, I get these errors:

ERROR [2016-02-05 13:50:29,976] clojure-agent-send-off-pool-0 - riemann.plugin.kafka - could not decode msg
com.google.protobuf.InvalidProtocolBufferException: Protocol message end-group tag did not match expected tag.
at com.google.protobuf.InvalidProtocolBufferException.invalidEndTag(InvalidProtocolBufferException.java:94)

Which I assume is normal.

But if I add the my-json-decoder example, I get this:

ERROR [2016-02-05 13:55:47,176] clojure-agent-send-off-pool-4 - riemann.plugin.kafka - interrupted consumption
java.lang.IllegalArgumentException: Key must be integer
at clojure.lang.APersistentVector.assoc(APersistentVector.java:335)
at clojure.lang.APersistentVector.assoc(APersistentVector.java:18)
at clojure.lang.RT.assoc(RT.java:702)

If I comment out the map call I don't get any error (but of course I don't get any events either).

And that's where I'm stumped. I freely admit to being a complete Clojure newbie, but I've spent the last 3 hours trying various things and I simply don't know if it's a bug or something I don't get. I've tried replacing the map call in my-json-decoder with a prn of both input and decoded-msg. The first shows #<byte[] [B@76932c13>, which seems to match what I read it's supposed to be, and decoded-msg is something like this:

{:@timestamp "2016-02-05T13:50:33.000Z", :facility 3, :facility_label "system", :type "syslog", :logsource "nameofthehost", :host "ipofthehost", :pid "24718", :priority 30, :vpc "vpcid", :severity_label "Informational", :severity 6, :@version "1", :timestamp "Feb 5 13:50:33", :program "docker", :message "time=\"2016-02-05T13:50:33.501554356Z\" level=info msg=\"GET /v1.15/networks\""}

And that also seems to be what we pass to an event when we create it manually.

So yeah, sorry if it's not a bug but just me being an idiot, but finding resources for this particular plugin isn't easy, so I hope I'm actually reporting something useful!

pyr commented 8 years ago

Hi,

Hopefully @weichuliu can chip in

NxwJfm commented 8 years ago

Thanks for the fast reply, much appreciated. As another test I just tried reverting to riemann 0.2.9 since it was the version tagged in the plugin, and I get the same result.

weichuliu commented 8 years ago

Hi,

I'll take a look. However, I'm on my way to China and not sure whether I'll still have access to GitHub for the next 2 weeks. I hope you are not in a hurry.

weichuliu commented 8 years ago

The example decoder corresponds to the example encoder, which assumes each Kafka message is a (byte-encoded) list of JSON-serialized Riemann events.

If you have different input, please use (info decoded-msg) to inspect it and write your own decoder.
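
For readers with a different input shape, here is a minimal sketch of the idea in plain Clojure. It assumes the JSON has already been parsed into Clojure data; `->event-maps` is a hypothetical helper name, not something riemann-kafka provides. It only normalizes the decoded payload so that a decoder can always map over a sequence of event maps, whether a message holds a list of events or a single object.

```clojure
;; Hypothetical helper (not part of riemann-kafka): turn an already-parsed
;; JSON payload into a sequence of event maps.
(defn ->event-maps
  [decoded-msg]
  (cond
    ;; One JSON object per Kafka message: wrap it so it counts as one event.
    (map? decoded-msg)        [decoded-msg]
    ;; A JSON array of events, which is what the example decoder expects.
    (sequential? decoded-msg) (vec decoded-msg)
    ;; Anything else (string, number, nil) is not an event payload.
    :else                     nil))

;; Usage with made-up payloads:
(def single-object (->event-maps {:host "ipofthehost" :program "docker"}))
(def event-list    (->event-maps [{:host "a"} {:host "b"}]))
```

A custom decoder could then map its event constructor over the result of this helper instead of over the raw decoded value.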

NxwJfm commented 8 years ago

I see. Now that I read the code again, it certainly makes sense. I'll try to do that then. I've seen what seems to be a good JSON library for Clojure, so hopefully a newbie like me will be able to do it. Would you guys want a PR to add this to the readme.md as another example (if I ever manage to do it)? Anyway, thanks again for the incredibly fast reply!

pyr commented 8 years ago

Docs and example would be very welcome!

NxwJfm commented 8 years ago

Well, I think I found the issue, and it seems to be working (at least I see events when I prn the streams). I just added [] to the map call: (map riemann.common/event [decoded-msg]) and it did the trick. It took me about 6 hours to figure this one out, so I hope this will help someone else avoid going through the same thing! Feel free to add it to the readme.md file for people who, like me, would like to hook into any Kafka JSON topic.
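
A likely explanation for why the brackets matter, sketched in plain Clojure: mapping over a map visits its [key value] entries, which behave like vectors, so any function that calls assoc with a keyword on its argument fails with the same "Key must be integer" message seen in the stack trace above. `fake-event` below is a hypothetical stand-in, not the real riemann.common/event, whose implementation is not shown in this thread. The only assumption is that the real function associates keys onto its argument, as the assoc frames in the trace suggest.

```clojure
;; Hypothetical stand-in for riemann.common/event: it only needs to call
;; assoc on its argument to reproduce the failure.
(defn fake-event
  [m]
  (assoc m :time 0))

;; Trimmed-down version of the decoded logstash message from the report.
(def decoded-msg
  {:host "ipofthehost" :program "docker" :severity 6})

;; Mapping over the map itself hands fake-event one [key value] entry at a
;; time; assoc with a keyword on such an entry throws.
(def broken
  (try
    (doall (map fake-event decoded-msg))
    (catch IllegalArgumentException e
      (.getMessage e))))

;; Wrapping the map in a vector makes it a one-element sequence, so
;; fake-event receives the whole map.
(def fixed
  (doall (map fake-event [decoded-msg])))
```

Here `broken` holds the exception message, while `fixed` holds a single event map with the extra :time key.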