spring-cloud / spring-cloud-stream-samples

Samples for Spring Cloud Stream
Apache License 2.0

schema-registry-confluent-producer throws: AvroRuntimeException: Not an array #171

Closed. pedroarthur closed this issue 4 years ago

pedroarthur commented 4 years ago

While trying to run the schema-registry-confluent samples from spring-cloud-stream-samples, I am unable to produce messages. Both schema-registry-confluent-producer1 and schema-registry-confluent-producer2 fail with the following stack trace:

2019-12-10 18:31:39.352 ERROR 150460 --- [ask-scheduler-4] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessagingException: Failed to invoke method; nested exception is org.springframework.messaging.converter.MessageConversionException: Could not write JSON: Not an array: {"type":"record","name":"Sensor","namespace":"com.example","fields":[{"name":"id","type":"string"},{"name":"internalTemperature","type":"float","default":0.0,"aliases":["temperature"]},{"name":"externalTemperature","type":"float","default":0.0},{"name":"acceleration","type":"float","default":0.0},{"name":"velocity","type":"float","default":0.0},{"name":"accelerometer","type":["null",{"type":"array","items":"float"}]},{"name":"magneticField","type":["null",{"type":"array","items":"float"}]}]} (through reference chain: com.example.Sensor["schema"]->org.apache.avro.Schema$RecordSchema["elementType"]); nested exception is com.fasterxml.jackson.databind.JsonMappingException: Not an array: {"type":"record","name":"Sensor","namespace":"com.example","fields":[{"name":"id","type":"string"},{"name":"internalTemperature","type":"float","default":0.0,"aliases":["temperature"]},{"name":"externalTemperature","type":"float","default":0.0},{"name":"acceleration","type":"float","default":0.0},{"name":"velocity","type":"float","default":0.0},{"name":"accelerometer","type":["null",{"type":"array","items":"float"}]},{"name":"magneticField","type":["null",{"type":"array","items":"float"}]}]} (through reference chain: com.example.Sensor["schema"]->org.apache.avro.Schema$RecordSchema["elementType"])
    at org.springframework.integration.endpoint.MethodInvokingMessageSource.doReceive(MethodInvokingMessageSource.java:115)
    at org.springframework.integration.endpoint.AbstractMessageSource.receive(AbstractMessageSource.java:167)
    at org.springframework.integration.endpoint.SourcePollingChannelAdapter.receiveMessage(SourcePollingChannelAdapter.java:250)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:359)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:328)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$null$1(AbstractPollingEndpoint.java:275)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57)
    at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:55)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$2(AbstractPollingEndpoint.java:272)
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
    at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:93)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.springframework.messaging.converter.MessageConversionException: Could not write JSON: Not an array: {"type":"record","name":"Sensor","namespace":"com.example","fields":[{"name":"id","type":"string"},{"name":"internalTemperature","type":"float","default":0.0,"aliases":["temperature"]},{"name":"externalTemperature","type":"float","default":0.0},{"name":"acceleration","type":"float","default":0.0},{"name":"velocity","type":"float","default":0.0},{"name":"accelerometer","type":["null",{"type":"array","items":"float"}]},{"name":"magneticField","type":["null",{"type":"array","items":"float"}]}]} (through reference chain: com.example.Sensor["schema"]->org.apache.avro.Schema$RecordSchema["elementType"]); nested exception is com.fasterxml.jackson.databind.JsonMappingException: Not an array: {"type":"record","name":"Sensor","namespace":"com.example","fields":[{"name":"id","type":"string"},{"name":"internalTemperature","type":"float","default":0.0,"aliases":["temperature"]},{"name":"externalTemperature","type":"float","default":0.0},{"name":"acceleration","type":"float","default":0.0},{"name":"velocity","type":"float","default":0.0},{"name":"accelerometer","type":["null",{"type":"array","items":"float"}]},{"name":"magneticField","type":["null",{"type":"array","items":"float"}]}]} (through reference chain: com.example.Sensor["schema"]->org.apache.avro.Schema$RecordSchema["elementType"])
    at org.springframework.messaging.converter.MappingJackson2MessageConverter.convertToInternal(MappingJackson2MessageConverter.java:286)
    at org.springframework.cloud.stream.converter.ApplicationJsonMessageMarshallingConverter.convertToInternal(ApplicationJsonMessageMarshallingConverter.java:69)
    at org.springframework.messaging.converter.AbstractMessageConverter.toMessage(AbstractMessageConverter.java:201)
    at org.springframework.messaging.converter.AbstractMessageConverter.toMessage(AbstractMessageConverter.java:191)
    at org.springframework.messaging.converter.CompositeMessageConverter.toMessage(CompositeMessageConverter.java:83)
    at org.springframework.cloud.function.context.catalog.BeanFactoryAwareFunctionRegistry$FunctionInvocationWrapper.lambda$convertOutputValueIfNecessary$2(BeanFactoryAwareFunctionRegistry.java:622)
    at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
    at java.base/java.util.ArrayList$ArrayListSpliterator.tryAdvance(ArrayList.java:1631)
    at java.base/java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:127)
    at java.base/java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:502)
    at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:488)
    at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
    at java.base/java.util.stream.FindOps$FindOp.evaluateSequential(FindOps.java:150)
    at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.base/java.util.stream.ReferencePipeline.findFirst(ReferencePipeline.java:543)
    at org.springframework.cloud.function.context.catalog.BeanFactoryAwareFunctionRegistry$FunctionInvocationWrapper.convertOutputValueIfNecessary(BeanFactoryAwareFunctionRegistry.java:628)
    at org.springframework.cloud.function.context.catalog.BeanFactoryAwareFunctionRegistry$FunctionInvocationWrapper.doApply(BeanFactoryAwareFunctionRegistry.java:571)
    at org.springframework.cloud.function.context.catalog.BeanFactoryAwareFunctionRegistry$FunctionInvocationWrapper.get(BeanFactoryAwareFunctionRegistry.java:476)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.springframework.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:279)
    at org.springframework.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:263)
    at org.springframework.integration.endpoint.MethodInvokingMessageSource.doReceive(MethodInvokingMessageSource.java:112)
    ... 17 more
Caused by: com.fasterxml.jackson.databind.JsonMappingException: Not an array: {"type":"record","name":"Sensor","namespace":"com.example","fields":[{"name":"id","type":"string"},{"name":"internalTemperature","type":"float","default":0.0,"aliases":["temperature"]},{"name":"externalTemperature","type":"float","default":0.0},{"name":"acceleration","type":"float","default":0.0},{"name":"velocity","type":"float","default":0.0},{"name":"accelerometer","type":["null",{"type":"array","items":"float"}]},{"name":"magneticField","type":["null",{"type":"array","items":"float"}]}]} (through reference chain: com.example.Sensor["schema"]->org.apache.avro.Schema$RecordSchema["elementType"])
    at com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:394)
    at com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:353)
    at com.fasterxml.jackson.databind.ser.std.StdSerializer.wrapAndThrow(StdSerializer.java:316)
    at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:729)
    at com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:166)
    at com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:727)
    at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:721)
    at com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:166)
    at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider._serialize(DefaultSerializerProvider.java:480)
    at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:319)
    at com.fasterxml.jackson.databind.ObjectMapper.writeValue(ObjectMapper.java:2795)
    at org.springframework.messaging.converter.MappingJackson2MessageConverter.convertToInternal(MappingJackson2MessageConverter.java:270)
    ... 41 more
Caused by: org.apache.avro.AvroRuntimeException: Not an array: {"type":"record","name":"Sensor","namespace":"com.example","fields":[{"name":"id","type":"string"},{"name":"internalTemperature","type":"float","default":0.0,"aliases":["temperature"]},{"name":"externalTemperature","type":"float","default":0.0},{"name":"acceleration","type":"float","default":0.0},{"name":"velocity","type":"float","default":0.0},{"name":"accelerometer","type":["null",{"type":"array","items":"float"}]},{"name":"magneticField","type":["null",{"type":"array","items":"float"}]}]}
    at org.apache.avro.Schema.getElementType(Schema.java:289)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:688)
    at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:721)
    ... 49 more

https://github.com/spring-cloud/spring-cloud-stream-samples/blob/2b263af24b0745d14b1fa2c2a969f1e0a2a0bd67/schema-registry-samples/schema-registry-confluent/schema-registry-confluent-producer1/src/main/java/sample/producer1/Producer1Application.java#L53

sobychacko commented 4 years ago

@pedroarthur Thanks for pointing this out. We were using the wrong binding names. See this commit, where it is fixed.
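
For readers hitting the same error: the trace shows `MappingJackson2MessageConverter` handling the `Sensor` payload, which suggests the Avro content type was not applied to the binding the producer actually uses, so the default JSON converter ran and Jackson called `Schema.getElementType()` on the record schema. The fragment below is only a sketch of what a matching binding configuration could look like. It assumes the producer is a functional bean named `supplier` and a destination called `sensor-topic`; both names are hypothetical and are not taken from the fixing commit.

```yaml
# Hypothetical application.yml fragment; "supplier" and "sensor-topic" are assumed names.
spring:
  cloud:
    stream:
      bindings:
        # Functional binding names follow <functionName>-out-<index>.
        # A content type set under a different binding name is not applied to this output.
        supplier-out-0:
          destination: sensor-topic
          contentType: application/*+avro
```

If the binding name under `spring.cloud.stream.bindings` does not match the name derived from the function bean, the `contentType` setting is attached to a binding that is never used, and the output falls back to JSON conversion.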