jcustenborder / kafka-connect-spooldir

Kafka Connect connector for reading CSV files into Kafka.
Apache License 2.0

Could not parse <field> to 'Date' #200

Open acamillo opened 2 years ago

acamillo commented 2 years ago

Hi, I am using v2.0.62 of the library with Kafka Connect 6.1.0.

I am trying to parse a CSV file with rows such as: abcd, 2022-01-10T23:09:55.000Z, abcdef, ....

I am using this schema:

{
  "name": "com.github.jcustenborder.kafka.connect.model.Value",
  "type": "STRUCT",
  "isOptional": false,
  "fieldSchemas": {
    "Log_Source": {
      "type": "STRING",
      "isOptional": false
    },
    "Time": {
      "name": "org.apache.kafka.connect.data.Timestamp",
      "type": "INT64",
      "version": 1,
      "isOptional": false
    },
    "ID": {
      "type": "STRING",
      "isOptional": false
    },
......
  }
}

Connector creation and configuration:

curl -i -X PUT -H "Accept:application/json" \
    -H  "Content-Type:application/json" http://localhost:8083/connectors/ohs-access-csv-spooldir-03/config \
    -d '{
        "connector.class": "com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector",
        "topic": "ohs_access_03",
        "input.path": "/data/unprocessed",
        "finished.path": "/data/processed",
        "error.path": "/data/error",
        "input.file.pattern": ".*\\.csv",
        "parser.timestamp.date.formats": "yyyy-MM-dd'\''T'\''HH:mm:ss.SSSXXX",
        "key.schema" : "{\n  \"name\" : \"com.github.jcustenborder.kafka.connect.model.Key\",\n  \"type\" : \"STRUCT\",\n  \"isOptional\" : false,\n  \"fieldSchemas\" : {\n    \"ID\" : {\n      \"type\" : \"STRING\",\n      \"isOptional\" : false\n    }\n  }\n}",
        "value.schema" : "{\n  \"name\": \"com.github.jcustenborder.kafka.connect.model.Value\",\n  \"type\": \"STRUCT\",\n  \"isOptional\": false,\n  \"fieldSchemas\": {\n    \"Log_Source\": {\n      \"type\": \"STRING\",\n      \"isOptional\": false\n    },\n    \"Time\": {\n      \"name\": \"org.apache.kafka.connect.data.Timestamp\",\n      \"type\": \"INT64\",\n      \"version\": 1,\n      \"isOptional\": false\n    },\n    \"ID\": {\n      \"type\": \"STRING\",\n      \"isOptional\": false\n    },\n    \"Entity\": {\n      \"type\": \"STRING\",\n      \"isOptional\": true\n    },\n    \"Entity_Type\": {\n      \"type\": \"STRING\",\n      \"isOptional\": true\n    },\n    \"Log_Entity\": {\n      \"type\": \"STRING\",\n      \"isOptional\": true\n    },\n    \"Original_Log_Content\": {\n      \"type\": \"STRING\",\n      \"isOptional\": true\n    },\n    \"Message\": {\n      \"type\": \"STRING\",\n      \"isOptional\": true\n    },\n    \"Parse_Failed\": {\n      \"type\": \"STRING\",\n      \"isOptional\": true\n    },\n    \"Entity_GUID\": {\n      \"type\": \"STRING\",\n      \"isOptional\": true\n    },\n    \"Data_Services_Load_Time\": {\n      \"type\": \"STRING\",\n      \"isOptional\": true\n    },\n    \"Host_Name_Server\": {\n      \"type\": \"STRING\",\n      \"isOptional\": true\n    },\n    \"Problem_Priority\": {\n      \"type\": \"STRING\",\n      \"isOptional\": true\n    },\n    \"Annotation_Identifier\": {\n      \"type\": \"STRING\",\n      \"isOptional\": true\n    },\n    \"Log_Index\": {\n      \"type\": \"STRING\",\n      \"isOptional\": true\n    },\n    \"Solr_Cluster_CoreGroup_ID\": {\n      \"type\": \"STRING\",\n      \"isOptional\": true\n    },\n    \"Label\": {\n      \"type\": \"STRING\",\n      \"isOptional\": true\n    }\n  }\n}"

    }'

When the connector runs, I get the following exception:

[2022-02-23 14:56:24,201] ERROR [ohs-access-csv-spooldir-03|task-0] Exception encountered processing line 1 of /data/unprocessed/test.csv. (com.github.jcustenborder.kafka.connect.spooldir.AbstractSourceTask:258)
my-local-kafka-connect    | org.apache.kafka.connect.errors.DataException: Exception thrown while parsing data for 'Time'. linenumber=1
my-local-kafka-connect    |     at com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceTask.process(SpoolDirCsvSourceTask.java:136)
my-local-kafka-connect    |     at com.github.jcustenborder.kafka.connect.spooldir.AbstractSourceTask.read(AbstractSourceTask.java:254)
my-local-kafka-connect    |     at com.github.jcustenborder.kafka.connect.spooldir.AbstractSourceTask.poll(AbstractSourceTask.java:148)
my-local-kafka-connect    |     at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:289)
my-local-kafka-connect    |     at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:256)
my-local-kafka-connect    |     at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
my-local-kafka-connect    |     at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
my-local-kafka-connect    |     at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
my-local-kafka-connect    |     at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
my-local-kafka-connect    |     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
my-local-kafka-connect    |     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
my-local-kafka-connect    |     at java.base/java.lang.Thread.run(Thread.java:834)
my-local-kafka-connect    | Caused by: org.apache.kafka.connect.errors.DataException: Could not parse 'Time' to 'Date'
my-local-kafka-connect    |     at com.github.jcustenborder.kafka.connect.utils.data.Parser.parseString(Parser.java:113)
my-local-kafka-connect    |     at com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceTask.process(SpoolDirCsvSourceTask.java:128)
my-local-kafka-connect    |     ... 11 more
my-local-kafka-connect    | Caused by: java.lang.IllegalStateException: Could not parse 'Time' to java.util.Date
my-local-kafka-connect    |     at shaded.com.google.common.base.Preconditions.checkState(Preconditions.java:589)
my-local-kafka-connect    |     at com.github.jcustenborder.kafka.connect.utils.data.type.BaseDateTypeParser.parseString(BaseDateTypeParser.java:55)
my-local-kafka-connect    |     at com.github.jcustenborder.kafka.connect.utils.data.Parser.parseString(Parser.java:109)
my-local-kafka-connect    |     ... 12 more 

Any idea what could be the cause of the issue? Thank you.

jcustenborder commented 2 years ago

@acamillo Does your CSV have a header? I'm wondering if it's actually trying to parse the header text Time with that date format.

acamillo commented 2 years ago

@jcustenborder Yes, it does have a header. However, following recommendations in various blogs, I decided to specify the schema myself.

jcustenborder commented 2 years ago

Haha, no, please do use the schema.

Try setting "csv.first.row.as.header":"true" in your config.

acamillo commented 2 years ago

@jcustenborder Yes, that solved the problem. It was parsing the header line as if it were a normal data row.
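
The failure can be reproduced outside Kafka Connect. What follows is a minimal sketch, assuming the connector interprets parser.timestamp.date.formats as a java.text.SimpleDateFormat pattern (this thread does not confirm that). The class HeaderParseDemo and the method parses are hypothetical names used only for illustration. It checks the header cell Time and the sample data value from the report against the configured pattern.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;

public class HeaderParseDemo {
    // Pattern from the connector config, after shell quoting is removed.
    static final String PATTERN = "yyyy-MM-dd'T'HH:mm:ss.SSSXXX";

    // Returns true if the value can be parsed with PATTERN.
    // Assumption: SimpleDateFormat semantics match what the connector does.
    static boolean parses(String value) {
        SimpleDateFormat format = new SimpleDateFormat(PATTERN);
        try {
            format.parse(value);
            return true;
        } catch (ParseException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // The header cell is plain text, so it cannot match the pattern.
        System.out.println("header 'Time' parses: " + parses("Time"));
        // The data value from the report matches the pattern.
        System.out.println("data value parses: " + parses("2022-01-10T23:09:55.000Z"));
    }
}
```

Under that assumption, the header cell is rejected and the data value is accepted, which is consistent with the error disappearing once the first row is treated as a header.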

Thanks for the super fast feedback.

If you don't mind a suggestion: the problem would have been much easier for me to spot if the exception output had included the offending data row that triggered the exception. Maybe in a future release?
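
To illustrate the kind of message this suggestion has in mind, here is a rough sketch. RowContextDemo and describeFailure are hypothetical and not part of kafka-connect-spooldir; the wording of the message is modeled on the log output earlier in this thread, with the raw row appended.

```java
import java.util.Arrays;

public class RowContextDemo {
    // Hypothetical helper: builds an error message that includes the raw row.
    // This is not the connector's actual code, only an illustration.
    static String describeFailure(String field, long lineNumber, String[] row) {
        return String.format(
            "Exception thrown while parsing data for '%s'. linenumber=%d row=%s",
            field, lineNumber, Arrays.toString(row));
    }

    public static void main(String[] args) {
        // With the header row shown, the cause is visible at a glance.
        String[] headerRow = {"Log_Source", "Time", "ID"};
        System.out.println(describeFailure("Time", 1, headerRow));
    }
}
```

With the row included, a reader would see column names where data was expected and could recognize the header line immediately.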