softwaremill / elasticmq

In-memory message queue with an Amazon SQS-compatible interface. Runs stand-alone or embedded.
https://softwaremill.com/open-source/
Apache License 2.0
2.51k stars 194 forks source link

Support for long polling appears broken #78

Closed SmithKevin closed 7 years ago

SmithKevin commented 7 years ago

Hello. My server is using a the SQS long polling mechanism by using the following

ReceiveMessageRequest rmr = new ReceiveMessageRequest()
                            .withMaxNumberOfMessages(10)
                            .withWaitTimeSeconds(20)
                            .withQueueUrl(scanQueueUrl);
                    ReceiveMessageResult rx = sqs.receiveMessage(rmr);

However when doing this elesticmq throws a timeout exception instead of just returning no messages. The error is as follows

java.net.SocketException: Connection reset
    at java.net.SocketInputStream.read(SocketInputStream.java:209)
    at java.net.SocketInputStream.read(SocketInputStream.java:141)
    at org.apache.http.impl.io.SessionInputBufferImpl.streamRead(SessionInputBufferImpl.java:139)
    at org.apache.http.impl.io.SessionInputBufferImpl.fillBuffer(SessionInputBufferImpl.java:155)
    at org.apache.http.impl.io.SessionInputBufferImpl.readLine(SessionInputBufferImpl.java:284)
    at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:140)
    at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:57)
    at org.apache.http.impl.io.AbstractMessageParser.parse(AbstractMessageParser.java:261)
    at org.apache.http.impl.DefaultBHttpClientConnection.receiveResponseHeader(DefaultBHttpClientConnection.java:165)
    at org.apache.http.impl.conn.CPoolProxy.receiveResponseHeader(CPoolProxy.java:167)
    at org.apache.http.protocol.HttpRequestExecutor.doReceiveResponse(HttpRequestExecutor.java:272)
    at com.amazonaws.http.protocol.SdkHttpRequestExecutor.doReceiveResponse(SdkHttpRequestExecutor.java:82)
    at org.apache.http.protocol.HttpRequestExecutor.execute(HttpRequestExecutor.java:124)
    at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:271)
    at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:184)
    at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:184)
    at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:82)
    at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:55)
    at com.amazonaws.http.apache.client.impl.SdkHttpClient.execute(SdkHttpClient.java:72)
    at com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:880)
    at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:723)
    at com.amazonaws.http.AmazonHttpClient.doExecute(AmazonHttpClient.java:475)
    at com.amazonaws.http.AmazonHttpClient.executeWithTimer(AmazonHttpClient.java:437)
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:386)
    at com.amazonaws.services.sqs.AmazonSQSClient.doInvoke(AmazonSQSClient.java:1749)
    at com.amazonaws.services.sqs.AmazonSQSClient.invoke(AmazonSQSClient.java:1719)
    at com.amazonaws.services.sqs.AmazonSQSClient.receiveMessage(AmazonSQSClient.java:1369)

[elasticmq-akka.actor.default-dispatcher-14] ERROR org.elasticmq.rest.sqs.TheSQSRestServerBuilder$$anon$1 - Exception when running routes
akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://elasticmq/user/$a/$a#-794922759]] after [21000 ms]
    at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:334)
    at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
    at akka.actor.LightArrayRevolverScheduler$TaskHolder.run(Scheduler.scala:476)
    at akka.actor.LightArrayRevolverScheduler$$anonfun$close$1.apply(Scheduler.scala:282)
    at akka.actor.LightArrayRevolverScheduler$$anonfun$close$1.apply(Scheduler.scala:281)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at akka.actor.LightArrayRevolverScheduler.close(Scheduler.scala:280)
    at akka.actor.ActorSystemImpl.stopScheduler(ActorSystem.scala:689)
    at akka.actor.ActorSystemImpl$$anonfun$liftedTree2$1$1.apply$mcV$sp(ActorSystem.scala:618)
    at akka.actor.ActorSystemImpl$$anonfun$liftedTree2$1$1.apply(ActorSystem.scala:618)
    at akka.actor.ActorSystemImpl$$anonfun$liftedTree2$1$1.apply(ActorSystem.scala:618)
    at akka.actor.ActorSystemImpl$$anon$3.run(ActorSystem.scala:642)
    at akka.actor.ActorSystemImpl$TerminationCallbacks$$anonfun$run$1.runNext$1(ActorSystem.scala:809)
    at akka.actor.ActorSystemImpl$TerminationCallbacks$$anonfun$run$1.apply$mcV$sp(ActorSystem.scala:812)
    at akka.actor.ActorSystemImpl$TerminationCallbacks$$anonfun$run$1.apply(ActorSystem.scala:805)
    at akka.actor.ActorSystemImpl$TerminationCallbacks$$anonfun$run$1.apply(ActorSystem.scala:805)
    at akka.util.ReentrantGuard.withGuard(LockUtil.scala:15)
    at akka.actor.ActorSystemImpl$TerminationCallbacks.run(ActorSystem.scala:805)
    at akka.actor.ActorSystemImpl$$anonfun$terminationCallbacks$1.apply(ActorSystem.scala:639)
    at akka.actor.ActorSystemImpl$$anonfun$terminationCallbacks$1.apply(ActorSystem.scala:639)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
    at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
    at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
    at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
    at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
    at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
    at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Notice that the time seconds in the exception is 20 + 1 extra second. if i remove the .withWaitTimeSeconds(20) then everything works as expected. Is long polling not supported or is it a bug? Thanks

SmithKevin commented 7 years ago

After looking into this further. This issue is 100% my fault. What was happening is that I was stopping elasticmq while a long polling request was in progress. So since the queue was stopped from underneath it, the polling request threw the exception.

I'm not sure what the proper behavior should be here. Possibly the server waiting for all requests to finish before shutting down. Either way it is easily remedied on my side so its not that important.

Sorry for the bug