spring-projects / spring-boot

Spring Boot helps you to create Spring-powered, production-grade applications and services with absolute minimum fuss.
https://spring.io/projects/spring-boot
Apache License 2.0
75.18k stars 40.68k forks source link

Configured DefaultJmsListenerContainerFactory for Queue not using applied custom MessageConverter on consume #31565

Closed BigBallard closed 2 years ago

BigBallard commented 2 years ago

Description

I am working on on having two different DefaultJmsListenerContainerFactory's, one for listening to Queue's and the other for Topic's based on an issue that suggested this implementation . The only major difference between the two @Bean methods is that one configures with setPubSubDomain(true) for Topic's and the other setPubSubDomain(false) for Queue's. Otherwise both are configured the same including the use of a custome MessageConverter.

When I start up the application there are two @JmsListener's, one for topic and one for queue. Each respectively assigned the correct container factory based on what type of destination the message's are coming from.

With the topic configuration, the custom MessageConverter is used and the JmsListener's method is called with the converted object. However, when the queue's message is received, the converter is not called and JmsListener method is not called. I have enabled logging for the amq stuff and did notice that the message was being received and it was printing it to console. Watching the Artemis console I can see that the connections are made with the expected consumer and the messages are being pulled from the queue.

During the message converter configuration I ensure that the message ID mappings are made accordingly.

Expected behavior

Both JmsListener methods would be called respectively when receiving a message from the topic/queue and the customer MessageConverter being called.

Actual Bheavior

Only the topic JmsListener is being called with the converted messages. The queue messages are not being forward to the converter.

Code

QueueJmsReceiver

public class QueueJmsReceiver{
  public QueueJmsReceiver(ObjectMapper mapper) {
    super(mapper);
  }

  @JmsListener(destination = "tasks.status", containerFactory = "queue-connection-factory")
  public void receiveMessage(Object message) {
    ...
  }
}

TopicAmqReceiver

public class TopicAmqReceiver {
    public TopicAmqReceiver(ObjectMapper mapper) {
        super(mapper);
    }

    @JmsListener(destination = "tasks.other", containerFactory = "topic-connection-factory")
    public void receivedMessage(final Object message){
       ...
    }
}

JmsConfiguration

...
    @Bean
    @Lazy
    public CustomMessageConverter customMessageConverter(ObjectMapper mapper){
        var converter = new CustomMessageConverter(mapper);
        converter.setTypeIdPropertyName("@type");
        Map<String, Class<?>> idMappings = new HashMap<>();
        idMappings.put(Other.EVENT_ID, Other.class); // "@type": "other"
        idMappings.put(Status.EVENT_ID, Status.class); // "@type": "status"
        converter.setTypeIdMappings(idMappings);
        return converter;
    }

    @Bean("topic-connection-factory")
    @ConditionalOnPropertyNotEmpty(value = {"spring.artemis.broker-url"})
    public DefaultJmsListenerContainerFactory topicConnectionFactory(CustomMessageConverter converter) throws InitializationException {
        LOG.trace("Creating `topic-connection-factory` with message converter '{}'", converter.getClass().getSimpleName());
        // https://github.com/spring-projects/spring-framework/issues/18476
        try {
            var containerFactory = new DefaultJmsListenerContainerFactory();
            containerFactory.setMessageConverter(converter);
            var connectionFactory = getJmsConnectionFactory();
            containerFactory.setConnectionFactory(connectionFactory);
            containerFactory.setPubSubDomain(true);
            connectionFactory.createConnection();
            return containerFactory;
        } catch (Exception e) {
            throw new InitializationException("Failed to load JMS connection factory", e);
        }
    }

    @Bean("queue-connection-factory")
    @ConditionalOnPropertyNotEmpty(value = {"spring.artemis.broker-url"})
    public DefaultJmsListenerContainerFactory queueConnectionFactory(CustomMessageConverter converter) throws InitializationException {
        LOG.trace("Creating `queue-connection-factory` with message converter '{}'", converter.getClass().getSimpleName());
        // https://github.com/spring-projects/spring-framework/issues/18476
        try {
            var containerFactory = new DefaultJmsListenerContainerFactory();
            containerFactory.setMessageConverter(converter);
            var connectionFactory = getJmsConnectionFactory();
            containerFactory.setConnectionFactory(connectionFactory);
            containerFactory.setPubSubDomain(false);
            connectionFactory.createConnection();
            return containerFactory;
        } catch (Exception e) {
            throw new InitializationException("Failed to load JMS connection factory", e);
        }
    }
...

CustomMessageConverter

...
  @Override
  public Message toMessage(Object object, Session session) throws JMSException {
    Objects.requireNonNull(object, "Unable to transform null to message.");
    try {
      var body = objectMapper.writeValueAsString(object);
      return session.createTextMessage(body);
    } catch (JsonProcessingException e) {
      throw new MessageConversionException("Failed to convert object to message.", e);
    }
  }

  @Override
  public Object fromMessage(Message message) throws JMSException {
    if(message instanceof TextMessage){
      try {
        return fromTextMessage((TextMessage) message);
      } catch (JsonProcessingException e) {
        throw new MessageConversionException("Failed to convert message to object.", e);
      }
    }else{
      throw new IllegalArgumentException("Unable to convert message to object: " + message.getClass().getSimpleName() + " is not supported.");
    }
  }
...
philwebb commented 2 years ago

@DallasP9124 Could you please provide a sample application as either a GitHub project or an attached zip file that we can run. We'll also need instructions on how to configure Artemis (or better still a Docker based test).

spring-projects-issues commented 2 years ago

If you would like us to look at this issue, please provide the requested information. If the information is not provided within the next 7 days this issue will be closed.

spring-projects-issues commented 2 years ago

Closing due to lack of requested feedback. If you would like us to look at this issue, please provide the requested information and we will re-open the issue.