smallrye / smallrye-reactive-messaging

SmallRye Reactive Messaging
http://www.smallrye.io/smallrye-reactive-messaging/
Apache License 2.0
241 stars 179 forks source link

Add support for AWS SQS #1117

Closed arvidvillen closed 7 months ago

arvidvillen commented 3 years ago

Add support for AWS SQS: https://aws.amazon.com/sqs/

tcerda95 commented 3 years ago

@cescoffier

I would love to collaborate with this implementation but I have a few theoretical doubts when it comes to backpressure and implementing the incoming connector.

First a high level overview of how SQS works. SQS offers a long polling mechanism (max 20 secs), can return up to a maximum of 10 messages per poll/batch and AWS offers an async SDK. So a very simple implementation for a PublisherBuilder I came up with is to poll indefefinitely and create a multi with each of the items per batch. Note that the async SQS client from the SDK returns a completion stage:

    public PublisherBuilder<? extends Message<?>> getSource() {
        Multi<Message<?>> publisher = Uni.createFrom()
            .completionStage(() -> sqsClient.receiveMessage(m -> m.queueUrl(url)))
            .repeat().indefinitely()
            .invoke(() -> System.out.println("Log SQS invocation"))
            .onItem().transformToIterable(ReceiveMessageResponse::messages)
            .onItem().transform(Message::of);

        return ReactiveStreams.fromPublisher(publisher);
    }

However, given that this implementation would poll indefinitely, would not this defeat the purpose of backpressure? Or will this only poll as long as downstream requests for more items?

Finally, thank you very much for your time.

cescoffier commented 3 years ago

I don't now SQS, but Mutiny will only retry when there is request. So, I believe the back pressure will be managed.

MihaiBogdanEugen commented 2 years ago

@tcerdaITBA I'd like to contribute to this as well. Have you made any progress on your side?

soulseekeer24 commented 1 year ago

hello, anyone still interested on working on this?

damiankaplon commented 1 year ago
  1. Is there any way for now to use smallrye-reactive-messaging with SQS? As far as i know it does not support amazon SQS for now. Am i right?
  2. Is it possible to wrap SQS Client using JMS so i can integrate smallrye-reactive-messaging via JMS Connector?
MihaiBogdanEugen commented 1 year ago
  1. You are right.

  2. No.

soulseekeer24 commented 1 year ago

anyone interested on working on adding SQS support ?XD

adampoplawski commented 1 year ago

Hello Is there a change for this support? Maybe I can contribute somehow?

cescoffier commented 1 year ago

Sure, a contribution would be more than welcome.

soulseekeer24 commented 1 year ago

i would like to help

adampoplawski commented 1 year ago

@tcerdaITBA are you still interested? It seems you did some work towards and now it seems there are ppl willing to support.

holomekc commented 1 year ago

Hi I started with an implementation: https://github.com/holomekc/smallrye-reactive-messaging/commits/feature/AWS

I think I have no issues with the actual implementation. But there are some concepts which confuses me a little bit. So maybe you can help me to bring it into the right direction. But first the progress.

Progress: Moved to the pr: #2402

holomekc commented 1 year ago

Questions:

  1. Project structure I created an AWS module and a nested SQS module. Should I flatten the structure? I like the nested approach. What do you think
  2. OpenTelemetry TracingUtils starts and ends the trace immediately. Does not that violate the OpenTelemetry documentation?:

    • Call shouldStart(Context, Object) and do not proceed if it returns false.
    • Call start(Context, Object) at the beginning of a request.
    • Call end(Context, Object, Object, Throwable) at the end of a request.

    In SQS when I send a message I get the messageId only in the response of the SDK lib. I cannot add this information to the span. Why isn't AsyncOperationEndStrategy used? For batching it is different. There I need to generate the ids before sending. But from an implementation perspective it would be easier to do that after the response as well.

  3. Configuration For CreateQueue I want to provide the option to define attributes (Map<String, String) and tags (Map<String, String>). In case of sending messages I could use the metadata to provide this information. But this does not work for the incoming channel. The only way I found is to use the connector configuration. But Maps or Lists are not really supported. I need to parse from String manually (key1:value1,key2:value2 or value1,value2). This is something I cannot do inside of the message streams over and over again. It is a bit strange, because in microprofile config you could define maps. Quarkus also does this all the time.

    In general I think it depends on the use-case. I think for creation it is fine, because you most likely do not create that many queues. But what is the proper way to achieve something like this. I saw that sometimes beans are used to provide configuration. This is a bit strange to me

  4. Graceful Shutdown I started with a terminate method and @BeforeDestroy. I saw that in the other implementations. But this is not really graceful. I cannot simply destroy the SDK client and the channels. I need to count some things. E.g. in case 10 messages are received I need to wait for the processing and I need to wait for the deletion/confirmation. I did this already in the past in a Quarkus lib. There the approach was more clear to me. Am I allowed to block the terminate method? It could take some time until the processing is done

Thank you for some help.

adampoplawski commented 1 year ago

@cescoffier can we get support with code review? It seems more people want to contribute.

cescoffier commented 1 year ago

Sure, @ozangunalp abd I would be happy to review the code

adampoplawski commented 1 year ago

Hello @cescoffier @ozangunalp . Sorry to ping again but maybe review was forgotten by accident :)

cescoffier commented 1 year ago

@adampoplawski where is the PR?

adampoplawski commented 1 year ago

Hello @cescoffier There is branch plus questions from @holomekc how to proceed, sorry for miss information. https://github.com/holomekc/smallrye-reactive-messaging/commits/feature/AWS

holomekc commented 1 year ago

I did not create a pr yet, because I did not write tests etc. yet. As @adampoplawski mentioned I wrote some questions. I needed to chill a little bit. I was working to much. I will try to finish what I started asap. I have some freetime soon-ish

ozangunalp commented 12 months ago

Thanks for this. There is now a connector contribution guide if it may help you: https://smallrye.io/smallrye-reactive-messaging/4.11.0/concepts/contributing-connectors/

holomekc commented 11 months ago

I created the pr. It is not done yet, but maybe review, help etc. is easier to achieve:

2402

spc16670 commented 10 months ago

Are we planning to merge it anytime soon? Eagerly awaiting this....

holomekc commented 10 months ago

Nope. Feel free to help. There is still much to do.

adampoplawski commented 7 months ago

@cescoffier @ozangunalp There is new PR.