softwaremill / elasticmq

In-memory message queue with an Amazon SQS-compatible interface. Runs stand-alone or embedded.
https://softwaremill.com/open-source/
Apache License 2.0

Long Polling Requests Fail on FIFO Queues #151

Closed chrislbs closed 5 years ago

chrislbs commented 6 years ago

We're using elasticmq to mock the SQS service for our Java application. We're using the Java JMS AWS SDK, which uses long polling ('WaitTimeSeconds=20') to prefetch messages for processing. During our tests we noticed that on FIFO queues the time from message sent to message received could be upwards of 15 seconds (and sometimes the message was never received at all), whereas standard queues always seemed pretty snappy, e.g. < 1 second. I'm not familiar enough with Scala or Akka to really tackle this issue and understand what's causing the problem, but I wrote a test that at least simulates the behavior I'm seeing.

// THIS TEST FAILS
test("FIFO queues should respond during long polling") {
    // Given
    val createRequest = new CreateQueueRequest("testFifoQueue-long-poll.fifo")
      .addAttributesEntry("FifoQueue", "true")
      .addAttributesEntry("ContentBasedDeduplication", "false")

    val queueUrl = client.createQueue(createRequest).getQueueUrl

    val messageRequest = new SendMessageRequest(queueUrl, "Message 1")
      .withMessageGroupId("group1")
      .withMessageDeduplicationId(UUID.randomUUID().toString)

    val longPollRequest = new ReceiveMessageRequest(queueUrl).withWaitTimeSeconds(20)

    var t = new Thread() {
      override def run(): Unit = {
        Thread.sleep(3000L)
        client.sendMessage(messageRequest)
      }
    }
    t.start()

    // When
    var start = System.currentTimeMillis()
    var messages = client.receiveMessage(longPollRequest).getMessages
    var end = System.currentTimeMillis()

    // Then
    (end - start) should be >= 3000L
    (end - start) should be <= 6000L
    messages.size should be(1)

    // run again with the same content and group id but a different deduplication id
    messageRequest.withMessageDeduplicationId(UUID.randomUUID().toString)

    t = new Thread() {
      override def run(): Unit = {
        Thread.sleep(3000L)
        client.sendMessage(messageRequest)
      }
    }
    t.start()

    // When
    start = System.currentTimeMillis()
    messages = client.receiveMessage(longPollRequest).getMessages
    end = System.currentTimeMillis()

    // Then
    (end - start) should be >= 3000L
    (end - start) should be <= 6000L
    messages.size should be(1)
  }

// THIS TEST PASSES
test("Standard queues have no issue during long polling") {
    // Given

    val createRequest = new CreateQueueRequest("standard-queue-long")
    val queueUrl = client.createQueue(createRequest).getQueueUrl

    val messageRequest = new SendMessageRequest(queueUrl, "Message 1")

    val longPollRequest = new ReceiveMessageRequest(queueUrl).withWaitTimeSeconds(20)

    var t = new Thread() {
      override def run(): Unit = {
        Thread.sleep(3000L)
        client.sendMessage(messageRequest)
      }
    }
    t.start()

    // When
    var start = System.currentTimeMillis()
    var messages = client.receiveMessage(longPollRequest).getMessages
    var end = System.currentTimeMillis()

    // Then
    (end - start) should be >= 3000L
    (end - start) should be <= 6000L
    messages.size should be(1)

    t = new Thread() {
      override def run(): Unit = {
        Thread.sleep(3000L)
        client.sendMessage(messageRequest)
      }
    }
    t.start()

    // When
    start = System.currentTimeMillis()
    messages = client.receiveMessage(longPollRequest).getMessages
    end = System.currentTimeMillis()

    // Then
    (end - start) should be >= 3000L
    (end - start) should be <= 6000L
    messages.size should be(1)
  }

chrislbs commented 6 years ago

Going to cc @simong as he did most of the original work.

chrislbs commented 5 years ago

The test above didn't properly address the issue. It turns out the issue was actually with the behavior of specifying a requestAttemptId with a FIFO queue. I've opened pull request #155 that addresses the issue. I'm not sure if it is the most idiomatic implementation, as I don't really know Scala, but it was a relatively simple fix once I understood how everything plugged together.
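
For anyone trying to reproduce this, a minimal sketch of the kind of request involved follows. It assumes that "requestAttemptId" refers to the SQS ReceiveRequestAttemptId parameter and that the same AWS SDK for Java used in the tests above is on the classpath. The object name, helper name, and queue URL are hypothetical, and the snippet only builds the request; it is not taken from pull request #155.

```scala
import java.util.UUID
import com.amazonaws.services.sqs.model.ReceiveMessageRequest

object FifoLongPollRequestSketch {
  // Hypothetical queue URL for a local ElasticMQ instance; adjust to your setup.
  val queueUrl = "http://localhost:9324/queue/example.fifo"

  // Builds a long-poll receive request that also carries a receive request
  // attempt id (assumed here to be what "requestAttemptId" refers to).
  def longPollWithAttemptId(url: String, attemptId: String): ReceiveMessageRequest =
    new ReceiveMessageRequest(url)
      .withWaitTimeSeconds(20)
      .withReceiveRequestAttemptId(attemptId)

  def main(args: Array[String]): Unit = {
    val request = longPollWithAttemptId(queueUrl, UUID.randomUUID().toString)
    // Print the two fields relevant to this issue.
    println(s"wait=${request.getWaitTimeSeconds}s attemptId=${request.getReceiveRequestAttemptId}")
  }
}
```

Passing a request built this way to client.receiveMessage in the FIFO test above may be closer to what the JMS client sends than the plain long-poll request, though that is an assumption rather than something confirmed in this thread.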

chrislbs commented 5 years ago

Closing as #155 resolved the issue and was merged.