RedHatInsights / expandjsonsmt

Kafka Connect SMT to expand JSON field
Apache License 2.0

NullPointerException in makeUpdatedSchema #12

Open rmoff opened 3 years ago

rmoff commented 3 years ago

Using version 0.0.7 I get:

Caused by: java.lang.NullPointerException
        at com.redhat.insights.expandjsonsmt.ExpandJSON.makeUpdatedSchema(ExpandJSON.java:193)
        at com.redhat.insights.expandjsonsmt.ExpandJSON.makeUpdatedSchema(ExpandJSON.java:199)
        at com.redhat.insights.expandjsonsmt.ExpandJSON.applyWithSchema(ExpandJSON.java:91)
        at com.redhat.insights.expandjsonsmt.ExpandJSON.apply(ExpandJSON.java:75)
        at com.redhat.insights.expandjsonsmt.ExpandJSON$Value.apply(ExpandJSON.java:222)

with config

    "transforms"                        : "expand",
    "transforms.expand.type"            : "com.redhat.insights.expandjsonsmt.ExpandJSON$Value",
    "transforms.expand.sourceFields"    : "text"

TransformationChain:

[2021-01-18 13:59:35,357] TRACE Applying transformation com.redhat.insights.expandjsonsmt.ExpandJSON$Value to SourceRecord{sourcePartition={}, sourceOffset={}} ConnectRecord{topic='networkrail_TRAIN_MVT_scrap', kafkaPartition=null, key=Struct{messageID=ID:opendata-backend.rockshore.net-39107-1609974717016-11:1:6:15:12375}, keySchema=Schema{io.confluent.connect.jms.Key:STRUCT}, value=Struct{messageID=ID:opendata-backend.rockshore.net-39107-1609974717016-11:1:6:15:12375,messageType=text,timestamp=1610978375306,deliveryMode=2,destination=Struct{destinationType=topic,name=TRAIN_MVT_EA_TOC},redelivered=false,expiration=1610978675306,priority=4,properties={},text=[{"header":{"msg_type":"0003","source_dev_id":"VDM0","user_id":"#QGP0170","original_data_source":"SDR","msg_queue_timestamp":"1610978368000","source_system_id":"TRUST"},"body":{"event_type":"ARRIVAL","gbtt_timestamp":"1610977860000","original_loc_stanox":"","planned_timestamp":"1610977740000","timetable_variation":"2","original_loc_timestamp":"","current_train_id":"","delay_monitoring_point":"true","next_report_run_time":"","reporting_stanox":"16201","actual_timestamp":"1610977620000","correction_ind":"false","event_source":"MANUAL","train_file_address":null,"platform":"","division_code":"20","train_terminated":"true","train_id":"161T33MM18","offroute_ind":"false","variation_status":"EARLY","train_service_code":"21732000","toc_id":"20","loc_stanox":"16201","auto_expected":"false","direction_ind":"","route":"0","planned_event_type":"DESTINATION","next_report_stanox":"","line_ind":""}},{"header":{"msg_type":"0003","source_dev_id":"","user_id":"","original_data_source":"SMART","msg_queue_timestamp":"1610978368000","source_system_id":"TRUST"},"body":{"event_type":"DEPARTURE","gbtt_timestamp":"","original_loc_stanox":"","planned_timestamp":"1610978400000","timetable_variation":"1","original_loc_timestamp":"","current_train_id":"","delay_monitoring_point":"true","next_report_run_time":"5","reporting_stanox":"16460","ac
tual_timestamp":"1610978340000","correction_ind":"false","event_source":"AUTOMATIC","train_file_address":null,"platform":"","division_code":"20","train_terminated":"false","train_id":"361P25MJ18","offroute_ind":"false","variation_status":"EARLY","train_service_code":"21734000","toc_id":"20","loc_stanox":"16460","auto_expected":"true","direction_ind":"DOWN","route":"1","planned_event_type":"DEPARTURE","next_report_stanox":"16416","line_ind":"L"}},{"header":{"msg_type":"0003","source_dev_id":"VCJM","user_id":"#QGE7010","original_data_source":"SDR","msg_queue_timestamp":"1610978373000","source_system_id":"TRUST"},"body":{"event_type":"ARRIVAL","gbtt_timestamp":"1610978220000","original_loc_stanox":"","planned_timestamp":"1610978220000","timetable_variation":"0","original_loc_timestamp":"","current_train_id":"","delay_monitoring_point":"true","next_report_run_time":"2","reporting_stanox":"16232","actual_timestamp":"1610978220000","correction_ind":"false","event_source":"MANUAL","train_file_address":null,"platform":"","division_code":"20","train_terminated":"false","train_id":"161T68MN18","offroute_ind":"false","variation_status":"ON 
TIME","train_service_code":"21732000","toc_id":"20","loc_stanox":"16232","auto_expected":"false","direction_ind":"","route":"0","planned_event_type":"ARRIVAL","next_report_stanox":"16233","line_ind":""}},{"header":{"msg_type":"0003","source_dev_id":"VCJM","user_id":"#QGE7010","original_data_source":"SDR","msg_queue_timestamp":"1610978373000","source_system_id":"TRUST"},"body":{"event_type":"DEPARTURE","gbtt_timestamp":"1610978340000","original_loc_stanox":"","planned_timestamp":"1610978340000","timetable_variation":"0","original_loc_timestamp":"","current_train_id":"","delay_monitoring_point":"true","next_report_run_time":"8","reporting_stanox":"16232","actual_timestamp":"1610978340000","correction_ind":"false","event_source":"MANUAL","train_file_address":null,"platform":"","division_code":"20","train_terminated":"false","train_id":"161T68MN18","offroute_ind":"false","variation_status":"ON TIME","train_service_code":"21732000","toc_id":"20","loc_stanox":"16232","auto_expected":"false","direction_ind":"","route":"","planned_event_type":"DEPARTURE","next_report_stanox":"16233","line_ind":""}}]}, valueSchema=Schema{io.confluent.connect.jms.Value:STRUCT}, timestamp=1610978375306, headers=ConnectHeaders(headers=)} (org.apache.kafka.connect.runtime.TransformationChain)

📎 Debug SMT output: com.github.jcustenborder.kafka.connect.transform.common.Debug.txt

rmoff commented 3 years ago

@Josca any suggestions on how to resolve this error? Thanks :)

Josca commented 3 years ago

@maorfr It seems the source field is missing in your input data. Maybe only in one row.

rmoff commented 3 years ago

Ah, that might make sense, given the data. Do you think this condition could be handled by the SMT?

Josca commented 3 years ago

@rmoff definitely could be. But someone would have to implement that :smile:. Feel free to create a PR! Maybe we could add a defaultValue for that case?
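
As a rough illustration of the defaultValue idea, here is a minimal sketch of falling back to a default when the source field is missing or null. It is not the SMT's code: a plain `Map` stands in for the Connect record value, and `MissingFieldSketch`, `readSourceField` and the `defaultValue` parameter are hypothetical names chosen for this example.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: a plain Map stands in for a Connect record value.
class MissingFieldSketch {

    // Returns the JSON text of the source field, or defaultValue when the
    // field is absent or null, so that a null never reaches later processing.
    static String readSourceField(Map<String, Object> value, String field, String defaultValue) {
        Object raw = value.get(field);
        if (raw == null) {
            return defaultValue;
        }
        return raw.toString();
    }

    public static void main(String[] args) {
        Map<String, Object> withText = new HashMap<>();
        withText.put("text", "{\"a\":1}");

        Map<String, Object> withoutText = new HashMap<>();
        withoutText.put("messageType", "text");

        // First record has the field, second one falls back to the default.
        System.out.println(readSourceField(withText, "text", "{}"));
        System.out.println(readSourceField(withoutText, "text", "{}"));
    }
}
```

Whether an empty object, a null, or skipping the record is the right default would be a design decision for whoever implements this in the SMT.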

rmoff commented 3 years ago

> Feel free to create a PR!

😁 I would love to but I'm not a Java coder. Shall we leave this issue open for now for others to help with if they can?

Josca commented 3 years ago

@rmoff sure, keep it open if you want

rmoff commented 3 years ago

I'm just revisiting this now. Looking at the Connect record, I noticed that the field I'm interested in is a JSON array:

text=[{"header":{"msg_type":"0003","source_dev_id":"","user_id":"","original_data_source":"SMART","msg_queue_timestamp":"1616084988000","source_system_id":"TRUST"},"body":{"event_type":"DEPARTURE","gbtt_timestamp":"","original_loc_stanox":"","planned_timestamp":"1616084910000","timetable_variation":"1","original_loc_timestamp":"","current_train_id":"","delay_monitoring_point":"true","next_report_run_time":"4","reporting_stanox":"16495","actual_timestamp":"1616084940000","correction_ind":"false","event_source":"AUTOMATIC","train_file_address":null,"platform":"","division_code":"20","train_terminated":"false","train_id":"151K781Q18","offroute_ind":"false","variation_status":"LATE","train_service_code":"21734000","toc_id":"20","loc_stanox":"16495","auto_expected":"true","direction_ind":"UP","route":"2","planned_event_type":"DEPARTURE","next_report_stanox":"16602","line_ind":"L"}},{"header":{"msg_type":"0003","source_dev_id":"","user_id":"","original_data_source":"SMART","msg_queue_timestamp":"1616084988000","source_system_id":"TRUST"},"body":{"event_type":"ARRIVAL","gbtt_timestamp":"","original_loc_stanox":"","planned_timestamp":"","timetable_variation":"0","original_loc_timestamp":"","current_train_id":"","delay_monitoring_point":"false","next_report_run_time":"","reporting_stanox":"","actual_timestamp":"1616085000000","correction_ind":"false","event_source":"AUTOMATIC","train_file_address":null,"platform":"","division_code":"20","train_terminated":"false","train_id":"129M301P18","offroute_ind":"true","variation_status":"OFF ROUTE","train_service_code":"21731000","toc_id":"20","loc_stanox":"18441","auto_expected":"","direction_ind":"","route":"","planned_event_type":"ARRIVAL","next_report_stanox":"","line_ind":""}}]

So could the NPE seen above be caused by that?
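
One way to narrow this down is to check the top-level shape of the field's JSON text before it is expanded. The sketch below is only an assumption-laden illustration: it does not confirm the cause of the NPE and is not taken from the SMT. `JsonShapeSketch` and `topLevelShape` are hypothetical names, and the check only looks at the first non-whitespace character rather than fully parsing the JSON.

```java
// Hypothetical sketch: classify JSON text by its first non-whitespace character.
class JsonShapeSketch {

    enum Shape { OBJECT, ARRAY, OTHER }

    // Distinguishes a top-level JSON object from a top-level JSON array.
    // This is a cheap shape check, not a validating parser.
    static Shape topLevelShape(String json) {
        if (json == null) {
            return Shape.OTHER;
        }
        String trimmed = json.trim();
        if (trimmed.startsWith("{")) {
            return Shape.OBJECT;
        }
        if (trimmed.startsWith("[")) {
            return Shape.ARRAY;
        }
        return Shape.OTHER;
    }

    public static void main(String[] args) {
        // Shortened stand-in for the `text` field shown above.
        String text = "[{\"header\":{\"msg_type\":\"0003\"}}]";
        System.out.println(topLevelShape(text));
        System.out.println(topLevelShape("{\"header\":{}}"));
    }
}
```

If the field turns out to be an array, as it is here, code that expects a single JSON object would need to handle that case explicitly.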