confluentinc / ksql

The database purpose-built for stream processing applications.
https://ksqldb.io

Java Exceptions when using UDF EXTRACTJSONFIELD #1562

Closed kenXengineering closed 6 years ago

kenXengineering commented 6 years ago

I am having an issue getting fields out of a JSON object that is stored as a VARCHAR column in my stream. When I use the EXTRACTJSONFIELD UDF on the column, I get the following Java exceptions:

[2018-07-09 22:06:21,137] ERROR Error calculating column with index 0 : KSQL_COL_0 (io.confluent.ksql.structured.SelectValueMapper:75)
java.lang.reflect.InvocationTargetException
    at sun.reflect.GeneratedMethodAccessor117.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.codehaus.janino.ScriptEvaluator.evaluate(ScriptEvaluator.java:756)
    at org.codehaus.janino.ScriptEvaluator.evaluate(ScriptEvaluator.java:748)
    at io.confluent.ksql.structured.SelectValueMapper.apply(SelectValueMapper.java:72)
    at io.confluent.ksql.structured.SelectValueMapper.apply(SelectValueMapper.java:33)
    at org.apache.kafka.streams.kstream.internals.AbstractStream$2.apply(AbstractStream.java:87)
    at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:40)
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:50)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
    at org.apache.kafka.streams.kstream.internals.KStreamTransformValues$KStreamTransformValuesProcessor.process(KStreamTransformValues.java:56)
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:50)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
    at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:50)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:288)
    at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:94)
    at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:409)
    at org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:957)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:832)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:767)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:736)
Caused by: io.confluent.ksql.function.KsqlFunctionException: Invalid JSON format:{Active=true, Collect=true, Tupelo=0, IpSuppression=, LookupKey=TORR, TitleId=e782c65d-f696-4bbe-973d-2c146e1c3a6a, Machine=CHICAGO, Workspace=TGD11}
    at io.confluent.ksql.function.udf.json.JsonExtractStringKudf.parseJsonDoc(JsonExtractStringKudf.java:87)
    at io.confluent.ksql.function.udf.json.JsonExtractStringKudf.evaluate(JsonExtractStringKudf.java:47)
    at SC.eval0(Unknown Source)
    ... 37 more
Caused by: com.fasterxml.jackson.core.JsonParseException: Unexpected character ('A' (code 65)): was expecting double-quote to start field name
 at [Source: (String)"{Active=true, Collect=true, Tupelo=0, IpSuppression=, LookupKey=TORR, TitleId=e782c65d-f696-4bbe-973d-2c146e1c3a6a, Machine=CHICAGO, Workspace=TGD11}"; line: 1, column: 3]
    at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1804)
    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:669)
    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportUnexpectedChar(ParserMinimalBase.java:567)
    at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._handleOddName(ReaderBasedJsonParser.java:1757)
    at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextFieldName(ReaderBasedJsonParser.java:907)
    at com.fasterxml.jackson.databind.deser.std.BaseNodeDeserializer.deserializeObject(JsonNodeDeserializer.java:246)
    at com.fasterxml.jackson.databind.deser.std.JsonNodeDeserializer.deserialize(JsonNodeDeserializer.java:68)
    at com.fasterxml.jackson.databind.deser.std.JsonNodeDeserializer.deserialize(JsonNodeDeserializer.java:15)
    at com.fasterxml.jackson.databind.ObjectReader._bindAsTree(ObjectReader.java:1657)
    at com.fasterxml.jackson.databind.ObjectReader._bindAndCloseAsTree(ObjectReader.java:1627)
    at com.fasterxml.jackson.databind.ObjectReader.readTree(ObjectReader.java:1358)
    at io.confluent.ksql.function.udf.json.JsonExtractStringKudf.parseJsonDoc(JsonExtractStringKudf.java:85)
    ... 39 more

I created the stream as follows:

create stream ingestion_stream (v varchar, m varchar, sync varchar, DateStamp varchar, AccessKey varchar, TitleData varchar, ErrorMessage varchar, ErrorStack varchar, IsBot varchar, DateCreated varchar) with (KAFKA_TOPIC='api_ingestion', VALUE_FORMAT='json');

The TitleData field is a JSON Object. Example Data:

{
  "v": "4f7a5e31-04e8-4da5-b1d2-77ab7c2846dc",
  "m": "1a02471b-4064-41bb-85b9-c6cbbf28d461",
  "sync": null,
  "DateStamp": "2018-07-09 22:01:17.233",
  "AccessKey": "2521DD701181",
  "TitleData": {
    "TitleId": "0b1a5b4f-37f6-4be6-a545-cce3beaa35c4",
    "LookupKey": "BELK",
    "Machine": "CHICAGO",
    "Workspace": "TGD21",
    "Active": true,
    "Collect": true,
    "Tupelo": 0,
    "IpSuppression": ""
  },
  "ErrorMessage": null,
  "ErrorStack": null,
  "IsBot": false,
  "DateCreated": "2018-07-09T22:01:17.2337835Z"
}

Output from a query using EXTRACTJSONFIELD:

ksql> select TitleData, EXTRACTJSONFIELD(TitleData, '$.TitleId') from ingestion_stream limit 1;
{Active=true, Collect=true, Tupelo=0, IpSuppression=, LookupKey=TORR, TitleId=e782c65d-f696-4bbe-973d-2c146e1c3a6a, Machine=CHICAGO, Workspace=TGD11} | null

And for good measure, a describe on the stream:


Name                 : INGESTION_STREAM
Type                 : STREAM
Key field            : 
Key format           : STRING
Timestamp field      : Not set - using <ROWTIME>
Value format         : JSON
Kafka topic          : api_ingestion (partitions: 12, replication: 2)

 Field        | Type                      
------------------------------------------
 ROWTIME      | BIGINT           (system) 
 ROWKEY       | VARCHAR(STRING)  (system) 
 V            | VARCHAR(STRING)           
 M            | VARCHAR(STRING)           
 SYNC         | VARCHAR(STRING)           
 DATESTAMP    | VARCHAR(STRING)           
 ACCESSKEY    | VARCHAR(STRING)           
 TITLEDATA    | VARCHAR(STRING)           
 ERRORMESSAGE | VARCHAR(STRING)           
 ERRORSTACK   | VARCHAR(STRING)           
 ISBOT        | VARCHAR(STRING)           
 DATECREATED  | VARCHAR(STRING)           
------------------------------------------

Local runtime statistics
------------------------

(Statistics of the local KSQL server interaction with the Kafka topic api_ingestion)

I am running KSQL via Docker using the confluentinc/cp-ksql-server:5.0.0-beta180702222458 image, running against a Kafka 1.1.0 cluster.

Please let me know if you need anything else, thanks!

apurvam commented 6 years ago

Hmm. It looks like the nested JSON string has the quotes removed from the field names, causing a JSON parse exception. Does this work on KSQL 4.1? If you must use Docker, could you try reproducing with cp-ksql-server:5.0.0-beta30?

Thanks, Apurva

kenXengineering commented 6 years ago

@apurvam Just tried it with the Docker image cp-ksql-server:5.0.0-beta30 and it is working.

ksql> select TitleData, EXTRACTJSONFIELD(TitleData, '$.TitleId') from ingestion_stream limit 1;
{"TitleId":"db5d229d-19b5-49c6-a0b3-80fa19caa054","LookupKey":"CENT","Machine":"CHICAGO","Workspace":"TGD21","Active":true,"Collect":true,"Tupelo":0,"IpSuppression":""} | db5d229d-19b5-49c6-a0b3-80fa19caa054

I used the same CREATE STREAM statement as before. Looks like the current version of KSQL is dropping the quote characters when parsing the JSON message into a VARCHAR column.

apurvam commented 6 years ago

cc @hjafarpour

This might be an issue with the new json converter.

kenXengineering commented 6 years ago

I also wanted to note that using STRUCT worked on the latest beta version. I downloaded the June preview release and redid my streams using STRUCT this time, and had no issue. Was able to do some awesome things with KSQL, really liking it so far!

hjafarpour commented 6 years ago

@chosenken Now that we have support for STRUCT, you should use STRUCT instead of EXTRACTJSONFIELD. Our new Serde reads nested JSON fields as structs, and when a struct is converted into a string it drops the double quotes from the field names, which results in a parsing error in EXTRACTJSONFIELD. You can now only use EXTRACTJSONFIELD if the field is indeed a string and the content of the string is valid JSON.
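For readers landing here later, a sketch of the STRUCT-based declaration described above, assuming KSQL 5.0+ STRUCT support; the column names come from the original CREATE STREAM and the nested types are inferred from the example payload, not confirmed:

```sql
-- Sketch: declare TitleData as a STRUCT instead of VARCHAR
-- (nested field types inferred from the sample message above).
CREATE STREAM ingestion_stream (
    v VARCHAR,
    m VARCHAR,
    sync VARCHAR,
    DateStamp VARCHAR,
    AccessKey VARCHAR,
    TitleData STRUCT<
        TitleId VARCHAR,
        LookupKey VARCHAR,
        Machine VARCHAR,
        Workspace VARCHAR,
        Active BOOLEAN,
        Collect BOOLEAN,
        Tupelo INTEGER,
        IpSuppression VARCHAR>,
    ErrorMessage VARCHAR,
    ErrorStack VARCHAR,
    IsBot VARCHAR,
    DateCreated VARCHAR)
  WITH (KAFKA_TOPIC='api_ingestion', VALUE_FORMAT='JSON');

-- Nested fields are then read with the -> accessor
-- instead of EXTRACTJSONFIELD:
SELECT TitleData->TitleId FROM ingestion_stream;
```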

apurvam commented 6 years ago

Do we have docs which explain when to use EXTRACTJSONFIELD vs when to use STRUCT?

Might be worth linking to here and then closing this out.

tpanagos commented 6 years ago

I am parking this here for Future-Me. I am using the KSQL-CLI in a similar situation. You can see the details in my StackOverflow question. The symptom of this in the CLI is the return of a null column value:

ksql> SELECT EXTRACTJSONFIELD(obj, '$.owner.org') AS org FROM objs1;
null
tpanagos commented 6 years ago

@hjafarpour I think this degrades the usefulness of KSQL for a use-case where the JSON format of each message is not consistent. I know this limitation exists in other parts of the Confluent suite (Kafka Connect, for instance, hates this inconsistency).

In my case the deep structures of the JSON messages will be different from message to message. I am looking to use the field lookup function to cherry-pick messages that do contain a specific field. I do not see a way to accomplish this with STRUCT in a straightforward way, because the CREATE STREAM would need to lay out every possible field name.

STRUCT also becomes a bit onerous for deeply nested messages because of the need for nesting STRUCTs in the CREATE STREAM syntax. Am I missing an approach?
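One pattern that may cover the cherry-picking case, assuming (per the comment above) the column holds the raw JSON text rather than a re-serialized struct: EXTRACTJSONFIELD returns NULL when the path is absent, so messages missing the field can be dropped in the WHERE clause. A sketch using the stream and column names from the query above:

```sql
-- Hypothetical sketch: keep only messages whose payload contains owner.org.
-- Relies on EXTRACTJSONFIELD returning NULL for a missing path, and on
-- obj containing valid JSON text (see the serde caveat discussed above).
SELECT obj, EXTRACTJSONFIELD(obj, '$.owner.org') AS org
FROM objs1
WHERE EXTRACTJSONFIELD(obj, '$.owner.org') IS NOT NULL;
```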

hjafarpour commented 6 years ago

@tpanagos I agree that the use cases you mentioned are valid and we should be able to handle them in KSQL. So here is a fix that hopefully will make it easier to use KSQL: #1962