spring-attic / spring-cloud-aws

All development has moved to https://github.com/awspring/spring-cloud-aws
Integration for Amazon Web Services APIs with Spring
https://awspring.io/
Apache License 2.0

Resolving native SQS message. #612

Closed. maciejwalkowiak closed this 4 years ago

maciejwalkowiak commented 4 years ago

Draft for now, as I am not convinced this is the right thing to do. Tests, docs, etc. are missing, but the implementation itself works.

Fixes gh-295

maciejwalkowiak commented 4 years ago

@artembilan @markfisher could you please give us advice here? What would be the right approach to keep the original SQS message for further processing:

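  1. Extend GenericMessage:

import com.amazonaws.services.sqs.model.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.GenericMessage;

// Carries the converted String payload plus the native SQS message it came from.
public class SqsMessage extends GenericMessage<String> {

    // Native AWS SDK message, kept for further processing.
    private final Message originalMessage;

    public SqsMessage(String payload, MessageHeaders headers, Message originalMessage) {
        super(payload, headers);
        this.originalMessage = originalMessage;
    }

    public Message getOriginalMessage() {
        return originalMessage;
    }
}

  2. Process GenericMessage with com.amazonaws.services.sqs.model.Message in payload field.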

Both approaches work for what we are trying to achieve here, which is adding the ability to resolve the original SQS message in @SqsListener-annotated method parameters, but I am not sure whether I am missing something from the bigger Spring Messaging picture.

artembilan commented 4 years ago

Well, in other frameworks we go in a different direction. There we have a MessageListener abstraction that is specific to the broker. In Spring Kafka it is like this: https://github.com/spring-projects/spring-kafka/blob/master/spring-kafka/src/main/java/org/springframework/kafka/listener/GenericMessageListener.java where the data type comes from here: https://github.com/spring-projects/spring-kafka/blob/master/spring-kafka/src/main/java/org/springframework/kafka/listener/MessageListener.java, which in your case could be com.amazonaws.services.sqs.model.Message.

In Spring AMQP it is like this: https://github.com/spring-projects/spring-amqp/blob/master/spring-amqp/src/main/java/org/springframework/amqp/core/MessageListener.java
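For illustration, a broker-specific listener contract of that kind could look roughly like the sketch below. It is not code from Spring Kafka, Spring AMQP, or this project: NativeListenerSketch, NativeMessageListener, Container, and NativeSqsMessage are hypothetical names, and NativeSqsMessage is only a stand-in for com.amazonaws.services.sqs.model.Message so the example compiles without the AWS SDK.

```java
import java.util.List;

// Hypothetical sketch of a broker-specific listener abstraction for SQS.
final class NativeListenerSketch {

    // Stand-in (assumption) for com.amazonaws.services.sqs.model.Message.
    static final class NativeSqsMessage {
        private final String messageId;
        private final String body;

        NativeSqsMessage(String messageId, String body) {
            this.messageId = messageId;
            this.body = body;
        }

        String getMessageId() {
            return messageId;
        }

        String getBody() {
            return body;
        }
    }

    // Single callback that receives the broker's native record type,
    // loosely following the shape of a generic onMessage(T data) listener.
    interface NativeMessageListener<T> {
        void onMessage(T data);
    }

    // Hypothetical container: hands every polled message to the listener unchanged,
    // so no conversion to a Spring Messaging Message<?> happens in between.
    static final class Container {
        private final NativeMessageListener<NativeSqsMessage> listener;

        Container(NativeMessageListener<NativeSqsMessage> listener) {
            this.listener = listener;
        }

        void dispatch(List<NativeSqsMessage> polled) {
            for (NativeSqsMessage message : polled) {
                listener.onMessage(message);
            }
        }
    }
}
```

With a contract like this the listener always sees the native message, which is why no extra wrapper or header is needed in those frameworks.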

I do understand that the logic here is built on MessageHandler.handleMessage(Message<?>), so it is too late to change it radically. And since you are tied to the Spring Messaging contract, I can only suggest storing that com.amazonaws.services.sqs.model.Message in a header. In Spring Integration we deal with a header like this, IntegrationMessageHeaderAccessor.SOURCE_DATA, and populate it with an org.springframework.amqp.core.Message in the AMQP Inbound Channel Adapters and with a ConsumerRecord in the Kafka Inbound Channel Adapters.

So, when you use @SqsListener with a com.amazonaws.services.sqs.model.Message as one of the arguments, you could get its value from that header and stay compatible with possible Spring Integration downstream.
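A minimal, dependency-free sketch of that header approach might look like the following. It is not the implementation from this pull request: SourceDataHeaderSketch, Envelope, toEnvelope, and resolveArgument are hypothetical names, Envelope stands in for a Spring Messaging Message<String>, and NativeSqsMessage stands in for com.amazonaws.services.sqs.model.Message. The header name "sourceData" is assumed to match the value of IntegrationMessageHeaderAccessor.SOURCE_DATA.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: keep the native SQS message in a header and resolve it later.
final class SourceDataHeaderSketch {

    // Assumed header name, mirroring Spring Integration's SOURCE_DATA header.
    static final String SOURCE_DATA = "sourceData";

    // Stand-in (assumption) for com.amazonaws.services.sqs.model.Message.
    static final class NativeSqsMessage {
        private final String messageId;
        private final String body;

        NativeSqsMessage(String messageId, String body) {
            this.messageId = messageId;
            this.body = body;
        }

        String getMessageId() {
            return messageId;
        }

        String getBody() {
            return body;
        }
    }

    // Stand-in (assumption) for Message<String>: a payload plus read-only headers.
    static final class Envelope {
        final String payload;
        final Map<String, Object> headers;

        Envelope(String payload, Map<String, Object> headers) {
            this.payload = payload;
            this.headers = headers;
        }
    }

    // Conversion step: the body becomes the payload, the native message goes into the header.
    static Envelope toEnvelope(NativeSqsMessage nativeMessage) {
        Map<String, Object> headers = new HashMap<>();
        headers.put(SOURCE_DATA, nativeMessage);
        return new Envelope(nativeMessage.getBody(), Collections.unmodifiableMap(headers));
    }

    // Argument-resolution step: return the header value when the listener
    // parameter type matches it, otherwise reject the parameter.
    static <T> T resolveArgument(Envelope message, Class<T> parameterType) {
        Object source = message.headers.get(SOURCE_DATA);
        if (parameterType.isInstance(source)) {
            return parameterType.cast(source);
        }
        throw new IllegalArgumentException(
                "No " + SOURCE_DATA + " header of type " + parameterType.getName());
    }
}
```

The payload stays a plain String for existing listeners, while a listener method that declares the native type as a parameter would get the same instance back from the header.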

Does it make sense?

P.S. Honestly, I'm fully against any new GenericMessage extensions: it is hard to support all of them in other places, e.g. (de)serialization.

maciejwalkowiak commented 4 years ago

Thanks @artembilan! That's very useful. We will go with the header for now, and in 3.0 we will revisit it, as we are going to revisit the SQS integration internals anyway.

maciejwalkowiak commented 4 years ago

The original message is now passed through the sourceData header.