softwaremill / elasticmq

In-memory message queue with an Amazon SQS-compatible interface. Runs stand-alone or embedded.
https://softwaremill.com/open-source/
Apache License 2.0

Spring Cloud produced messages fail parsing #46

grigorigoldman closed this issue 9 years ago

grigorigoldman commented 9 years ago

I'm trying to use an embedded ElasticMQ in my Spring Boot/Spring Cloud application, but I get an exception when sending a message:

2015-04-23 22:41:53.312 ERROR 14025 --- [t-dispatcher-13] o.e.r.s.TheSQSRestServerBuilder$$anon$1  : Exception when running routes

java.lang.Exception: Currently only handles String typed attributes
    at org.elasticmq.rest.sqs.SendMessageDirectives$$anonfun$getMessageAttributes$1.apply(SendMessageDirectives.scala:62)
    at org.elasticmq.rest.sqs.SendMessageDirectives$$anonfun$getMessageAttributes$1.apply(SendMessageDirectives.scala:53)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
    at scala.collection.immutable.Range.foreach(Range.scala:166)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.elasticmq.rest.sqs.SendMessageDirectives$class.getMessageAttributes(SendMessageDirectives.scala:53)
    at org.elasticmq.rest.sqs.TheSQSRestServerBuilder$$anon$1.getMessageAttributes(SQSRestServerBuilder.scala:89)
    at org.elasticmq.rest.sqs.SendMessageDirectives$class.doSendMessage(SendMessageDirectives.scala:72)
    at org.elasticmq.rest.sqs.TheSQSRestServerBuilder$$anon$1.doSendMessage(SQSRestServerBuilder.scala:89)
    at org.elasticmq.rest.sqs.SendMessageBatchDirectives$$anonfun$1$$anonfun$apply$1$$anonfun$2.apply(SendMessageBatchDirectives.scala:16)
    at org.elasticmq.rest.sqs.SendMessageBatchDirectives$$anonfun$1$$anonfun$apply$1$$anonfun$2.apply(SendMessageBatchDirectives.scala:15)
    at org.elasticmq.rest.sqs.BatchRequestsModule$$anonfun$2.apply(BatchRequestsModule.scala:35)
    at org.elasticmq.rest.sqs.BatchRequestsModule$$anonfun$2.apply(BatchRequestsModule.scala:31)
    at scala.collection.immutable.List.map(List.scala:273)
    at org.elasticmq.rest.sqs.BatchRequestsModule$class.batchRequest(BatchRequestsModule.scala:31)
    at org.elasticmq.rest.sqs.TheSQSRestServerBuilder$$anon$1.batchRequest(SQSRestServerBuilder.scala:89)
    at org.elasticmq.rest.sqs.SendMessageBatchDirectives$$anonfun$1$$anonfun$apply$1.apply(SendMessageBatchDirectives.scala:15)
    at org.elasticmq.rest.sqs.SendMessageBatchDirectives$$anonfun$1$$anonfun$apply$1.apply(SendMessageBatchDirectives.scala:12)
    at org.elasticmq.rest.sqs.directives.AnyParamDirectives2$$anonfun$anyParamsMap$1$$anonfun$apply$1.apply(AnyParamDirectives2.scala:13)
    at org.elasticmq.rest.sqs.directives.AnyParamDirectives2$$anonfun$anyParamsMap$1$$anonfun$apply$1.apply(AnyParamDirectives2.scala:11)
    at spray.routing.ApplyConverterInstances$$anon$22$$anonfun$apply$1.apply(ApplyConverterInstances.scala:25)
    at spray.routing.ApplyConverterInstances$$anon$22$$anonfun$apply$1.apply(ApplyConverterInstances.scala:24)
    at spray.routing.ConjunctionMagnet$$anon$1$$anon$2$$anonfun$happly$1$$anonfun$apply$1.apply(Directive.scala:38)
    at spray.routing.ConjunctionMagnet$$anon$1$$anon$2$$anonfun$happly$1$$anonfun$apply$1.apply(Directive.scala:37)
    at spray.routing.directives.BasicDirectives$$anon$1.happly(BasicDirectives.scala:26)
    at spray.routing.ConjunctionMagnet$$anon$1$$anon$2$$anonfun$happly$1.apply(Directive.scala:37)
    at spray.routing.ConjunctionMagnet$$anon$1$$anon$2$$anonfun$happly$1.apply(Directive.scala:36)
    at spray.routing.directives.BasicDirectives$$anon$2.happly(BasicDirectives.scala:79)
    at spray.routing.Directive$$anon$7$$anonfun$happly$4.apply(Directive.scala:86)
    at spray.routing.Directive$$anon$7$$anonfun$happly$4.apply(Directive.scala:86)
    at spray.routing.directives.BasicDirectives$$anon$3$$anonfun$happly$1.apply(BasicDirectives.scala:92)
    at spray.routing.directives.BasicDirectives$$anon$3$$anonfun$happly$1.apply(BasicDirectives.scala:92)
    at spray.routing.directives.BasicDirectives$$anon$3$$anonfun$happly$1.apply(BasicDirectives.scala:92)
    at spray.routing.directives.BasicDirectives$$anon$3$$anonfun$happly$1.apply(BasicDirectives.scala:92)
    at spray.routing.directives.ExecutionDirectives$$anonfun$handleExceptions$1$$anonfun$apply$4.apply(ExecutionDirectives.scala:35)
    at spray.routing.directives.ExecutionDirectives$$anonfun$handleExceptions$1$$anonfun$apply$4.apply(ExecutionDirectives.scala:33)
    at org.elasticmq.rest.sqs.directives.FutureDirectives$$anonfun$futureRouteToRoute$1$$anonfun$apply$1.apply(FutureDirectives.scala:14)
    at org.elasticmq.rest.sqs.directives.FutureDirectives$$anonfun$futureRouteToRoute$1$$anonfun$apply$1.apply(FutureDirectives.scala:11)
    at scala.util.Success$$anonfun$map$1.apply(Try.scala:236)
    at scala.util.Try$.apply(Try.scala:191)
    at scala.util.Success.map(Try.scala:236)
    at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235)
    at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
    at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67)
    at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82)
    at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
    at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
    at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
    at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

[Fatal Error] :1:1: Content is not allowed in prolog.
[Fatal Error] :1:1: Content is not allowed in prolog.

I originally thought that Spring was creating the message incorrectly and that ElasticMQ then failed to process it. However, when I run the same code against the real SQS service, both sending and receiving messages work.

My project code looks like this:

@SpringBootApplication
@Import(MessagingConfig.class)
public class App {
    public static void main(String[] args) {
        SpringApplication.run(App.class, args);
    }
}

@Configuration
public class MessagingConfig implements InitializingBean, DisposableBean {

    private SQSRestServer sqsRestServer;

    @Value("${elasticmq.enabled:false}")
    private boolean elasticMqEnabled;

    @Autowired
    private AmazonSQSAsync client;

    @Bean
    public QueueMessagingTemplate queueMessagingTemplate(AmazonSQS amazonSqs, ResourceIdResolver resourceIdResolver) {
        return new QueueMessagingTemplate(amazonSqs, resourceIdResolver);
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        if (elasticMqEnabled) {
            sqsRestServer = SQSRestServerBuilder.start();
            sqsRestServer.waitUntilStarted();

            client.setEndpoint("http://localhost:9324");
        }
        client.createQueue("TestQueue");
    }

    @Override
    public void destroy() throws Exception {
        String queueUrl = client.getQueueUrl("TestQueue").getQueueUrl();
        client.deleteQueue(queueUrl);

        if (elasticMqEnabled) {
            sqsRestServer.stopAndWait();
        }
    }
}

@Component
public class Sender {

    @Autowired
    private QueueMessagingTemplate queueMessagingTemplate;

    public void send(MyMessage message) {
        queueMessagingTemplate.convertAndSend("TestQueue", message);
    }
}

@Component
public class GreetingEventReceiver {
    @MessageMapping("TestQueue")
    public void receiveGreetingEvent(Greeting greeting) {
        System.out.println(greeting.getMessage());
    }
}

adamw commented 9 years ago

Are you setting any non-String attributes on the message? (I suspect that might happen in queueMessagingTemplate.convertAndSend.) Currently only String-typed attributes are supported.
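One way to answer that question is to look at the request parameters the client actually sends. The sketch below assumes the SQS query-API naming for message attributes (MessageAttribute.N.Name and MessageAttribute.N.Value.DataType) and lists every attribute whose data type is not exactly String. The class and method names are hypothetical, and this is not ElasticMQ's own parsing code.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical diagnostic helper; not part of ElasticMQ or the AWS SDK.
final class AttributeTypeCheck {

    private AttributeTypeCheck() {}

    /**
     * Walks MessageAttribute.1, MessageAttribute.2, ... in the given request
     * parameters and returns "name (dataType)" for every attribute whose
     * data type is not exactly "String".
     */
    static List<String> nonStringAttributes(Map<String, String> params) {
        List<String> result = new ArrayList<>();
        for (int i = 1; params.containsKey("MessageAttribute." + i + ".Name"); i++) {
            String name = params.get("MessageAttribute." + i + ".Name");
            String dataType = params.get("MessageAttribute." + i + ".Value.DataType");
            // Assumption: anything other than the plain "String" type is reported.
            if (!"String".equals(dataType)) {
                result.add(name + " (" + dataType + ")");
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> params = new LinkedHashMap<>();
        params.put("MessageAttribute.1.Name", "id");
        params.put("MessageAttribute.1.Value.DataType", "String");
        params.put("MessageAttribute.2.Name", "timestamp");
        params.put("MessageAttribute.2.Value.DataType", "Number");
        System.out.println(nonStringAttributes(params));
    }
}
```

Any name this reports is a candidate for the "Currently only handles String typed attributes" exception in the stack trace.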

sanjaybhatol commented 9 years ago

Hi Adam, I am sending a JSON message and facing the same issue. Is there any plan to support the JSON type in the near future?

sf-git commented 9 years ago

I had the same issue. Spring Cloud uses spring-messaging, which automatically adds a UUID (string) and a timestamp (long) to a message's headers. Those headers are then transformed into SQS message attributes, and the timestamp, being non-String, causes this issue. As a workaround, you can override the doSend method in QueueMessagingTemplate and remove the timestamp header from the message.
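The core of that workaround is dropping every header that would not map to a String attribute. The sketch below shows only that filtering step on a plain Map, so it runs without Spring on the classpath. StringHeaderFilter and keepStringHeaders are hypothetical names, and wiring the filter into an overridden doSend is left out because it depends on the Spring Cloud AWS version in use.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical helper; not part of Spring, Spring Cloud AWS, or ElasticMQ.
final class StringHeaderFilter {

    private StringHeaderFilter() {}

    /** Returns a copy of the headers that keeps only String-valued entries. */
    static Map<String, String> keepStringHeaders(Map<String, ?> headers) {
        Map<String, String> result = new LinkedHashMap<>();
        for (Map.Entry<String, ?> entry : headers.entrySet()) {
            Object value = entry.getValue();
            if (value instanceof String) {
                result.put(entry.getKey(), (String) value);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Headers shaped like the ones described above: a String id and a long timestamp.
        Map<String, Object> headers = new LinkedHashMap<>();
        headers.put("id", UUID.randomUUID().toString());
        headers.put("timestamp", System.currentTimeMillis());
        headers.put("contentType", "application/json");

        // The timestamp entry is dropped; the String entries survive.
        System.out.println(keepStringHeaders(headers).keySet());
    }
}
```

Filtering by value type rather than by the header name "timestamp" also catches any other non-String header that application code might add later.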

adamw commented 9 years ago

I think adding support for all message attribute types shouldn't be too hard, if somebody would like to attempt a PR :)

sf-git commented 9 years ago

It wasn't :) I'm currently working on it. I just need to write a few more tests for SQSStrict mode to check that a number attribute's value is inside -10^128 .. 10^126. So expect a PR really soon :)
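A range check of that kind could look roughly like the sketch below. It takes the bounds literally from the comment above (-10^128 to 10^126) and treats them as inclusive, which is an assumption. What ElasticMQ's strict mode actually enforces is not shown in this thread, and NumberRangeCheck is a hypothetical name.

```java
import java.math.BigDecimal;

// Hypothetical validator; bounds copied literally from the comment above.
final class NumberRangeCheck {

    static final BigDecimal LOWER = BigDecimal.TEN.pow(128).negate(); // -10^128
    static final BigDecimal UPPER = BigDecimal.TEN.pow(126);          //  10^126

    private NumberRangeCheck() {}

    /** Returns true if raw parses as a decimal number within [LOWER, UPPER]. */
    static boolean isInRange(String raw) {
        BigDecimal value;
        try {
            value = new BigDecimal(raw.trim());
        } catch (NumberFormatException e) {
            return false; // not a number at all
        }
        return value.compareTo(LOWER) >= 0 && value.compareTo(UPPER) <= 0;
    }

    public static void main(String[] args) {
        System.out.println(isInRange("1429821713312")); // a typical timestamp value
        System.out.println(isInRange("1e127"));         // above the upper bound
        System.out.println(isInRange("not-a-number"));  // fails to parse
    }
}
```

BigDecimal is used instead of double so that values near the bounds are compared exactly rather than after floating-point rounding.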

adamw commented 9 years ago

Awesome! :)

adamw commented 9 years ago

Merged & released 0.8.10 - thanks!