RADAR-base / radar-upload-source-connector

Manually upload data files
Apache License 2.0

Cannot serialize AltoidaSummaryMetrics #125

Closed nivemaham closed 4 years ago

nivemaham commented 4 years ago

The latest dev changes produce the error below when uploading Altoida test metrics to Kafka. It throws a `Can't redefine:` exception. Any ideas @blootsvoets @yatharthranjan? The schema definition is shown below.

  "namespace": "org.radarcns.connector.upload.altoida",
  "name": "AltoidaSummaryMetrics",
  "type": "record",
  "doc": "Metrics from a set of tests.",
  "fields": [
    {"name": "audioHighReactionTimes", "type": "float", "doc": "Distribution of reaction times to high tone(s)."},
    {"name": "audioHighAccuracy", "type": "float", "doc": "Distribution of deviation from speaker button center (cm)."},
    {"name": "audioLowReactions", "type": "int", "doc": "Distributions of number of reactions to low tones."},
    {"name": "audioIgnoredHighTonePercentage", "type": "float", "doc": "Distribution of ratio of high tones ignored."},
    {"name": "audioPrematureToneButtonPresses", "type": "int", "doc": "Distribution of number of premature tone button presses."},
    {"name": "randomScreenPressesDuringPlacement", "type": "int", "doc": "Distribution of number of random screen presses during the placement phase."},
    {"name": "randomScreenPressesDuringSearch", "type": "int", "doc": "Distribution of number of random screen presses during placement."},
    {"name": "tooMuchMovementCount", "type": "int", "doc": "Distribution of number of times placement was impossible due to too much movement."},
    {"name": "findBetterPlaceCount", "type": "float", "doc": "Number of times the user had to find a better place."},
    {"name": "introReadTimes", "type": "float", "doc": "Mean time the user required to read an intro page (s)."},
    {"name": "introReadTime1", "type": "float", "doc": "Time the user required to read the first intro page (s)."},
    {"name": "introReadTime2", "type": "float", "doc": "Time the user required to read the second intro page (s)."},
    {"name": "placeDelays", "type": "float", "doc": "Mean delay from possible placement to actual placement (s)."},
    {"name": "spotAlreadyTakenCount", "type": "int", "doc": "Number of times the user tried to use the same spot twice."},
    {"name": "trials", "type": {
      "type": "array",
      "items": "AltoidaTrial"
    }, "doc": "Per-trials details and analyses."},
    {"name": "trialMeans", "type": "AltoidaTrial", "doc": "Mean values of each of the trials."},
    {"name": "findFailCount", "type": "float", "doc": "Number of times the user failed to find an object."},
    {"name": "findSkipDurations", "type": "float", "doc": "Mean time the user tried before skipping an object in seconds."},
    {"name": "skipButtonCount", "type": "float", "doc": "Number of times the user pressed the skip button."},
    {"name": "countdownFail", "type": "float", "doc": "Number of countdown fails."},
    {"name": "stepCountRatio", "type": "float", "doc": "Distribution of the ratio between the steps needed for finding the objects over the steps needed for placing them."},
    {"name": "stepDelayP", "type": "float", "doc": "Mean delay between steps when placing objects (s)."},
    {"name": "stepDelayF", "type": "float", "doc": "Mean delay between steps when finding objects (s)."},
    {"name": "stepVarianceP", "type": "float", "doc": "Variance of delays between steps when placing objects."},
    {"name": "stepVarianceF", "type": "float", "doc": "Variance of delays between steps when finding objects."},
    {"name": "notWalkingTimeP", "type": "float", "doc": "Time the user did not walk during placing objects (s)."},
    {"name": "notWalkingTimeF", "type": "float", "doc": "Time the user did not walk during finding objects (s)."},
    {"name": "shockCount", "type": "float", "doc": "Number of shocks that were recorded during the test."},
    {"name": "accelerationVarianceX", "type": "float", "doc": "Variance of acceleration data of x axis."},
    {"name": "accelerationVarianceY", "type": "float", "doc": "Variance of acceleration data of y axis."},
    {"name": "accelerationVarianceZ", "type": "float", "doc": "Variance of acceleration data of z axis."},
    {"name": "strongHand", "type": "float", "doc": "Dominant hand of user."},
    {"name": "normalisationCircle", "type": "AltoidaWalkingTestAggregate", "doc": "Deviation from optimal path in circle normalisation test (cm)."},
    {"name": "normalisationSquare", "type": "AltoidaWalkingTestAggregate", "doc": "Deviation from optimal path in square normalisation test (cm)."},
    {"name": "normalisationSerpentine", "type": "AltoidaWalkingTestAggregate", "doc": "Deviation from optimal path in serpentine normalisation test (cm)."},
    {"name": "normalisationSpeedCircle", "type": "AltoidaWalkingTestAggregate", "doc": "Deviation from optimal path in speed circle normalisation test (cm)."},
    {"name": "normalisationRandomTapping", "type": "AltoidaTappingTestAggregate", "doc": "Deviation from center points in random tapping normalisation test (cm)."},
    {"name": "normalisationTapping", "type": "AltoidaTappingTestAggregate", "doc": "Deviation from center points in tapping normalisation test (cm)."}          
  ]
}
```

```
[2020-01-27 13:09:15,353] INFO WorkerSourceTask{id=radar-upload-source-3} flushing 1 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2020-01-27 13:09:15,369] INFO WorkerSourceTask{id=radar-upload-source-3} Finished commitOffsets successfully in 16 ms (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2020-01-27 13:09:15,369] ERROR WorkerSourceTask{id=radar-upload-source-3} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.convertTransformedRecord(WorkerSourceTask.java:270)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:294)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:229)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.DataException: Failed to serialize Avro data from topic connect_upload_altoida_bit_metrics :
    at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:83)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$2(WorkerSourceTask.java:270)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
    ... 11 more
Caused by: org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
Caused by: org.apache.avro.SchemaParseException: Can't redefine: org.radarcns.connector.upload.altoida.AltoidaTrial
    at org.apache.avro.Schema$Names.put(Schema.java:1128)
    at org.apache.avro.Schema$NamedSchema.writeNameRef(Schema.java:562)
    at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:690)
    at org.apache.avro.Schema$RecordSchema.fieldsToJson(Schema.java:716)
    at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:701)
    at org.apache.avro.Schema.toString(Schema.java:324)
    at org.apache.avro.Schema.toString(Schema.java:314)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:168)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:222)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:198)
    at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:70)
    at io.confluent.connect.avro.AvroConverter$Serializer.serialize(AvroConverter.java:131)
    at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:80)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$2(WorkerSourceTask.java:270)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.convertTransformedRecord(WorkerSourceTask.java:270)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:294)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:229)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
```
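For context: the schema references the named record `AltoidaTrial` twice, once as the items of the `trials` array and once as the `trialMeans` field. The failure appears to happen because the Connect-to-Avro conversion ends up with two non-identical copies of that named record (for example when Connect metadata is attached as extra schema properties), and Avro refuses to write a second, different definition of the same name when the schema registry client re-serializes the schema for registration. A minimal sketch of that Avro behaviour, independent of the connector (the field name and the added property are illustrative):

```java
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

public class CantRedefineSketch {
    public static void main(String[] args) {
        // Two distinct Schema objects with the same full name, differing only in an
        // extra property (standing in for the "connect.*" metadata the converter adds).
        Schema trialForArray = SchemaBuilder.record("AltoidaTrial")
                .namespace("org.radarcns.connector.upload.altoida")
                .fields().requiredFloat("placeDuration").endRecord();
        Schema trialForMeans = SchemaBuilder.record("AltoidaTrial")
                .namespace("org.radarcns.connector.upload.altoida")
                .fields().requiredFloat("placeDuration").endRecord();
        trialForMeans.addProp("connect.doc", "Mean values of each of the trials.");

        Schema summary = SchemaBuilder.record("AltoidaSummaryMetrics")
                .namespace("org.radarcns.connector.upload.altoida")
                .fields()
                .name("trials").type().array().items(trialForArray).noDefault()
                .name("trialMeans").type(trialForMeans).noDefault()
                .endRecord();

        // Re-serializing the schema to JSON (as CachedSchemaRegistryClient does before
        // registering it) walks the tree, finds a second, non-identical definition of
        // AltoidaTrial, and throws:
        // org.apache.avro.SchemaParseException: Can't redefine:
        //   org.radarcns.connector.upload.altoida.AltoidaTrial
        System.out.println(summary.toString());
    }
}
```

If the two copies were identical, Avro would emit a name reference for the second occurrence instead of a redefinition, which is presumably why stripping the Connect metadata (below) makes the schema serializable again.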

What I have tried so far

  1. Created the Avro data with Connect metadata disabled (`connect.meta.data=false`), using the fragment below (a fuller sketch follows this list):

     ```
     .with(AvroDataConfig.CONNECT_META_DATA_CONFIG, false)
     .with(AvroDataConfig.SCHEMAS_CACHE_SIZE_CONFIG, 20)
     .with(AvroDataConfig.ENHANCED_AVRO_SCHEMA_SUPPORT_CONFIG, true)
     .build())
     ```

  2. Set `value.converter.connect.meta.data=false` in `source-upload.properties`.
  3. Set `value.converter.connect.meta.data=false` in the docker-compose/Kubernetes deployment.

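For completeness, the fragment in (1) presumably comes from constructing an `AvroData` instance (the Connect-to-Avro translator that `AvroConverter` uses internally) with metadata disabled; a self-contained sketch of that configuration, not the connector's actual code:

```java
import io.confluent.connect.avro.AvroData;
import io.confluent.connect.avro.AvroDataConfig;

public class AvroDataConfigSketch {
    public static void main(String[] args) {
        // Build an AvroData with Connect metadata disabled, so converted record
        // schemas are not decorated with extra "connect.*" properties.
        AvroData avroData = new AvroData(new AvroDataConfig.Builder()
                .with(AvroDataConfig.CONNECT_META_DATA_CONFIG, false)
                .with(AvroDataConfig.SCHEMAS_CACHE_SIZE_CONFIG, 20)
                .with(AvroDataConfig.ENHANCED_AVRO_SCHEMA_SUPPORT_CONFIG, true)
                .build());
        // avroData.fromConnectSchema(...) / fromConnectData(...) then convert
        // Connect schemas and values to Avro without the added metadata.
    }
}
```
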
nivemaham commented 4 years ago

This one is similar to #120. Both can be fixed by applying the proposed change to the Docker environment variables: adding the environment variable `CONNECT_VALUE_CONVERTER_CONNECT_META_DATA=false` fixes this issue.
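For anyone hitting the same error, a sketch of that docker-compose change, assuming the connector runs on an image derived from `confluentinc/cp-kafka-connect`, which maps `CONNECT_*` environment variables onto worker properties (so this becomes `value.converter.connect.meta.data=false`); the service name here is illustrative:

```yaml
# Illustrative docker-compose fragment; the service name is made up, and the
# CONNECT_* -> worker-property mapping is the cp-kafka-connect image convention.
services:
  radar-upload-connect:
    environment:
      CONNECT_VALUE_CONVERTER_CONNECT_META_DATA: "false"
```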

nivemaham commented 4 years ago

Fixed by #126