jcustenborder / kafka-connect-splunk

Kafka Connect connector for receiving data and writing data to Splunk.
Apache License 2.0

Converting byte[] to Kafka Connect data failed due to serialization error #7

Closed lilgreenwein closed 7 years ago

lilgreenwein commented 7 years ago

I'm not sure whether this is related to my configuration or to the kafka-connect-splunk connector.

I'm using this connector as a sink to a Splunk heavy forwarder, here's my connector config:

{
  "connector.class": "io.confluent.kafka.connect.splunk.SplunkHttpSinkConnector",
  "splunk.ssl.enabled": "true",
  "splunk.remote.host": "localhost",
  "tasks.max": "5",
  "topics": "sawmill",
  "splunk.ssl.validate.certs": "false",
  "name": "sawmill-splunk-sink",
  "splunk.remote.port": "9999",
  "splunk.auth.token": "43C0A9FF-48E2-4FC5-89AB-C55AA5ECB2B1"
}

Messages are syslog messages in standardized JSON format, e.g.:

{"severity":"notice","message":" Test message","facility":"kern","syslog-tag":"sawmill_test:","timestamp":"2017-01-31T20:15:00+00:00"}

The issue is that whenever the "message" portion of the JSON object contains an escape character (\), the connector crashes. Here's the error from connectDistributed.out when I send a message through with "\x0000" in it:

[2017-01-31 23:52:58,406] ERROR Task infocheck-sawmill-splunk-sink-4 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:142)
org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error: 
    at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:305)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:358)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:239)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:172)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:143)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unrecognized character escape 'x' (code 120)
 at [Source: [B@aa4d27a; line: 1, column: 106]
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized character escape 'x' (code 120)
 at [Source: [B@aa4d27a; line: 1, column: 106]
    at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1581)
    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:533)
    at com.fasterxml.jackson.core.base.ParserMinimalBase._handleUnrecognizedCharacterEscape(ParserMinimalBase.java:510)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._decodeEscaped(UTF8StreamJsonParser.java:3174)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._finishString2(UTF8StreamJsonParser.java:2459)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._finishAndReturnString(UTF8StreamJsonParser.java:2414)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.getText(UTF8StreamJsonParser.java:285)
    at com.fasterxml.jackson.databind.deser.std.BaseNodeDeserializer.deserializeObject(JsonNodeDeserializer.java:233)
    at com.fasterxml.jackson.databind.deser.std.JsonNodeDeserializer.deserialize(JsonNodeDeserializer.java:69)
    at com.fasterxml.jackson.databind.deser.std.JsonNodeDeserializer.deserialize(JsonNodeDeserializer.java:15)
    at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3736)
    at com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:2294)
    at org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:50)
    at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:303)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:358)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:239)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:172)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:143)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
[2017-01-31 23:52:58,408] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:143)
[2017-01-31 23:52:58,408] INFO Stopping... (io.confluent.kafka.connect.splunk.SplunkHttpSinkTask:216)

Ideas?

lilgreenwein commented 7 years ago

So after researching this, it's apparent that the issue is that the JSON parser (Jackson) is interpreting the \x as an escape sequence, and since \x doesn't fall within the JSON specification as an acceptable escape sequence, it throws an exception. I could get around it by replacing every \ with \\, but I don't want to get into the business of running regex replaces on every message entering my Kafka pipeline. Is there a way to tell the parser to ignore all escape sequences and just pass all JSON tokens through literally?
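
For what it's worth, Jackson itself does have a parser feature for this, though as far as I can tell the stock Kafka Connect JsonConverter doesn't expose any config to turn it on, so using it would mean a custom converter or deserializer. A rough standalone sketch of the Jackson side, assuming Jackson 2.x:

import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class LenientEscapeDemo {
    public static void main(String[] args) throws Exception {
        // Same shape as the failing syslog message, with the non-standard \x escape
        String json = "{\"message\":\"Test message \\x0000\"}";

        ObjectMapper mapper = new ObjectMapper();
        // Tell Jackson to accept backslash escapes outside the JSON spec
        // instead of throwing "Unrecognized character escape 'x'"
        mapper.configure(JsonParser.Feature.ALLOW_BACKSLASH_ESCAPING_ANY_CHARACTER, true);

        JsonNode node = mapper.readTree(json);
        System.out.println(node.get("message").asText());
    }
}

With that feature enabled the document parses; the non-standard escape is passed through rather than rejected.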

jcustenborder commented 7 years ago

Hmm. This exception looks like it's thrown below the connector, at the Kafka level, by the JSON converter. We might need to put together a repro and report this to the Kafka project.

jcustenborder commented 7 years ago

Can you give me an example of the offending JSON, or at least something that acts like it?

lilgreenwein commented 7 years ago

An example would be:

{"severity":"notice","message":"Test message \x0000","facility":"kern","syslog-tag":"sawmill_test:","timestamp":"2017-01-31T20:15:00+00:00"}

I agree, I think this issue is further down the stack, specifically in Jackson's JSON parsing. There are numerous discussions around this issue, and the consensus from the Jackson folks seems to be that it's a valid error: the JSON standard supports a specific set of escape sequences, and if your source JSON uses a non-standard escape, you should fix your source JSON. That's fair, but in this case, where the data comes in via syslog, I can't control what comes in.

lilgreenwein commented 7 years ago

Forgive my ignorance, I'm not a Java dev, but a few discussions have mentioned adding this:

mapper.configure(JsonGenerator.Feature.ESCAPE_NON_ASCII, true);

I'm guessing this would need to be added in Jackson classes, right?

lilgreenwein commented 7 years ago

OK, you can close this one out and delete it; it's not an issue with the kafka-connect-splunk connector.

It turns out I actually can control the data coming in. (I know I could have written a long, complex regex rule for rsyslog to find and replace escape characters, or drop those messages altogether, but that's too messy and too much CPU.)

I've resolved the issue in my rsyslog config, not at the point where messages are parsed as they come in, but in the template rsyslog uses to write messages to Kafka. I'll do a full write-up on my blog, but for any rsyslog admins who find this page via Google: in the template your omkafka rules use, change:

property(name="msg")

to:

property(name="msg" format="json" controlcharacters="escape")

Don't mess with the jsonf format; it doesn't work for Kafka.
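
Roughly, the template ends up looking something like this. The template name, the property-to-field mapping, and the omkafka action parameters below are illustrative only, so adjust them to your own config:

template(name="json_kafka" type="list") {
    constant(value="{\"severity\":\"")
    property(name="syslogseverity-text")
    constant(value="\",\"message\":\"")
    property(name="msg" format="json" controlcharacters="escape")
    constant(value="\",\"facility\":\"")
    property(name="syslogfacility-text")
    constant(value="\",\"syslog-tag\":\"")
    property(name="syslogtag" format="json")
    constant(value="\",\"timestamp\":\"")
    property(name="timereported" dateFormat="rfc3339")
    constant(value="\"}")
}

action(type="omkafka" topic="sawmill" broker=["localhost:9092"] template="json_kafka")

The key part is the format="json" controlcharacters="escape" options on the msg property, which make rsyslog emit JSON-safe escapes before the message ever reaches Kafka.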

Thanks Justin!

jcustenborder commented 7 years ago

Before we close this out: are you using https://github.com/jcustenborder/kafka-connect-syslog to get the data into the Kafka topic? If so, there's still something we should address at the Kafka level.

lilgreenwein commented 7 years ago

No, I'm using rsyslog with the omkafka output module

jcustenborder commented 7 years ago

OK. That setting you mentioned only applies to generation (writing) with Jackson. It made me wonder whether you were using my syslog module with the built-in JSON converter; if so, that would mean the Jackson configuration in the JSON converter was allowing it, which would be a bug in Kafka, and we'd fix it there. Sorry man!
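
To illustrate the distinction, a hypothetical snippet (not code from the converter itself):

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.ObjectMapper;

public class EscapeFeatureSides {
    public static void main(String[] args) {
        ObjectMapper mapper = new ObjectMapper();
        // Write side: ESCAPE_NON_ASCII only changes how Jackson *generates* JSON,
        // so it cannot prevent the parse failure in the stack trace above.
        mapper.configure(JsonGenerator.Feature.ESCAPE_NON_ASCII, true);
        // Read side: this feature governs whether non-standard escapes like \x
        // are accepted while parsing.
        mapper.configure(JsonParser.Feature.ALLOW_BACKSLASH_ESCAPING_ANY_CHARACTER, true);
    }
}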
