spring-projects / spring-amqp

Spring AMQP - support for Spring programming model with AMQP, especially but not limited to RabbitMQ
https://spring.io/projects/spring-amqp
Apache License 2.0

Administration functionality for [Simple|Direct]MessageListenerContainer via JMX #1521

Open garyrussell opened 1 year ago

garyrussell commented 1 year ago

Discussed in https://github.com/spring-projects/spring-amqp/discussions/1520

Originally posted by **rfelgent** October 18, 2022

Hi @all,

at my company we have created code that exposes basic administration functionality for `[Simple|Direct]MessageListenerContainer`: the so-called `MessageListenerContainerAdmin`.

The current feature set consists of:

- start|stop of listener containers for queue x
- setting concurrency (e.g. maxConsumer, minConsumer) for queue x
- setting batching (e.g. batchSize, receiveTimeout) for queue x

The idea of a `MessageListenerContainerAdmin` is highly inspired by `AmqpAdmin`.

My company would like to contribute this code to the core spring-amqp framework. If you (especially @garyrussell as lead developer of spring-amqp) are interested in this feature, I could open a PR.
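
For readers who want a concrete picture of the feature set described above, here is a minimal sketch of what such an admin facade might look like. Everything in it is hypothetical: `MessageListenerContainerAdmin`, `InMemoryAdmin`, and `FakeContainer` are illustrative names, not spring-amqp types, and the stand-in container only models the settings named in the post. It assumes that a `null` argument means "leave this setting unchanged".

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.NoSuchElementException;

// Hypothetical sketch: none of these types exist in spring-amqp.
class ContainerAdminSketch {

    /** Stand-in for a listener container; models only the settings named in the post. */
    static final class FakeContainer {
        boolean running;
        int concurrentConsumers = 1;
        int maxConcurrentConsumers = 1;
        int batchSize = 1;
        long receiveTimeoutMillis = 1000L;
    }

    /** Hypothetical admin facade: one entry point for per-queue container management. */
    interface MessageListenerContainerAdmin {
        void start(String queue);
        void stop(String queue);
        void setConcurrency(String queue, Integer concurrency, Integer maxConcurrency);
        void setBatching(String queue, Long receiveTimeoutMillis, Integer batchSize);
    }

    /** Simple implementation that looks containers up by queue name. */
    static final class InMemoryAdmin implements MessageListenerContainerAdmin {
        private final Map<String, FakeContainer> containersByQueue = new LinkedHashMap<>();

        void register(String queue, FakeContainer container) {
            containersByQueue.put(queue, container);
        }

        private FakeContainer find(String queue) {
            FakeContainer container = containersByQueue.get(queue);
            if (container == null) {
                throw new NoSuchElementException("No container for queue " + queue);
            }
            return container;
        }

        @Override public void start(String queue) { find(queue).running = true; }

        @Override public void stop(String queue) { find(queue).running = false; }

        @Override
        public void setConcurrency(String queue, Integer concurrency, Integer maxConcurrency) {
            FakeContainer container = find(queue);
            // Assumption: null means "leave this setting unchanged".
            if (maxConcurrency != null) container.maxConcurrentConsumers = maxConcurrency;
            if (concurrency != null) container.concurrentConsumers = concurrency;
        }

        @Override
        public void setBatching(String queue, Long receiveTimeoutMillis, Integer batchSize) {
            FakeContainer container = find(queue);
            if (receiveTimeoutMillis != null) container.receiveTimeoutMillis = receiveTimeoutMillis;
            if (batchSize != null) container.batchSize = batchSize;
        }
    }
}
```

A real implementation would presumably delegate to actual listener containers rather than a map of stand-ins; the sketch only shows the shape of the API.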

rfelgent commented 1 year ago

Hi @garyrussell ,

I would like to get some guidance on how and where to start.

Here is the (not yet cleaned-up) code to serve as a basis for discussion:

```java
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.config.ContainerCustomizer;
import org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.jmx.export.annotation.ManagedOperation;
import org.springframework.jmx.export.annotation.ManagedOperationParameter;
import org.springframework.jmx.export.annotation.ManagedOperationParameters;
import org.springframework.jmx.export.annotation.ManagedResource;
import org.springframework.stereotype.Component;

// RabbitMqProperties and QueueSettings are application-specific classes that are not shown here.
@Component
@ConditionalOnProperty(prefix = "app.amqp", name = "enable-listener", havingValue = "true")
@ManagedResource(description = "Administration functionality for SimpleMessageListenerContainer")
@RequiredArgsConstructor
@Slf4j
public class SimpleMessageListenerContainerAdmin implements ContainerCustomizer<SimpleMessageListenerContainer> {

  private final RabbitMqProperties rabbitMqProperties;
  // invocation of ContainerCustomizer#configure() happens before endpoint registry bean is available => some kind of "lazy" mode is required
  private final ObjectProvider<RabbitListenerEndpointRegistry> rabbitListenerEndpointRegistry;

  // Applies the configured settings of every queue the container listens to.
  @Override
  public void configure(SimpleMessageListenerContainer container) {
    Arrays.stream(container.getQueueNames()).forEach(queue -> {
      QueueSettings queueSettings = findQueueSettingsByQueue(queue);
      applyQueueSettings(container, queueSettings);
    });
  }

  // The declared parameters must match the method signature one to one, so the former
  // "autoStartup" entry (which has no corresponding method parameter) was dropped.
  @ManagedOperation(description = "Concurrency configuration of the consumers for given queue")
  @ManagedOperationParameters({
      @ManagedOperationParameter(name = "queue", description = "name of the queue"),
      @ManagedOperationParameter(name = "concurrency", description = "the number of concurrent consumer to set. Leave empty if no change is desired"),
      @ManagedOperationParameter(name = "maxConcurrency", description = "the number of maximal concurrent consumer to set. Leave empty if no change is desired")
  })
  public void setConcurrencyForQueue(String queue, Integer concurrency, Integer maxConcurrency) {
    QueueSettings queueSettings = new QueueSettings(queue, maxConcurrency, concurrency, null, null);
    SimpleMessageListenerContainer container = findContainerByQueue(queue);
    applyQueueSettings(container, queueSettings);
  }

  @ManagedOperation(description = "Batching configuration of the consumers for given queue")
  @ManagedOperationParameters({
      @ManagedOperationParameter(name = "queue", description = "name of the queue"),
      @ManagedOperationParameter(name = "receiveTimeout", description = "the receive timeout in textual format (like PT20s) to set. Leave empty if no change is desired"),
      @ManagedOperationParameter(name = "batchSize", description = "the batch size to set. Leave empty if no change is desired")
  })
  public void setBatchingForQueue(String queue, Duration receiveTimeout, Integer batchSize) {
    QueueSettings queueSettings = new QueueSettings(queue, null, null, receiveTimeout, batchSize);
    SimpleMessageListenerContainer container = findContainerByQueue(queue);
    applyQueueSettings(container, queueSettings);
  }

  @ManagedOperation(description = "Starting or stopping all consumers for given queue")
  @ManagedOperationParameters({
      @ManagedOperationParameter(name = "queue", description = "name of the queue"),
      @ManagedOperationParameter(name = "start", description = "if <b>true</b> the consumers get started otherwise stopped")
  })
  public void startStopConsumers(String queue, boolean start) {
    SimpleMessageListenerContainer container = findContainerByQueue(queue);
    if (start) {
      container.start();
    } else {
      container.stop();
    }
  }

  @ManagedOperation(description = "Returns a list of queues all SimpleMessageListeners are configured for")
  public List<String> getQueueNames() {
    return Objects.requireNonNull(rabbitListenerEndpointRegistry.getIfUnique()).getListenerContainers().stream()
        .filter(SimpleMessageListenerContainer.class::isInstance)
        .map(SimpleMessageListenerContainer.class::cast)
        .map(AbstractMessageListenerContainer::getQueueNames)
        .flatMap(Arrays::stream)
        .collect(Collectors.toList());
  }

  // Only non-null settings are applied; a null value leaves the container's current value untouched.
  private void applyQueueSettings(SimpleMessageListenerContainer container, QueueSettings queueSettings) {
    Optional.ofNullable(queueSettings.getMaxConcurrency()).ifPresent(container::setMaxConcurrentConsumers);
    Optional.ofNullable(queueSettings.getConcurrency()).ifPresent(container::setConcurrentConsumers);
    Optional.ofNullable(queueSettings.getBatchSize()).ifPresent(container::setBatchSize);
    Optional.ofNullable(queueSettings.getReceiveTimeout()).map(Duration::toMillis).ifPresent(container::setReceiveTimeout);
    Optional.ofNullable(queueSettings.getAutoStartup()).ifPresent(container::setAutoStartup);
  }

  private QueueSettings findQueueSettingsByQueue(String queue) {
    return Stream.concat(rabbitMqProperties.getUpload().queueSettings(), rabbitMqProperties.getDownload().queueSettings())
        .filter(queueSetting -> queueSetting.getQueue().equalsIgnoreCase(queue))
        .findFirst()
        .orElseThrow();
  }

  // Returns the first SimpleMessageListenerContainer that listens to the given queue.
  private SimpleMessageListenerContainer findContainerByQueue(String queue) {
    return Objects.requireNonNull(rabbitListenerEndpointRegistry.getIfUnique()).getListenerContainers().stream()
        .filter(SimpleMessageListenerContainer.class::isInstance)
        .map(SimpleMessageListenerContainer.class::cast)
        .filter(c -> Arrays.asList(c.getQueueNames()).contains(queue))
        .findFirst()
        .orElseThrow();
  }
}
```

Some explanations:

Some questions about the concept:

garyrussell commented 1 year ago

@rfelgent

You are correct; we can't use Lombok in the framework.

We also can't use Boot @Conditional... annotations.

Yes, I prefer a separate class rather than annotating the container.

I am not sure what you mean by "build time configuration" - isn't that already handled by Boot?
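
To illustrate the constraints above (no Lombok, no Boot `@Conditional...` annotations, and a separate admin class instead of annotations on the container), here is a minimal sketch that uses only the JDK's own JMX API. It is not the proposed framework code: `FakeContainer`, `ContainerAdmin`, `ContainerAdminMBean`, `startViaJmx`, and the `example.amqp` object name domain are hypothetical names chosen for the example, and a real version would wrap an actual listener container.

```java
import java.lang.management.ManagementFactory;
import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import javax.management.StandardMBean;

// Hypothetical sketch using plain JDK JMX; no Spring, Boot, or Lombok involved.
class JmxAdminSketch {

    /** Stand-in for a listener container (not a spring-amqp type). */
    static final class FakeContainer {
        private volatile boolean running;
        void start() { running = true; }
        void stop() { running = false; }
        boolean isRunning() { return running; }
    }

    /** Management interface; JMX requires it to be public. */
    public interface ContainerAdminMBean {
        void start();
        void stop();
        boolean isRunning();
    }

    /** Separate admin class: the container itself carries no JMX metadata. */
    public static final class ContainerAdmin implements ContainerAdminMBean {
        private final FakeContainer container;

        // Explicit constructor instead of Lombok's @RequiredArgsConstructor.
        public ContainerAdmin(FakeContainer container) {
            this.container = container;
        }

        @Override public void start() { container.start(); }
        @Override public void stop() { container.stop(); }
        @Override public boolean isRunning() { return container.isRunning(); }
    }

    /** Registers the admin, starts the container through JMX, and reports the resulting state. */
    static boolean startViaJmx(FakeContainer container, String queue) {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        try {
            ObjectName name = new ObjectName("example.amqp:type=ContainerAdmin,queue=" + queue);
            server.registerMBean(
                new StandardMBean(new ContainerAdmin(container), ContainerAdminMBean.class), name);
            try {
                // Invoke the no-argument "start" operation the way a JMX client would.
                server.invoke(name, "start", new Object[0], new String[0]);
                // isRunning() is exposed as the read-only attribute "Running".
                return (Boolean) server.getAttribute(name, "Running");
            } finally {
                server.unregisterMBean(name);
            }
        } catch (JMException e) {
            throw new IllegalStateException("JMX call failed", e);
        }
    }

    public static void main(String[] args) {
        System.out.println("running = " + startViaJmx(new FakeContainer(), "orders"));
    }
}
```

Whether the framework would use standard MBeans like this or Spring's `@ManagedResource` support on the separate class is an open design choice that this sketch does not settle.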