I am testing a simple Spring Integration flow with SQS. In this test, we create mock SQS queues in Docker with LocalStack, send a message to one queue, let the flow send the message to the second queue, and then pick up the message from there. I am receiving an error when the test tries to retrieve the message from the second queue.
[2024-12-02 15:43:51.206] - 27528 ERROR [sdk-async-response-1-9] --- i.a.c.s.l.s.AbstractMessageConvertingMessageSource: Error converting message Message(MessageId=a180ab3b-2278-409c-a42c-0ef150fae893, ReceiptHandle=Y2NjM2U2MzktN2MzOS00YjQ2LWExZDAtY2FlNDdkZTJlNzNmIGFybjphd3M6c3FzOnVzLWVhc3QtMTowMDAwMDAwMDAwMDA6c3Rvcm14YS1yZXN1bHQtcXVldWUgYTE4MGFiM2ItMjI3OC00MDljLWE0MmMtMGVmMTUwZmFlODkzIDE3MzMxNTA2MzEuMTU2MjEyMw==, MD5OfBody=0f5fd36ebc7671562a7b91e47681cc8c, Body={"payload":"error: this is an error","headers":{"replyChannel":"nullChannel","errorChannel":"","id":"0f4fb9c9-c017-a0cb-89a1-61c9d9c6d4fd","timestamp":1733150631035}}, Attributes={ApproximateReceiveCount=1, SentTimestamp=1733150631155, SenderId=000000000000, ApproximateFirstReceiveTimestamp=1733150631156}, MD5OfMessageAttributes=3b4c2e873b0438e910219a49e49df6bc, MessageAttributes={JavaType=MessageAttributeValue(StringValue=org.springframework.messaging.support.GenericMessage, DataType=String), contentType=MessageAttributeValue(StringValue=application/json, DataType=String)}), ignoring.
org.springframework.messaging.converter.MessageConversionException: Could not read JSON: Cannot construct instance of `org.springframework.messaging.support.GenericMessage` (no Creators, like default constructor, exist): cannot deserialize from Object value (no delegate- or property-based Creator)
at [Source: REDACTED (`StreamReadFeature.INCLUDE_SOURCE_IN_LOCATION` disabled); line: 1, column: 2]
at org.springframework.messaging.converter.MappingJackson2MessageConverter.convertFromInternal(MappingJackson2MessageConverter.java:256)
at org.springframework.messaging.converter.AbstractMessageConverter.fromMessage(AbstractMessageConverter.java:183)
at org.springframework.messaging.converter.AbstractMessageConverter.fromMessage(AbstractMessageConverter.java:174)
at org.springframework.messaging.converter.CompositeMessageConverter.fromMessage(CompositeMessageConverter.java:57)
at io.awspring.cloud.sqs.support.converter.AbstractMessagingMessageConverter.convertPayload(AbstractMessagingMessageConverter.java:182)
at io.awspring.cloud.sqs.support.converter.AbstractMessagingMessageConverter.toMessagingMessage(AbstractMessagingMessageConverter.java:163)
at io.awspring.cloud.sqs.listener.source.AbstractMessageConvertingMessageSource.convertMessage(AbstractMessageConvertingMessageSource.java:99)
at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1625)
at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682)
at io.awspring.cloud.sqs.listener.source.AbstractMessageConvertingMessageSource.convertMessages(AbstractMessageConvertingMessageSource.java:90)
at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:646)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147)
at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncApiCallMetricCollectionStage.lambda$execute$0(AsyncApiCallMetricCollectionStage.java:58)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147)
at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncApiCallTimeoutTrackingStage.lambda$execute$2(AsyncApiCallTimeoutTrackingStage.java:69)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147)
at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.lambda$attemptExecute$1(AsyncRetryableStage.java:177)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147)
at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage.lambda$execute$0(MakeAsyncHttpRequestStage.java:110)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147)
at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage.completeResponseFuture(MakeAsyncHttpRequestStage.java:253)
at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage.lambda$executeHttpRequest$3(MakeAsyncHttpRequestStage.java:167)
at java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:934)
at java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:911)
at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:482)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot construct instance of `org.springframework.messaging.support.GenericMessage` (no Creators, like default constructor, exist): cannot deserialize from Object value (no delegate- or property-based Creator)
at [Source: REDACTED (`StreamReadFeature.INCLUDE_SOURCE_IN_LOCATION` disabled); line: 1, column: 2]
at com.fasterxml.jackson.databind.exc.InvalidDefinitionException.from(InvalidDefinitionException.java:67)
at com.fasterxml.jackson.databind.DeserializationContext.reportBadDefinition(DeserializationContext.java:1887)
at com.fasterxml.jackson.databind.DatabindContext.reportBadDefinition(DatabindContext.java:414)
at com.fasterxml.jackson.databind.DeserializationContext.handleMissingInstantiator(DeserializationContext.java:1375)
at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromObjectUsingNonDefault(BeanDeserializerBase.java:1508)
at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserializeFromObject(BeanDeserializer.java:348)
at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:185)
at com.fasterxml.jackson.databind.deser.DefaultDeserializationContext.readRootValue(DefaultDeserializationContext.java:342)
at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4905)
at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3848)
at org.springframework.messaging.converter.MappingJackson2MessageConverter.convertFromInternal(MappingJackson2MessageConverter.java:251)
... 45 common frames omitted
Sample
The main application is as follows:
@SpringBootApplication
public class SampleIntegrationApplication {
@MessagingGateway(name = "QueueGateway")
public interface QueueGateway {
@Gateway(requestChannel = "mainChannel")
void sendToQueue(String message);
}
@Autowired
QueueGateway gateway;
@Autowired
SqsTemplate template;
@SqsListener("input-queue")
public void listenTestQueue(String message) {
gateway.sendToQueue(message);
}
public static void main(String[] args) throws Exception {
SpringApplication.run(SampleIntegrationApplication.class, args);
}
@Bean
public IntegrationFlow sqsFlow() {
return IntegrationFlow
.from("mainChannel")
.filter((String message) -> message.toLowerCase().contains("error"))
.log(LoggingHandler.Level.WARN, "routing", m -> m)
.handle(m -> template.send(to -> to.queue("output-queue").payload(m)))
.get();
}
}
In the test, the messages are sent to the queue as follows:
sqsTemplate.send(to -> to.queue("input-queue").payload("error: this is an error"));
sqsTemplate.send(to -> to.queue("input-queue").payload("This is okay"));
In other discussions, I saw that the use of DevTools may have something to do with this, but the above application does not have DevTools as a dependency.
Type: Bug
Component: SQS
Describe the bug Spring Cloud AWS version: 3.2.0
I am testing a simple Spring Integration flow with SQS. In this test, we create mock SQS queues in Docker with LocalStack, send a message to one queue, let the flow send the message to the second queue, and then pick up the message from there. I am receiving an error when the test tries to retrieve the message from the second queue.
Sample The main application is as follows:
In the test, the messages are sent to the queue as follows:
And the test includes the following listener:
In other discussions, I saw that the use of DevTools may have something to do with this, but the above application does not have DevTools as a dependency.