jcustenborder / kafka-connect-splunk

Kafka Connect connector for receiving data and writing it to Splunk.
Apache License 2.0

Exiting WorkerSinkTask due to unrecoverable exception #3

Closed: lilgreenwein closed this issue 7 years ago

lilgreenwein commented 7 years ago

I'm trying to do a simple setup of this connector - basically just send a topic to a local Splunk heavy forwarder running HEC on port 9999. When I fire up Connect (or restart this connector) I'm getting the following exception:

[2017-01-12 22:30:47,712] ERROR Task sawmill_stats-splunk-sink-3 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerSinkTask:404)
java.lang.ClassCastException: java.util.HashMap cannot be cast to org.apache.kafka.connect.data.Struct
    at io.confluent.kafka.connect.splunk.SinkRecordContent.writeTo(SinkRecordContent.java:198)
    at com.google.api.client.http.GZipEncoding.encode(GZipEncoding.java:49)
    at com.google.api.client.http.HttpEncodingStreamingContent.writeTo(HttpEncodingStreamingContent.java:51)
    at com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:79)
    at com.google.api.client.http.HttpRequest.execute(HttpRequest.java:981)
    at io.confluent.kafka.connect.splunk.SplunkHttpSinkTask.put(SplunkHttpSinkTask.java:178)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:384)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:240)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:172)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:143)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
[2017-01-12 22:30:47,712] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerSinkTask:405)
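A minimal sketch of what the failing cast at SinkRecordContent.java:198 presumably amounts to (an assumption for illustration, not the connector's actual code):

Object value = record.value();   // a java.util.HashMap when the value has no schema
Struct struct = (Struct) value;  // throws ClassCastException for any non-Struct value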
jcustenborder commented 7 years ago

@lilgreenwein Do you have any idea what content is being posted? It would be great to set up a repro.

lilgreenwein commented 7 years ago

Content is syslog messages in JSON format. An example (obtained via kafka-console-consumer):

{
  "facility": "syslog",
  "host": "XXXXXX",
  "message": "Thu Jan 19 18:22:43 2017: main Q: origin=core.queue size=0 enqueued=39 full=0 discarded.full=0 discarded.nf=0 maxqsize=39 ",
  "severity": "info",
  "syslog-tag": "sawmill.stats",
  "timestamp": "2017-01-19T18:22:43.691475+00:00"
}

I'm not using Schema Registry, since the source data is in JSON and I want it forwarded to Splunk in JSON format; I have no need to convert to Avro. From my connect-distributed.properties file:

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
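For reference, with schemas.enable=false the JsonConverter hands tasks a schemaless value: each JSON object is deserialized into a java.util.Map with a null schema, the same HashMap named in the stack trace above. A minimal, self-contained sketch of that behavior (class name and sample payload are illustrative only):

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.json.JsonConverter;

public class SchemalessJsonDemo {
  public static void main(String[] args) {
    JsonConverter converter = new JsonConverter();
    Map<String, String> config = new HashMap<>();
    config.put("schemas.enable", "false");
    converter.configure(config, false); // false = configure as a value converter

    byte[] json = "{\"severity\":\"info\",\"facility\":\"syslog\"}"
        .getBytes(StandardCharsets.UTF_8);
    SchemaAndValue sv = converter.toConnectData("sawmill_stats", json);

    System.out.println(sv.schema());           // null: no schema attached
    System.out.println(sv.value().getClass()); // class java.util.HashMap
  }
}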

And the config for this connector:

{
  "connector.class": "io.confluent.kafka.connect.splunk.SplunkHttpSinkConnector",
  "name": "sawmill-stats-splunk-sink",
  "splunk.auth.token": "XXXXXX",
  "splunk.remote.host": "localhost",
  "splunk.remote.port": "9999",
  "splunk.ssl.enabled": "true",
  "splunk.ssl.validate.certs": "false",
  "tasks.max": "5",
  "topics": "sawmill_stats"
}

I'm not sure, but I believe the issue may be related to the key/value converters. When I create a file sink with the same config, it works fine, but the output is not JSON:

{severity=info, host=XXXXXX, message=Thu Jan 12 21:53:06 2017: main Q: origin=core.queue size=0 enqueued=28 full=0 discarded.full=0 discarded.nf=0 maxqsize=39 , facility=syslog, syslog-tag=sawmill.stats, timestamp=2017-01-12T21:53:06.323303+00:00}
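That bracketed output is consistent with the value arriving as a java.util.Map: the bundled file sink simply writes value.toString(), and Map's toString() produces exactly this {key=value, ...} form. A quick illustration (hypothetical values):

import java.util.LinkedHashMap;
import java.util.Map;

public class MapToStringDemo {
  public static void main(String[] args) {
    Map<String, Object> value = new LinkedHashMap<>();
    value.put("severity", "info");
    value.put("facility", "syslog");
    System.out.println(value); // prints {severity=info, facility=syslog}, not JSON
  }
}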
jcustenborder commented 7 years ago

Thanks for the repro! I'll take a look.

jcustenborder commented 7 years ago

@lilgreenwein Got a repro that works. I'll put in a fix tonight. Basically I'm expecting to always get a Struct. When something else shows up it gets angry.

Pull #4 has a repro.

jcustenborder commented 7 years ago

@lilgreenwein Can you take a look at Pull #4? I removed the dependency on a value schema and added test cases. Maps, Structs, Numbers, Strings, and Booleans can all be passed in the Connect record. This should cover your case pretty well.
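A rough sketch of the kind of type dispatch this describes (an assumed shape for illustration; see Pull #4 for the actual implementation):

import java.io.IOException;
import java.util.Map;
import com.fasterxml.jackson.core.JsonGenerator;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;

public class ValueWriter {
  // Serialize whatever type the Connect record carries instead of casting to Struct.
  static void writeValue(JsonGenerator gen, Object value) throws IOException {
    if (value instanceof Struct) {
      Struct struct = (Struct) value;
      gen.writeStartObject();
      for (Field field : struct.schema().fields()) {
        gen.writeFieldName(field.name());
        writeValue(gen, struct.get(field));
      }
      gen.writeEndObject();
    } else if (value instanceof Map) {
      gen.writeStartObject();
      for (Map.Entry<?, ?> entry : ((Map<?, ?>) value).entrySet()) {
        gen.writeFieldName(String.valueOf(entry.getKey()));
        writeValue(gen, entry.getValue());
      }
      gen.writeEndObject();
    } else if (value instanceof String) {
      gen.writeString((String) value);
    } else if (value instanceof Number) {
      gen.writeNumber(value.toString());
    } else if (value instanceof Boolean) {
      gen.writeBoolean((Boolean) value);
    } else {
      gen.writeNull(); // nulls and anything unrecognized
    }
  }
}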

lilgreenwein commented 7 years ago

Checked out the issue-3 branch and deploying now...

lilgreenwein commented 7 years ago

That worked, messages are flowing now. Thanks Justin!

jcustenborder commented 7 years ago

@lilgreenwein Wonderful! I'll merge that pull into master.