softwaremill / elasticmq

In-memory message queue with an Amazon SQS-compatible interface. Runs stand-alone or embedded.
https://softwaremill.com/open-source/
Apache License 2.0

SQS Fifo Support #125

Closed JoaoRXFernandesJG closed 2 years ago

JoaoRXFernandesJG commented 6 years ago

Hi,

Any chance we could update to allow FifoQueue attributes?

aws --endpoint-url=http://localhost:4576 sqs create-queue --queue-name "test-queue.fifo" --attributes "FifoQueue=true,ContentBasedDeduplication=true"

An error occurred (InvalidParameterValue) when calling the CreateQueue operation: InvalidParameterValue; see the SQS docs.

Thanks,

Joao

simong commented 6 years ago

FWIW, I'm working on this and I'm hoping to submit a PR for it in the next week or so. I have basic support for FIFO queues going in the fifo-support branch in my repo (https://github.com/simong/elasticmq/tree/fifo-support).

adamw commented 6 years ago

@simong 👍

villanuevawill commented 6 years ago

@simong any update on this? Are you still working on this?

simong commented 6 years ago

@villanuevawill I've started work on this in my fifo-support branch which should be at a reasonable point. Not all of the FIFO features have been implemented, but most of it should be there:

That said, work has stalled a little bit as I'm fighting other fires at the moment. Once those are put out I'm planning to get back to this. I'm hoping that would be somewhere towards the middle/end of this month.

villanuevawill commented 6 years ago

@simong thanks so much for doing this. I really appreciate it. I'll use your branch for now. Again, thanks for contributing here and helping out. 👍

simong commented 6 years ago

@villanuevawill, have you had a chance to test drive that branch a bit? I think it's at a point now where I'll need to actively run our own expectations against it to catch the real edge-cases/gotchas but that has been slightly deprioritized and pushed out sadly. If you have any feedback in the interim that I could address, that would really move this work forward a bit faster.

hsyed commented 6 years ago

I'm very interested in the FIFO work. Using the new branch is quite a bit of hoop jumping as we are using Bazel. I'm thinking of vendoring the branch in our monorepo and setting up a Bazel build for the code.

I am writing a Kotlin coroutine-based queue reactor (a simple JMS clone that doesn't tie up 500 threads). I already have the framework working with standard queues. @simong do you feel the work is at the point where I would be able to use it for writing basic tests for FIFO queues?

simong commented 6 years ago

@hsyed: Yes, I think the branch is ready to be used and to start collecting feedback on. I've been swamped under with other work, but I'm hopeful I can get back to it in a few weeks. Any issues that you could identify would be very helpful!

Note that it might not merge cleanly with master since last week. I'll try and get it merged up tonight.

hsyed commented 6 years ago

@simong OK, I have everything wired up in my testbench.


First, it is likely I am doing something wrong, but MessageGroupId and MessageDeduplicationId are missing from the messageAttributes for a ReceiveMessage request (along with other FIFO attributes). They aren't set on the message anywhere else either.

The request is created with withAttributeNames("All") and withMessageAttributeNames("All"), and attributes that are part of regular queues are showing up, e.g., SenderId.
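For anyone checking the same thing: on AWS SQS the FIFO fields (MessageGroupId, MessageDeduplicationId, SequenceNumber) are documented as system attributes, so they come back under the message's Attributes map rather than MessageAttributes. A minimal Python sketch of that check, assuming a response shaped like the dictionaries boto3 returns; the helper names are hypothetical and none of this is ElasticMQ code:

```python
from typing import Any, Dict, List

# System attributes that SQS documents for messages on FIFO queues.
FIFO_SYSTEM_ATTRIBUTES = ("MessageGroupId", "MessageDeduplicationId", "SequenceNumber")


def receive_request(queue_url: str) -> Dict[str, Any]:
    """Build ReceiveMessage parameters that ask for every attribute."""
    return {
        "QueueUrl": queue_url,
        "AttributeNames": ["All"],          # system attributes (SenderId, MessageGroupId, ...)
        "MessageAttributeNames": ["All"],   # user-defined message attributes
    }


def fifo_attributes(message: Dict[str, Any]) -> Dict[str, str]:
    """Return the FIFO system attributes present on one received message."""
    system = message.get("Attributes", {})
    return {name: system[name] for name in FIFO_SYSTEM_ATTRIBUTES if name in system}


def missing_fifo_attributes(message: Dict[str, Any]) -> List[str]:
    """List the FIFO system attributes that the server did not return."""
    system = message.get("Attributes", {})
    return [name for name in FIFO_SYSTEM_ATTRIBUTES if name not in system]
```

With boto3 this would be used roughly as `client.receive_message(**receive_request(url))`, followed by `missing_fifo_attributes(msg)` for each entry in the response's `Messages` list. A non-empty result on a FIFO queue points at the server side rather than the client.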


Second, when a request is invalid, it would be useful to get some indicator of what is wrong with it. It took me a while to locate the code paths invalidating the requests. The cases were MessageGroupId absent, and MessageDeduplicationId absent with ContentBasedDeduplication not enabled.

It should be possible to send a one liner so that it surfaces in the AmazonServiceException I think. A log entry could work as well. A message in the exception would be preferable in testing scenarios.

This applies to the standard queues as well which also throw a generic AmazonServiceException for invalid requests. It's much clearer when the request is invalid for standard queues.
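To make the two invalid cases above concrete, here is a rough Python sketch of the kind of validation that could carry a one-line reason. It is not ElasticMQ's implementation (which is Scala); the exception type and function name are hypothetical. The SHA-256 fallback follows what the SQS documentation describes for content-based deduplication.

```python
import hashlib
from typing import Optional


class InvalidFifoRequest(ValueError):
    """Hypothetical error type that carries a human-readable reason."""


def resolve_deduplication_id(
    body: str,
    message_group_id: Optional[str],
    message_deduplication_id: Optional[str],
    content_based_deduplication: bool,
) -> str:
    """Validate a FIFO send request and return the deduplication ID to use."""
    if not message_group_id:
        raise InvalidFifoRequest("MessageGroupId is required for FIFO queues")
    if message_deduplication_id:
        # An explicit ID always wins over content-based deduplication.
        return message_deduplication_id
    if not content_based_deduplication:
        raise InvalidFifoRequest(
            "MessageDeduplicationId is required when "
            "ContentBasedDeduplication is not enabled"
        )
    # SQS documents deriving the ID from a SHA-256 hash of the message body.
    return hashlib.sha256(body.encode("utf-8")).hexdigest()
```

A message like the ones raised here, surfaced in the error response, would have pointed straight at the missing parameter instead of requiring a search through the code paths.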

simong commented 6 years ago

@hsyed Regarding the first issue, good catch! I've pushed some follow-up to my branch and those attributes are now returned. I've also added support for request attempt ids.

Regarding your second point, I agree that it's not very clear, but it follows the established pattern for data validation. I probably already did a bit too much refactoring in this branch and didn't want to increase the review burden. I'm happy to do a follow-up PR with extra validation logic though (or in this branch if @adamw is OK with it).

Have you run into any other issues with it?

simong commented 6 years ago

I've pushed some more follow-up that allows you to configure a FIFO queue from the configuration files.

romdsj commented 6 years ago

Thanks @simong for the work. I'm in a hurry to use this feature!

simong commented 6 years ago

This is now available as of 0.14.1! 🎉

JoaoRXFernandes commented 6 years ago

🎉

mberlanda commented 6 years ago

I guess there are still a few parameters missing.

I tried to create a new queue with Terraform using the following main.tf config:

provider "aws" {
  region                      = "us-east-1"
  access_key                  = "anaccesskey"
  secret_key                  = "asecretkey"
  skip_credentials_validation = true
  skip_metadata_api_check     = true
  s3_force_path_style         = true

  endpoints {
    sqs      = "http://localhost:9324"
  }
}

resource "aws_sqs_queue" "terraform_queue_mq" {
  name                        = "terraform-queue-mq.fifo"
  fifo_queue                  = true
  content_based_deduplication = true
}

When applying this plan I got:

terraform apply -input=false -auto-approve
aws_sqs_queue.terraform_queue_mq: Creating...
  arn:                               "" => "<computed>"
  content_based_deduplication:       "" => "true"
  delay_seconds:                     "" => "0"
  fifo_queue:                        "" => "true"
  kms_data_key_reuse_period_seconds: "" => "<computed>"
  max_message_size:                  "" => "262144"
  message_retention_seconds:         "" => "345600"
  name:                              "" => "terraform-queue-mq.fifo"
  policy:                            "" => "<computed>"
  receive_wait_time_seconds:         "" => "0"
  visibility_timeout_seconds:        "" => "30"

Error: Error applying plan:

1 error(s) occurred:

* aws_sqs_queue.terraform_queue_mq: 1 error(s) occurred:

* aws_sqs_queue.terraform_queue_mq: [ERR] Error updating SQS attributes: InvalidAttributeName: InvalidAttributeName; see the SQS docs.
    status code: 400, request id: 00000000-0000-0000-0000-000000000000

Terraform does not automatically rollback in the face of errors.
Instead, your Terraform state file has been partially updated with
any resources that successfully completed. Please address the error
above and apply again to incrementally change your infrastructure.

My ElasticMQ logs are:

10:27:07.714 [elasticmq-akka.actor.default-dispatcher-14] INFO  o.elasticmq.actor.QueueManagerActor - Creating queue QueueData(terraform-queue-mq.fifo,MillisVisibilityTimeout(30000),PT0S,PT0S,2018-08-06T10:27:07.714Z,2018-08-06T10:27:07.714Z,None,true,true)
10:27:07.726 [elasticmq-akka.actor.default-dispatcher-11] WARN  o.e.r.s.TheSQSRestServerBuilder$$anon$1 - Ignored attribute "MessageRetentionPeriod" (supported by SQS but not ElasticMQ)
10:27:07.727 [elasticmq-akka.actor.default-dispatcher-11] WARN  o.e.r.s.TheSQSRestServerBuilder$$anon$1 - Ignored attribute "MaximumMessageSize" (supported by SQS but not ElasticMQ)
10:27:07.726 [elasticmq-akka.actor.default-dispatcher-15] INFO  org.elasticmq.actor.queue.QueueActor - terraform-queue-mq.fifo: Updating default visibility timeout to MillisVisibilityTimeout(30000)

jeffbernst commented 2 years ago

First of all, thanks for adding this feature! It makes life much easier for us 🙏

Just wanted to call out that it would be nice to log an error or something to clarify the necessary settings for a FIFO queue. When I set this up originally I had a queue with .fifo at the end of the name in our elasticmq.conf file and I could see in docker that the queues weren't being created, but there was no guidance in there about how fifo = true needs to be specified and the queue name can't have .fifo at the end.

LarsFronius commented 2 years ago

Hi @jeffbernst - perhaps I am late, but if you quote the queue name it appears elasticmq will handle the .fifo suffix as part of the queue name again:

"my_queue_name.fifo" {
    defaultVisibilityTimeout = 10 seconds
    delay = 5 seconds
    receiveMessageWait = 0 seconds
    fifo = true
    contentBasedDeduplication = false
}
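
A likely explanation for why quoting helps, assuming the config file is parsed as HOCON (Typesafe Config): an unquoted key is treated as a path expression, so a dot splits it into nested keys, while a quoted key stays a single element. The Python sketch below only illustrates that splitting rule in a simplified form (no escape handling); the function name is made up and it is not how ElasticMQ or the HOCON parser is implemented.

```python
from typing import List


def hocon_key_path(key: str) -> List[str]:
    """Split a HOCON-style key into path elements.

    Simplified: dots outside double quotes separate elements,
    dots inside double quotes are kept; escapes are not handled.
    """
    path: List[str] = []
    current = ""
    in_quotes = False
    for ch in key:
        if ch == '"':
            in_quotes = not in_quotes  # toggle quoted mode, drop the quote itself
        elif ch == "." and not in_quotes:
            path.append(current)       # an unquoted dot ends the current element
            current = ""
        else:
            current += ch
    path.append(current)
    return path


print(hocon_key_path('my_queue_name.fifo'))    # ['my_queue_name', 'fifo']
print(hocon_key_path('"my_queue_name.fifo"'))  # ['my_queue_name.fifo']
```

Under that reading, the unquoted form describes a queue called my_queue_name with a nested fifo object, which would be consistent with the queue not being created as expected.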

jeffbernst commented 2 years ago

@LarsFronius Oh nice, thanks for the heads up!