JaidenAshmore / java-dynamic-sqs-listener

Java SQS Listener library built to be customisable and dynamic during runtime
MIT License

Support some 'dynamic' properties at runtime without requiring a restart of containers #339

Closed stephencassidy-r7 closed 3 years ago

stephencassidy-r7 commented 4 years ago

And with another suggestion that I tried implementing (badly) myself 😄

For some properties, for example the ConcurrentMessageBrokerProperties, the spring container auto configuration will read the value for concurrency level from the spring environment if a property is passed. I added an implementation of ConcurrentMessageBrokerProperties that accepted a Supplier for the property, then this can be dynamically updated (and read again) without having to restart containers.

public final class DynamicConcurrentMessageBrokerProperties implements ConcurrentMessageBrokerProperties {
    private final Supplier<Integer> concurrencyLevel;
    private final Supplier<Duration> preferredConcurrencyPollingRate;
    private final Supplier<Duration> errorBackoffTime;

    public DynamicConcurrentMessageBrokerProperties(
        final Supplier<Integer> concurrencyLevel,
        final Supplier<Duration> preferredConcurrencyPollingRate,
        final Supplier<Duration> errorBackoffTime
    ) {
        Preconditions.checkNotNull(concurrencyLevel, "concurrencyLevel should not be null");
        // Range-check the current value; the Supplier itself cannot be compared to zero
        Preconditions.checkPositiveOrZero(concurrencyLevel.get(), "concurrencyLevel should be greater than or equal to zero");

        this.concurrencyLevel = concurrencyLevel;
        this.preferredConcurrencyPollingRate = preferredConcurrencyPollingRate;
        this.errorBackoffTime = errorBackoffTime;
    }

    @PositiveOrZero
    @Override
    public int getConcurrencyLevel() {
        return concurrencyLevel.get();
    }
}

I'm not too sure if all properties could be done this way, e.g. changing a backoff rate mid-processing might cause issues, but for my own stuff allowing the concurrency rate to change on the fly seemed OK!
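To make the idea concrete outside the library, here is a minimal, self-contained sketch of a supplier-backed property. `ConcurrencyProperties` and `SupplierBackedConcurrencyProperties` are hypothetical names used only for illustration (they are not library types), and an `AtomicInteger` stands in for whatever feature flag or config source would really drive the value. Since the value can change after construction, this sketch range-checks it on every read rather than once in the constructor.

```java
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Hypothetical stand-in for a properties interface; not a library type. */
interface ConcurrencyProperties {
    int getConcurrencyLevel();
}

/** Re-reads the supplier on every call, so changes are visible without a restart. */
final class SupplierBackedConcurrencyProperties implements ConcurrencyProperties {
    private final Supplier<Integer> concurrencyLevel;

    SupplierBackedConcurrencyProperties(final Supplier<Integer> concurrencyLevel) {
        // Only the supplier itself can be checked here; its value may change later.
        this.concurrencyLevel = Objects.requireNonNull(concurrencyLevel, "concurrencyLevel should not be null");
    }

    @Override
    public int getConcurrencyLevel() {
        final Integer level = concurrencyLevel.get();
        // Validate at read time because the supplied value is not fixed.
        if (level == null || level < 0) {
            throw new IllegalStateException("concurrencyLevel should be greater than or equal to zero");
        }
        return level;
    }
}

class SupplierBackedPropertiesDemo {
    public static void main(final String[] args) {
        // The AtomicInteger plays the role of an external, changeable setting.
        final AtomicInteger setting = new AtomicInteger(2);
        final ConcurrencyProperties properties = new SupplierBackedConcurrencyProperties(setting::get);

        System.out.println(properties.getConcurrencyLevel());
        setting.set(5);
        System.out.println(properties.getConcurrencyLevel());
    }
}
```

Whether validating on read is the right trade-off is debatable: a bad value then surfaces as an exception inside the caller rather than at start-up, so a real implementation might prefer to fall back to the last good value.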

Again this is probably more an edge case but I'd be happy to try and explain the use case better if interested 😃

JaidenAshmore commented 4 years ago

Yeah this is exactly the sort of dynamic functionality that I want the library to allow. All the other Java SQS implementations I have worked with in the past have involved restarting containers whenever a property changes which isn't great if I want to have fast changes in concurrency rate, e.g. via a feature flag.

If you use the core library, not the spring wrapper, you can use those properties like you have implemented to provide true dynamic properties. You can see this in the core-example where I have gotten the concurrency rate to change without restarting the container.

So you have found the flaw in the Spring wrapper for this library, and it is something I have been wanting to fix. As we are using annotations for the configuration, we can't provide dynamic values. One way I have been pondering to fix this is to provide the properties as a bean, e.g. you could do something like:

@Component
class MessageListener {
    @Bean
    public FifoMessageListenerContainerProperties myDynamicFifoProperties() {
        return new FifoMessageListenerContainerProperties() {
            public int concurrencyLevel() {
                // your dynamic code lives here
            }

            // other implementations
        };
    }

    @FifoQueueListener(propertyFactoryBean = "myDynamicFifoProperties")
    public void listener(final Message message) {
        // process message
    }
}

With this implementation, it is up to the consumer how these properties are defined, and it could use a style like the dynamic properties that you have written. I still don't love it but it is probably the least amount of effort for the consumer to implement what we want.

The other option that I was considering was just allowing you to include your own MessageListenerContainer bean:

@Component
public class MessageListeners {
    @Bean
    public MessageListenerContainer myQueueListener(final SqsAsyncClient sqsAsyncClient) {
        final QueueProperties queueProperties = QueueProperties.builder().queueUrl("someUrl").build();
        return new FifoMessageListenerContainer(
            queueProperties,
            sqsAsyncClient,
            () ->
                new LambdaMessageProcessor(
                    sqsAsyncClient,
                    queueProperties,
                    message -> {
                        // the logic you can use to process this message
                    }
                ),
            new FifoMessageListenerContainerProperties() {
                // your dynamic overridden methods here
            }
        );
    }
}

Earlier in this library's life I had implementations of these properties (e.g. a CachingConcurrentMessageBrokerProperties) that did similar stuff to what you have linked but I decided to remove them to reduce the amount of code in the library and remove dependencies on Guava etc. So I probably wouldn't pull in the exact dynamic implementation you have into the library but am keen on helping you with your use case.
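For anyone wanting similar caching behaviour without Guava, a small time-based memoising supplier is probably enough. This is only a sketch under assumptions: `ExpiringSupplier` is a hypothetical name, not something the library ships or previously shipped, and the injectable nano clock exists purely so that expiry can be exercised in a test without sleeping.

```java
import java.time.Duration;
import java.util.Objects;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

/** Caches the delegate's value and reloads it once the time-to-live has elapsed. */
final class ExpiringSupplier<T> implements Supplier<T> {
    private final Supplier<T> delegate;
    private final long ttlNanos;
    private final LongSupplier nanoClock;

    private T value;
    private long loadedAtNanos;
    private boolean loaded;

    ExpiringSupplier(final Supplier<T> delegate, final Duration ttl, final LongSupplier nanoClock) {
        this.delegate = Objects.requireNonNull(delegate, "delegate should not be null");
        this.ttlNanos = Objects.requireNonNull(ttl, "ttl should not be null").toNanos();
        this.nanoClock = Objects.requireNonNull(nanoClock, "nanoClock should not be null");
    }

    /** Convenience constructor that uses the JVM's monotonic clock. */
    ExpiringSupplier(final Supplier<T> delegate, final Duration ttl) {
        this(delegate, ttl, System::nanoTime);
    }

    @Override
    public synchronized T get() {
        final long now = nanoClock.getAsLong();
        // Load on first use, then again whenever the cached value is older than the TTL.
        if (!loaded || now - loadedAtNanos >= ttlNanos) {
            value = delegate.get();
            loadedAtNanos = now;
            loaded = true;
        }
        return value;
    }
}
```

Wrapping an expensive lookup, for example `new ExpiringSupplier<>(expensiveLookup, Duration.ofSeconds(10))`, means the properties object can be polled as often as the broker likes while the underlying source is hit at most once per TTL window.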

So I guess my question to you would be do you think one of the above would be the best for your use case or can you see a better way to provide these dynamic values for a Spring app?

scassidy1986 commented 4 years ago

Hey, cheers again for the quick reply and the library, it's been great so far -

I like the idea of being able to supply a named bean for the properties, but could see problems with it - I had tried something similar with a PropertiesFactory class that would be passed to the MessageListenerContainerFactory, for example

@Slf4j
@RequiredArgsConstructor
public class StaticPrefetchingPropertiesFactory implements PrefetchingPropertiesFactory  {
    private final Environment environment;

    public ConcurrentMessageBrokerProperties messageBrokerProperties(final PrefetchingQueueListener annotation) {
         return StaticConcurrentMessageBrokerProperties.builder()
              .concurrencyLevel(getConcurrencyLevel(annotation))
              .build();
    } 
}

@Slf4j
@RequiredArgsConstructor
public class PrefetchingMessageListenerContainerFactory
    extends AbstractAnnotationMessageListenerContainerFactory<PrefetchingQueueListener> {
    private final PrefetchingPropertiesFactory propertiesFactory;

    private Supplier<MessageBroker> buildMessageBrokerSupplier(PrefetchingQueueListener annotation) {
        return () -> new ConcurrentMessageBroker(propertiesFactory.messageBrokerProperties(annotation));
    }
}

Then users could override and supply their own bean per type if required without having to re-implement the provided MessageContainerFactory classes.

public class DynamicPrefetchingPropertiesFactory extends StaticPrefetchingPropertiesFactory {
    // override methods as needed
}

But this could mean having to rewrite a lot of code per type (prefetching, batching), and maybe introduce some extra work for the user.

> So I probably wouldn't pull in the exact dynamic implementation you have into the library

Yeah, I completely agree with that as well. This was the main reason I experimented with a properties class that accepts Supplier<> values: in theory anything could be passed in, whether it's simply a call to the Spring Environment class or another property provider. This would possibly also cover cases where additional 'dynamic' property sources are part of the Spring environment (something like spring-cloud-config maybe), so calling the getValue methods would always read the most up-to-date value, I think. The container factory class would then wrap the existing methods in a Supplier:

    private Supplier<MessageBroker> buildMessageBrokerSupplier(PrefetchingQueueListener annotation) {
        return () ->
            new ConcurrentMessageBroker(
                DynamicConcurrentMessageBrokerProperties.builder()
                    .concurrencyLevel(() -> getConcurrencyLevel(annotation))
                    .build()
            );
    }
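As a rough illustration of re-reading a property source on every call, the sketch below builds a `Supplier<Integer>` from a generic lookup function. `PropertySuppliers`, `intProperty` and the `sqs.concurrency` key are made up for this example; a `Map` stands in for the Spring `Environment`, and in a Spring app the lookup could presumably be `environment::getProperty`.

```java
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.function.Supplier;

/** Hypothetical helper for turning a key lookup into a live integer supplier. */
final class PropertySuppliers {
    private PropertySuppliers() {}

    /**
     * Builds a supplier that looks the key up on every call and falls back to the
     * default when the property is missing or is not a valid integer.
     */
    static Supplier<Integer> intProperty(
        final Function<String, String> lookup,
        final String key,
        final int defaultValue
    ) {
        Objects.requireNonNull(lookup, "lookup should not be null");
        Objects.requireNonNull(key, "key should not be null");
        return () -> {
            final String raw = lookup.apply(key);
            if (raw == null) {
                return defaultValue;
            }
            try {
                return Integer.parseInt(raw.trim());
            } catch (final NumberFormatException e) {
                // A malformed value should not take the listener down.
                return defaultValue;
            }
        };
    }
}

class PropertySuppliersDemo {
    public static void main(final String[] args) {
        // The map stands in for a refreshable property source.
        final Map<String, String> source = new ConcurrentHashMap<>();
        final Supplier<Integer> concurrency = PropertySuppliers.intProperty(source::get, "sqs.concurrency", 1);

        System.out.println(concurrency.get());
        source.put("sqs.concurrency", "8");
        System.out.println(concurrency.get());
    }
}
```

Falling back to the default on a malformed value is a design choice for this sketch; logging the bad value, or keeping the last valid one, may be preferable in practice.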

Happy to discuss this further - hopefully I've made some sense!

JaidenAshmore commented 4 years ago

Yeah, let me have a ponder on this and get back to you. Definitely something that needs to be improved here so I will do some experimenting on what can be done.

JaidenAshmore commented 4 years ago

Also, I was planning to do this issue (I hadn't created it before, but it was on my mind): https://github.com/JaidenAshmore/java-dynamic-sqs-listener/issues/345, which I think would make your functionality a little bit easier as it would handle constructing the message retrievers, etc. I would be looking at integrating this in 4.4.0 of the library. Once that is done I can look at this ticket again.

JaidenAshmore commented 3 years ago

Hey, was this something that you were thinking: spring-how-to-have-listener-dynamic-properties.md ? Here are some tests doing the overrides: QueueListenerConfigurationTest.

I think it aligns with what you were thinking, but let me know if that is not the case. I can make a release of this version if you think it would cover this use case.

stephencassidy-r7 commented 3 years ago

@JaidenAshmore Just had a quick look at this and it looks great, when I get a chance I will try it out but from the looks of it this is exactly what I was after, cheers!

scassidy1986 commented 3 years ago

So I finally got round to testing out the branch with this code and it works great - just needed to extend the existing classes and override the required methods and was able to toggle concurrency levels and batching / prefetching sizes without having to restart any containers or services! 🍾

JaidenAshmore commented 3 years ago

Sweet, I will go publish a version probably in a couple of hours.

JaidenAshmore commented 3 years ago

Release Notes

Adds the ability to more easily override the values for a message listener annotation by providing a custom annotation parser bean. A use case for this is to provide custom logic for calculating the concurrency level, allowing the message listener to dynamically change the concurrency rate.

Available annotation parsers

Example

  1. Provide a custom annotation parser bean, for example the PrefetchingQueueListenerParser which is used to parse the @PrefetchingQueueListener annotation.

    public class CustomPrefetchingQueueListenerParser extends PrefetchingQueueListenerParser {
        private static final Random random = new Random();
        private final LoadingCache<Boolean, Integer> cachedConcurrencyLevel = CacheBuilder
                            .newBuilder()
                            .expireAfterWrite(10, TimeUnit.SECONDS)
                            .build(CacheLoader.from(() -> random.nextInt(10)));
    
        public CustomPrefetchingQueueListenerParser(Environment environment) {
            super(environment);
        }
    
        @Override
        protected Supplier<Integer> concurrencySupplier(PrefetchingQueueListener annotation) {
            return () -> cachedConcurrencyLevel.getUnchecked(true);
        }
    }

    In the example above we are using a Guava cache so that the concurrency level is held for a fixed period (10 seconds here) before it can switch to a new value.

  2. Include this parser as a bean. This will replace the existing implementation and will be used for all @PrefetchingQueueListener message listeners.

    @Configuration
    class MyConfiguration {

        @Bean
        public PrefetchingQueueListenerParser customParser(final Environment environment) {
            return new CustomPrefetchingQueueListenerParser(environment);
        }
    }
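Stripped of Spring and the library types, the override mechanism used in the steps above can be sketched in plain Java. Everything here is hypothetical and for illustration only: `BaseListenerParser` and `FlagAwareListenerParser` are not library classes, and the "double the concurrency when a flag is on" rule is just an example of custom logic.

```java
import java.util.Objects;
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;

/** Hypothetical base parser that supplies a fixed, configured concurrency level. */
class BaseListenerParser {
    private final int configuredConcurrency;

    BaseListenerParser(final int configuredConcurrency) {
        this.configuredConcurrency = configuredConcurrency;
    }

    /** Extension point: subclasses may return a supplier whose value changes over time. */
    protected Supplier<Integer> concurrencySupplier() {
        return () -> configuredConcurrency;
    }

    /** Callers ask for the level each time they need it, so overrides take effect live. */
    final int currentConcurrency() {
        return concurrencySupplier().get();
    }
}

/** Hypothetical override that doubles the configured level while a flag is enabled. */
class FlagAwareListenerParser extends BaseListenerParser {
    private final BooleanSupplier boostEnabled;

    FlagAwareListenerParser(final int configuredConcurrency, final BooleanSupplier boostEnabled) {
        super(configuredConcurrency);
        this.boostEnabled = Objects.requireNonNull(boostEnabled, "boostEnabled should not be null");
    }

    @Override
    protected Supplier<Integer> concurrencySupplier() {
        final Supplier<Integer> base = super.concurrencySupplier();
        // The flag is evaluated on every read, not once when the supplier is built.
        return () -> boostEnabled.getAsBoolean() ? base.get() * 2 : base.get();
    }
}
```

The key point is that the subclass only replaces the one supplier it cares about, while everything else in the base class stays untouched.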

Upgrade steps

This will not break consumers of this library, but if you are building your own aspects of this library manually you may need to modify some of the constructors. For example, the PrefetchingMessageListenerContainerFactory now takes a PrefetchingQueueListenerParser instead of the Spring Environment.