micronaut-projects / micronaut-nats

Integration between Micronaut and Nats.io (https://nats.io/)
Apache License 2.0
14 stars 9 forks source link

Micronaut Nats removes subjects not specified in application.yml when updating streams #422

Closed yuhsin7676 closed 3 months ago

yuhsin7676 commented 9 months ago

Expected Behavior

The subjects specified in the stream configuration (application.yml) are created or not created. Unspecified subjects remain

Actual Behaviour

Subjects in streams specified in the configuration are created, and unspecified ones are deleted

Steps To Reproduce

1) git clone https://github.com/yuhsin7676/micronaut-nats-test.git 2) Start nats from "docker" directory using docker-compose 3) Run or debug the program from any IDE 4) Observe the last 2 lines in the console: before: [test1.sub1, test1.sub2] after: [test1.sub1]

Environment Information

Example Application

https://github.com/yuhsin7676/micronaut-nats-test.git

Version

verified in version 4.1.6 (the bug expected in any 4.* versions)

grimmjo commented 9 months ago

Hey @yuhsin7676

thanks for opening this issue. I can confirm this is a bug. Could you give some more details to your usecase?

As far as I understand, the desired behavior is that the stream configuration doesn't update the subjects. I just could imagine one of the two solutions:

  1. Not updating the stream configuration at all. We just create it, if it's not available.
  2. Throwing an exception that there is a mismatch in the configuration and stopping the application.

@graemerocher or @sdelamo: What of the solutions do you prefer? Is there a general way how Micronaut should behave when interacting with message brokers or is this part of the administration of the message broker and we shouldn't care about it?

yuhsin7676 commented 9 months ago

Hello, @grimmjo

The desired behavior is as follows: when the stream is updated, the subjects specified in the configuration are added to the current stream if they do not exist. Subjects not specified in the configuration remain.

Neither of your two solutions suits us, so we came up with our own solution.

For example:

There are 3 microservices, 2 microservices are subscribed to subject test1.sub1 and test1.sub2 (and they exist). The 3rd microservice is subscribed only to test1.sub1. If the 3rd microservice restarts after an error, then only test1.sub1 will remain. The 2nd solution does not suit us, because the service simply will not start, and the entire application will begin to work incorrectly.

Also, we wanted to raise the 4th and 5th microservices, which are subscribed to test1.sub1, test1.sub2, test1.sub3. The stream must be updated, so the 1st solution is also bad for us.

Our local solution is to replace the io.micronaut.nats.jetstream.JetStreamFactory class with a CustomJetStreamFactory:

@Factory
@Replaces(factory = JetStreamFactory.class)
public class CustomJetStreamFactory {

    private final BeanContext beanContext;

    @Inject
    public CustomJetStreamFactory(BeanContext beanContext) {
        this.beanContext = beanContext;
    }

    @Singleton
    @EachBean(NatsConnectionFactoryConfig.class)
    JetStreamManagement jetStreamManagement(NatsConnectionFactoryConfig config) throws IOException {
        if (config.getJetstream() != null) {
            return getConnectionByName(config.getName()).jetStreamManagement(
                    config.getJetstream().toJetStreamOptions());
        }
        return null;
    }

    @Singleton
    @EachBean(NatsConnectionFactoryConfig.class)
    JetStream jetStream(NatsConnectionFactoryConfig config) throws IOException, JetStreamApiException {
        if (config.getJetstream() != null) {
            Connection connection = getConnectionByName(config.getName());

            final JetStreamManagement jetStreamManagement = getConnectionByName(config.getName()).jetStreamManagement(
                    config.getJetstream().toJetStreamOptions());

            // initialize the given stream configurations
            for (NatsConnectionFactoryConfig.JetStreamConfiguration.StreamConfiguration stream : config.getJetstream()
                    .getStreams()) {
                final StreamConfiguration streamConfiguration = stream.toStreamConfiguration();
                if (jetStreamManagement.getStreamNames().contains(streamConfiguration.getName())) {
                    StreamInfo streamInfo = jetStreamManagement.getStreamInfo(streamConfiguration.getName());

                    // begin added custom code
                    StreamConfiguration current = streamInfo.getConfiguration();
                    for (String sub : current.getSubjects()) {
                        if (!streamConfiguration.getSubjects().contains(sub)) {
                            streamConfiguration.getSubjects().add(sub);
                        }
                    }
                    // end added custom code

                    if (!streamInfo.getConfiguration().equals(streamConfiguration)) {
                        jetStreamManagement.updateStream(streamConfiguration);
                    }
                } else {
                    jetStreamManagement.addStream(streamConfiguration);
                }
            }

            return connection.jetStream(config.getJetstream().toJetStreamOptions());
        }
        return null;
    }

    private Connection getConnectionByName(String connectionName) {
        return beanContext.findBean(Connection.class, Qualifiers.byName(connectionName))
                .orElseThrow(() -> new IllegalStateException(
                        "No nats connection found for " + connectionName));
    }

}
yuhsin7676 commented 9 months ago

Add the solution into new branch "solution": https://github.com/yuhsin7676/micronaut-nats-test/tree/solution