awspring / spring-cloud-aws

The New Home for Spring Cloud AWS
http://awspring.io
Apache License 2.0
Support multiple object types in the same queue with @SqsListener #886

Open jndietz opened 1 year ago

jndietz commented 1 year ago

Type: Feature

Is your feature request related to a problem? Please describe.

I'd like for my @SqsListener methods to perform logic based on an object type.

Describe the solution you'd like

@SqsListener("some-queue")
public void processSomeMessage(SomeMessage message) {
     // ...do stuff
}

@SqsListener("some-queue")
public void processAnotherMessage(AnotherMessage message) {
     // ...do something else
}

Describe alternatives you've considered

I have tried using ObjectMapper.readValue() to deserialize first and then branch with instanceof. However, readValue always succeeds, even if the JSON string doesn't match the target object type.

dbouclier commented 1 year ago

@jndietz I'm running into a similar problem: I had a deserialization issue, and then found that we cannot use multiple @SqsListener methods with different types on the same queue.

I'm in a publish/subscribe scenario: I send events to an SNS topic and subscribe an SQS queue to it. Each topic receives different types of events, such as "create" and "update".

If you are in the same scenario (I mean using SNS), I fixed the deserialization issue you have by setting the subscription attribute "RawMessageDelivery=True" (see: https://docs.aws.amazon.com/sns/latest/dg/sns-large-payload-raw-message-delivery.html)

Then I realized that if you have two @SqsListener methods on the same queue, even with different types, the last one catches all the SQS messages.

Today I do the type detection manually (by sending the type in a message header) and then do the deserialization myself. Here is my workaround:

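import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.awspring.cloud.sqs.annotation.SqsListener;
import lombok.RequiredArgsConstructor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Service;

@Service
@RequiredArgsConstructor
public class LeaveEventListener {

    private static final Logger LOG = LoggerFactory.getLogger(LeaveEventListener.class);

    private final ObjectMapper objectMapper;

    @SqsListener(queueNames = "Leaves.fifo")
    public void onLeaveEvent(final Message<String> event) throws ClassNotFoundException, JsonProcessingException {
        LOG.debug("Leave event received: {}", event);

        // TODO: find a way to avoid this manual step
        // The "payloadType" header carries the class name of the payload.
        final Class<?> payloadClass = Class.forName(event.getHeaders().get("payloadType", String.class));
        final Event payload = (Event) objectMapper.readValue(event.getPayload(), payloadClass);

        // Route to the handler that matches the event type.
        switch (payload.getType()) {
            case LEAVE_CREATED -> onLeaveCreatedEvent((LeaveCreatedEvent) payload);
            case LEAVE_UPDATED -> onLeaveUpdateEvent((LeaveUpdatedEvent) payload);
            default -> LOG.warn("Unsupported event received: {}", event.getPayload());
        }
    }

    private void onLeaveCreatedEvent(final LeaveCreatedEvent event) {
        LOG.debug("Leave created event received: {}", event);
    }

    private void onLeaveUpdateEvent(final LeaveUpdatedEvent event) {
        LOG.debug("Leave updated event received: {}", event);
    }

}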

It works, but not as nicely as what I did in a previous project with @RabbitListener (example: https://engineering.celonis.com/blog/patterns-with-spring-and-rabbitmq-part-1-rabbithandler-and-its-usages/) or with @JmsListener + Azure Service Bus :man_shrugging:

I would be happy to know if there is a better solution with AWS SNS/SQS.
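
On the manual step in the workaround above: Class.forName will load whatever class name arrives in the "payloadType" header. A minimal sketch of one alternative, assuming you control the header values, is an allow-list that maps short aliases to known classes. Everything here (EventTypeRegistry, the alias strings, the placeholder event classes) is hypothetical and uses only the JDK; it is not a Spring Cloud AWS API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

final class EventTypeRegistryDemo {

    // Hypothetical placeholders standing in for the real event classes.
    static class LeaveCreated {}
    static class LeaveUpdated {}

    /** Maps the value of a type header to a known class; unknown values resolve to empty. */
    static final class EventTypeRegistry {
        private final Map<String, Class<?>> typesByAlias = new HashMap<>();

        EventTypeRegistry register(String alias, Class<?> type) {
            typesByAlias.put(alias, type);
            return this;
        }

        Optional<Class<?>> resolve(String headerValue) {
            return Optional.ofNullable(typesByAlias.get(headerValue));
        }
    }

    public static void main(String[] args) {
        EventTypeRegistry registry = new EventTypeRegistry()
                .register("leave.created", LeaveCreated.class)
                .register("leave.updated", LeaveUpdated.class);

        // A registered alias resolves to its class.
        System.out.println(registry.resolve("leave.created").isPresent());
        // An arbitrary class name from an untrusted header is not resolved.
        System.out.println(registry.resolve("java.lang.Runtime").isPresent());
    }
}
```

The resolved class could then be passed to ObjectMapper.readValue in place of the Class.forName result, with unknown aliases logged and skipped.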

tomazfernandes commented 1 year ago

@jndietz, if I understand correctly, you want to deserialize the message based on some particular metadata in the message.

You should be able to achieve that by configuring a message converter.

See this example for how to use an interface or super type in the listener and receive the correctly deserialized payload.

Please let me know if that works for you.

Thanks.

dbouclier commented 1 year ago

@tomazfernandes message converters work well for custom deserialization, but how do I "route" the message, based on its type, to different @SqsListener methods using the same queue?

Sorry for hijacking @jndietz's issue. Should I create a specific issue for this?

jndietz commented 1 year ago

No worries at all @dbouclier - I think we're on the same page here. Ideally, I'd like a drop-in replacement for @RabbitHandler. I think I just realized how spring-rabbit does it: it must serialize the class name into the JSON when publishing the message, so that when a consumer/listener picks it up, the type is already specified.

Thanks for the information @tomazfernandes, I will dig into this and get back to you.

jndietz commented 1 year ago

Alright, I finally got some things "working". Creating and configuring a SqsMessageListenerContainerFactory and a subsequent SqsMessagingMessageConverter works okay. I'd prefer to use the fully qualified class name, but this might work as a stop-gap for a while. However, this has its own set of problems/bugs that I will post in a separate thread (deserialization fails at random with Jackson errors, and the @SqsListener doesn't always pick up messages from the queue). Anywho...

The docs located here state "_By default, the framework looks for a MessageHeader named Sqs_MA_JavaType containing the fully qualified class name (FQCN) for which the payload should be deserialized to. If such header is found, the message is automatically deserialized to the provided class._". However, I provided the aforementioned header with a fully qualified class name, and it still would go to the wrong @SqsListener method. Given the example in my original post, a value of com.someapp.model.SomeMessage would go to

@SqsListener("some-queue")
                                 // 👇 note the type
public void processAnotherMessage(AnotherMessage message) {
     // ...do stuff
}

tomazfernandes commented 1 year ago

Hi @jndietz and @dbouclier

I think what you're looking for is something similar to the @KafkaHandler feature in Spring Kafka.

Is this understanding correct?

If so, we do not currently support a similar feature in Spring Cloud AWS, but I'd be happy to review a PR if you'd like to contribute one.

For now, you're really not supposed to have multiple @SqsListener annotations listening to the same queue and any behavior you see would not be deterministic.

Makes sense? Thanks.

jndietz commented 1 year ago

Yes - that is correct, or similar to how @RabbitHandler works.

I have to say that I'm surprised at your comment that we're not supposed to use multiple @SqsListener in a project. Especially so, since it appears someone went to great lengths to document, in the spring-cloud-aws documentation, how you might deserialize to different types and configure the various classes.

It sounds like we'll have to work around this issue for now. We can proceed with a single @SqsListener annotation, and within that method we will pull the appropriate value from the Message headers, and then use ObjectMapper to convert to the correct type. I was really hoping there would be a Spring way of doing this, but it doesn't sound like there is.

The code here sort of works, but it fails to deserialize correctly at random.

dbouclier commented 1 year ago

@tomazfernandes thanks for your response. This limitation of @SqsListener should be mentioned in the project documentation. Since it's an implementation of "spring-messaging" like @RabbitListener, @KafkaListener or @JmsListener, I was expecting the same features. This sent me in the wrong direction, and I was trying to build something that is not supported. I guess I should consider using amazon-mq instead of SNS+SQS for a pub/sub scenario.

tomazfernandes commented 1 year ago

> It sounds like we'll have to work around this issue for now. We can proceed with a single @SqsListener annotation, and within that method we will pull the appropriate value from the Message headers, and then use ObjectMapper to convert to the correct type. I was really hoping there would be a Spring way of doing this, but it doesn't sound like there is.

I think there are a few ways to handle this - for one, you can configure the deserialization to convert to the proper type, then receive a Message<Object> in the @SqsListener method, or an interface / super type. Then based on the type you can call the appropriate service to handle the payload.
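
The "call the appropriate service based on the type" step could be sketched roughly as follows, assuming the converter has already produced the concrete payload object. PayloadDispatcher and the placeholder message classes are hypothetical names for illustration only, not part of Spring Cloud AWS. The sketch matches on the exact runtime class and uses only the JDK.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Consumer;

final class PayloadDispatcherDemo {

    // Hypothetical super type and payload classes.
    interface Event {}
    static final class SomeMessage implements Event {}
    static final class AnotherMessage implements Event {}

    /** Routes an already-deserialized payload to the handler registered for its class. */
    static final class PayloadDispatcher {
        private final Map<Class<?>, Consumer<Object>> handlers = new LinkedHashMap<>();

        <T> PayloadDispatcher on(Class<T> type, Consumer<? super T> handler) {
            // The cast is safe because the handler is only looked up by this exact class.
            handlers.put(type, payload -> handler.accept(type.cast(payload)));
            return this;
        }

        /** Returns true if a handler was registered for the payload's exact class. */
        boolean dispatch(Object payload) {
            Consumer<Object> handler = handlers.get(payload.getClass());
            if (handler == null) {
                return false;
            }
            handler.accept(payload);
            return true;
        }
    }

    public static void main(String[] args) {
        PayloadDispatcher dispatcher = new PayloadDispatcher()
                .on(SomeMessage.class, m -> System.out.println("handling SomeMessage"))
                .on(AnotherMessage.class, m -> System.out.println("handling AnotherMessage"));

        // In a listener, the argument would be the payload received as the super type.
        Event payload = new AnotherMessage();
        dispatcher.dispatch(payload);
    }
}
```

A single listener method receiving the super type could then call dispatch(payload) and log anything for which it returns false.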

> I have to say that I'm surprised at your comment that we're not supposed to use multiple @SqsListener in a project.

That's definitely not what I said, and I'm sorry if it sounded harsh. You can use as many listeners in a project as you'd like, just not for the same queue.

> I was expecting the same features. This sent me in the wrong direction, and I was trying to build something that is not supported.

I can see your frustration and I'm sorry you spent time going in the wrong direction. Perhaps you'd like to open a PR adding this information to the docs?

> I guess I should consider using amazon-mq instead of SNS+SQS for a pub/sub scenario.

I think we've covered some alternatives already, but if you're willing to make architectural changes, perhaps a good approach would be using SNS filters so that only one type of message gets to a queue.

All in all, let's keep in mind this is a community-based project, so feel free to contribute with a PR if you'd like the feature to be included. I'd be happy to offer some guidance and review the PR.

I'll leave the issue open in case anyone else would like to contribute a PR for this.

shehrooz-sattar commented 9 months ago

Yeah, this would have been a very convenient feature.

@jndietz Were you able to figure out why it fails at random?

The code that you have provided at

https://github.com/jndietz/sqs-listener/blob/main/src/main/java/com/example/listener/config/MessageConverterConfig.java

tomazfernandes commented 9 months ago

Hey @shehrooz-10p - this is really not supported by the framework at the moment, and for now there's no way for the framework to disambiguate the listener methods by payload type for the same queue.

Spring Kafka on the other hand has such a mechanism - if you'd like to take a look at its implementation and open a PR with the changes I'd be happy to review.