jcustenborder / kafka-connect-spooldir

Kafka Connect connector for reading CSV files into Kafka.
Apache License 2.0

Handling timestamps that are in "YYYYMMDDHHMMSS.MMM" format #71

Open eksantrik opened 5 years ago

eksantrik commented 5 years ago

What would be the best way to handle a timestamp in "YYYYMMDDHHMMSS.MMM" format (e.g. "20180530143000.167") in a kafka-connect-spooldir → Kafka → Tranquility pipeline? I tried INT64 in kafka-connect-spooldir but had no luck.

I am getting the following error:

```
[2018-11-16 16:10:01,380] ERROR Exception encountered processing line 2 of /var/input/BW-CDR-20180531004500-2-25d3128f-218647.csv. (com.github.jcustenborder.kafka.connect.spooldir.SpoolDirSourceTask:290)
org.apache.kafka.connect.errors.DataException: Exception thrown while parsing data for 'startTime'. linenumber=2
	at com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceTask.process(SpoolDirCsvSourceTask.java:126)
	at com.github.jcustenborder.kafka.connect.spooldir.SpoolDirSourceTask.read(SpoolDirSourceTask.java:286)
	at com.github.jcustenborder.kafka.connect.spooldir.SpoolDirSourceTask.poll(SpoolDirSourceTask.java:165)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:244)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:220)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.DataException: Could not parse '20180530143000.177' to 'Long'
	at com.github.jcustenborder.kafka.connect.utils.data.Parser.parseString(Parser.java:113)
	at com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceTask.process(SpoolDirCsvSourceTask.java:118)
	... 11 more
Caused by: java.lang.NumberFormatException: For input string: "20180530143000.177"
	at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
	at java.lang.Long.parseLong(Long.java:589)
	at java.lang.Long.parseLong(Long.java:631)
	at com.github.jcustenborder.kafka.connect.utils.data.type.Int64TypeParser.parseString(Int64TypeParser.java:24)
	at com.github.jcustenborder.kafka.connect.utils.data.Parser.parseString(Parser.java:109)
	... 12 more
[2018-11-16 16:10:01,382] INFO Closing /var/input/BW-CDR-20180531004500-2-25d3128f-218647.csv (com.github.jcustenborder.kafka.connect.spooldir.SpoolDirSourceTask:191)
[2018-11-16 16:10:01,382] ERROR Error during processing, moving /var/input/BW-CDR-20180531004500-2-25d3128f-218647.csv to /var/error. (com.github.jcustenborder.kafka.connect.spooldir.SpoolDirSourceTask:199)
[2018-11-16 16:10:01,383] INFO Removing processing file /var/input/BW-CDR-20180531004500-2-25d3128f-218647.csv.PROCESSING (com.github.jcustenborder.kafka.connect.spooldir.SpoolDirSourceTask:213)
[2018-11-16 16:10:01,633] INFO Opening /var/input/BW-CDR-20180531004500-2-2f016b82-218430.csv (com.github.jcustenborder.kafka.connect.spooldir.SpoolDirSourceTask:257)
[2018-11-16 16:10:01,635] INFO configure() - field names from header row. fields = recordId, serviceProvider, type, userNumber, groupNumber, direction, callingNumber, callingPresentationIndicator, calledNumber, startTime, userTimeZone, answerIndicator, answerTime, releaseTime, terminationCause, networkType, dialedDigits, callCategory, networkCallType, networkTranslatedNumber, releasingParty, route, networkCallID, codec, accessDeviceAddress, accessCallID, group, department, originalCalledNumber, originalCalledPresentationIndicator, originalCalledReason, redirectingNumber, redirectingPresentationIndicator, redirectingReason, chargeIndicator, typeOfNetwork, localCallId, remoteCallId, key, cancelCWTperCall.facResult, clidBlockingPerCall.invocationTime, clidBlockingPerCall.facResult, directVMTransfer.invocationTime, directVMTransfer.facResult, userId, otherPartyName, otherPartyNamePresentationIndicator, trunkGroupName, clidPermitted, relatedCallId, relatedCallIdReason, transfer.invocationTime, transfer.result, transfer.relatedCallId, transfer.type, codecUsage, trunkGroupInfo, asCallType, configurableCLID, callCenter.outgoingCallCenterPhoneNumber, namePermitted, callCenter.outgoingCallCenterUserId, location, locationType, locationUsage, userAgent, extTrackingId, flexibleSeatingGuest.invocationTime (com.github.jcustenborder.kafka.connect.spooldir.SpoolDirSourceTask:60)
[2018-11-16 16:10:01,635] ERROR Exception encountered processing line 2 of /var/input/BW-CDR-20180531004500-2-2f016b82-218430.csv. (com.github.jcustenborder.kafka.connect.spooldir.SpoolDirSourceTask:290)
org.apache.kafka.connect.errors.DataException: Exception thrown while parsing data for 'startTime'. linenumber=2
	at com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceTask.process(SpoolDirCsvSourceTask.java:126)
	at com.github.jcustenborder.kafka.connect.spooldir.SpoolDirSourceTask.read(SpoolDirSourceTask.java:286)
	at com.github.jcustenborder.kafka.connect.spooldir.SpoolDirSourceTask.poll(SpoolDirSourceTask.java:165)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:244)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:220)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.DataException: Could not parse '20180530143000.167' to 'Long'
	at com.github.jcustenborder.kafka.connect.utils.data.Parser.parseString(Parser.java:113)
	at com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceTask.process(SpoolDirCsvSourceTask.java:118)
	... 11 more
Caused by: java.lang.NumberFormatException: For input string: "20180530143000.167"
	at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
	at java.lang.Long.parseLong(Long.java:589)
	at java.lang.Long.parseLong(Long.java:631)
	at com.github.jcustenborder.kafka.connect.utils.data.type.Int64TypeParser.parseString(Int64TypeParser.java:24)
	at com.github.jcustenborder.kafka.connect.utils.data.Parser.parseString(Parser.java:109)
	... 12 more
```
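The NumberFormatException is expected with an INT64 schema: `Long.parseLong` cannot accept the fractional "YYYYMMDDHHMMSS.MMM" string. One workaround is to convert the value to epoch milliseconds before the file reaches the connector. A minimal sketch using `java.time` (the class and method names are hypothetical, and it assumes the CDR timestamps are UTC; adjust the offset otherwise):

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class TimestampConverter {
    // Matches "YYYYMMDDHHMMSS.MMM", e.g. "20180530143000.167".
    // "SSS" parses the trailing digits as a fraction of the second.
    private static final DateTimeFormatter FMT =
            DateTimeFormatter.ofPattern("uuuuMMddHHmmss.SSS");

    // Convert the raw CDR string to epoch milliseconds, assuming UTC.
    public static long toEpochMillis(String raw) {
        return LocalDateTime.parse(raw, FMT)
                .toInstant(ZoneOffset.UTC)
                .toEpochMilli();
    }

    public static void main(String[] args) {
        // 2018-05-30T14:30:00.167Z
        System.out.println(toEpochMillis("20180530143000.167"));
    }
}
```

Once the column holds a plain epoch-millisecond integer, the INT64 field schema parses it without error.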

eksantrik commented 5 years ago

We ended up converting the timestamps to epoch before injecting them into Kafka. In that case the INT64 type works, and everything runs now. However, we were also testing the "timestamp.field" and "timestamp.mode" settings and could not understand how they are used. We created our config file like the following:

```
name=test
tasks.max=1
connector.class=com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector
input.file.pattern=^.*.csv$
finished.path=/var/test_finished
input.path=/var/test_input
error.path=/var/test_error
halt.on.error=false
topic=test
timestamp.field=processTime
timestamp.mode=PROCESS_TIME
key.schema={"name":"com.example.users.UserKey","type":"STRUCT","isOptional":false,"fieldSchemas":{"recordId":{"type":"STRING","isOptional":false},"startTime":{"type":"INT64","isOptional":false}}}
value.schema={"name":"com.example.users.User","type":"STRUCT","isOptional":false,"fieldSchemas":{"recordId":{"type":"STRING","isOptional":false},"serviceProvider":{"type":"STRING","isOptional":true},"type":{"type":"STRING","isOptional":true},"userNumber":{"type":"STRING","isOptional":true},"callingNumber":{"type":"STRING","isOptional":true},"calledNumber":{"type":"STRING","isOptional":true},"startTime":{"type":"INT64","isOptional":false}}}
csv.first.row.as.header=true
```

The goal was to give Kafka a timestamp field to process. The records are injected successfully, but there is no trace of a "processTime" field in the topic. Are we using this feature correctly? Are we supposed to add a blank "processTime" column to the CSV so that Kafka Connect can populate it with the processing time?

Thanks

jcustenborder commented 5 years ago

Did you try this? Sorry for the delay in responding.

```json
{
  "name" : "org.apache.kafka.connect.data.Timestamp",
  "type" : "INT64",
  "version" : 1,
  "isOptional" : true
}
```
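Applied to the config above, declaring `startTime` with the `org.apache.kafka.connect.data.Timestamp` logical type inside the `value.schema` fieldSchemas might look like the fragment below. This is a sketch, not a confirmed fix: whether the connector can parse the raw "YYYYMMDDHHMMSS.MMM" string into a Timestamp depends on the date formats its parser is configured with (e.g. via a `parser.timestamp.date.formats` setting, if your connector version supports it).

```json
"startTime" : {
  "name" : "org.apache.kafka.connect.data.Timestamp",
  "type" : "INT64",
  "version" : 1,
  "isOptional" : false
}
```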