spring-projects / spring-framework

Spring Framework
https://spring.io/projects/spring-framework
Apache License 2.0
56.31k stars 38k forks source link

Broken WebSocket subscription using simple message broker #26118

Closed BenS89 closed 3 years ago

BenS89 commented 3 years ago

We used WebSockets within Spring Boot 2.3.5 using a simple message broker. After the update to Spring Boot 2.4 / Spring Framework 5.3, we noticed that our web socket mechanism is not working anymore. We use the following web socket config:

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    public void configureMessageBroker(MessageBrokerRegistry config) {
        config.enableSimpleBroker("/topic", "/user");
        config.setApplicationDestinationPrefixes("/app");
        config.setUserDestinationPrefix("/user");
    }

    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/websocket");
        registry.addEndpoint("/websocket").setAllowedOrigins("http://localhost:8080").withSockJS();
    }

    @Bean(name = "websocketLocaleResolver")
    public SessionLocaleResolver websocketLocaleResolver() {
        SessionLocaleResolver websocketLocaleResolver = new SessionLocaleResolver();
        websocketLocaleResolver.setDefaultLocale(Locale.GERMANY);

        return websocketLocaleResolver;
    }

}

A look at the DefaultSubscriptionRegistry shows that there changed a lot comparing to Spring Boot 2.3.5. We could nail down the issue because we saw that even though our web socket subscription was successful within the DefaultSubscriptionRegistry, the method this.subscriptionRegistry.findSubscriptions(message) within the SimpleBrokerMessageHandler always returns an empty map (in contrast to Spring Boot 2.3).

protected void sendMessageToSubscribers(@Nullable String destination, Message<?> message) {
        MultiValueMap<String,String> subscriptions = this.subscriptionRegistry.findSubscriptions(message);
...

When rolling back to Spring 2.3.5, the same web socket business logic works fine with the old implementations of DefaultSubscriptionRegistry and SimpleBrokerMessageHandler.

We also noticed that with Spring Boot 2.3.5 the DestinationCache returns a map including a SessionId and SubscriptionId (expected behavior). Using Spring Boot 2.4.0, the returned LinkedMultiValueMap sessionIdToSubscriptionIds is always empty.

public LinkedMultiValueMap<String, String> getSubscriptions(String destination) {
            LinkedMultiValueMap<String, String> sessionIdToSubscriptionIds = this.destinationCache.get(destination);
            if (sessionIdToSubscriptionIds == null) {
                sessionIdToSubscriptionIds = this.destinationCache.computeIfAbsent(destination, _destination -> {
                    LinkedMultiValueMap<String, String> matches = computeMatchingSubscriptions(destination);
                    // Update queue first, so that cacheSize <= queue.size(
                    this.cacheEvictionPolicy.add(destination);
                    this.cacheSize.incrementAndGet();
                    return matches;
                });
                ensureCacheLimit();
            }
            return sessionIdToSubscriptionIds;
        }
snicoll commented 3 years ago

Thanks for the report but none of that code is part of Spring Boot. I am transferring the issue to the Spring Framework issue tracker so that the team can have a look.

rstoyanchev commented 3 years ago

Yes there was a major refactoring in DefaultSubscriptionRegistry to reduce locking contention. Is this something that you can reproduce reliably, and if yes you can provide more detailed instructions or a sample?

BenS89 commented 3 years ago

Thanks for your feedback. In the meantime, we found the root cause of the issue. In Spring Framework 5.3, only one destination per {sessionId, subscriptionId} is allowed within the DefaultSubscriptionRegistry:

Spring-5.3.1 -> DefaultSubscriptionRegistry$SessionRegistry
    public void addSubscription(String sessionId, Subscription subscription) {
        SessionInfo info = this.sessions.computeIfAbsent(sessionId, _sessionId -> new SessionInfo());
        info.addSubscription(subscription);
Spring-5.3.1 -> DefaultSubscriptionRegistry$SessionInfo
    public void addSubscription(Subscription subscription) {
        this.subscriptionMap.putIfAbsent(subscription.getId(), subscription);

In Spring 5.2, storing multiple destinations for this tuple is possible:

Spring-5.2.10 -> DefaultSubscriptionRegistry$SessionSubscriptionRegistry
    public SessionSubscriptionInfo addSubscription(String sessionId, String subscriptionId,
            String destination, @Nullable Expression selectorExpression) {
        SessionSubscriptionInfo info = this.sessions.get(sessionId);
        if (info == null) {
            info = new SessionSubscriptionInfo(sessionId);
            SessionSubscriptionInfo value = this.sessions.putIfAbsent(sessionId, info);
            if (value != null) {
                info = value;
            }
        }
        info.addSubscription(destination, subscriptionId, selectorExpression);
        return info;
Spring-5.2.10 -> DefaultSubscriptionRegistry$SessionSubscriptionInfo
    public void addSubscription(String destination, String subscriptionId, @Nullable Expression selectorExpression) {
        Set<Subscription> subs = this.destinationLookup.get(destination);
        if (subs == null) {
            synchronized (this.destinationLookup) {
                subs = this.destinationLookup.get(destination);
                if (subs == null) {
                    subs = new CopyOnWriteArraySet<>();
                    this.destinationLookup.put(destination, subs);
                }
            }
        }
        subs.add(new Subscription(subscriptionId, selectorExpression));

In our frontend, we use the following code for the web socket subscription:

public subscribe(callback: any, locale: string): void {

        this.subscribers.push(callback);

        const url = '/websocket';

        const socket = new SockJS(url);
        this.client = Stomp.over(socket);

        this.client.debug = () => { };

        const headers = {
            locale: locale
        };

        this.client.connect(headers, frame => {
            this.client.subscribe('/user/topic/unread', message => this.onMessage(message));
            this.client.send('/app/unread', headers, "{}");
        });

This results in two requests having different destinations with the same session ID / subscription ID, for example:

Request 1: image

Request 2: image

For the second request, we can not save the destination because in addSubscription this.subscriptionMap already has an entry for the given subscription ID:

public void addSubscription(Subscription subscription) {
    this.subscriptionMap.putIfAbsent(subscription.getId(), subscription);
}

Therefore, the main difference between Spring 5.2 and 5.3 seems like the ability to store multiple destinations for a given {sessionId, subscriptionId} within the old 5.2 implementation.

Was it intended to remove the ability to store multiple destinations for existing session/sub ids?

rstoyanchev commented 3 years ago

@BenS89 thanks for the extra details.

In the STOMP protocol the same user can subscribe more than once to the same destination but each subscription must have a unique id. You can also see it on the MESSAGE frame (from server to client) which includes only the subscription id, and not the destination and that is how messages can be correlated to the subscription. So you can subscribe twice to the same destination and still get messages for each subscription separately since each subscription should have a unique id.

I don't quite understand how you make use of this but it is unusual for broker and user destinations to overlap, and I would not expect to see that. A user prefixed destination is just a general representation of a topic (let's category) that is meant to be transformed into a unique destination specific to the user. So the broker should only get one subscription for the user.

BenS89 commented 3 years ago

Thanks for your feedback. If I unterstand you right, the mentioned change was intended. We found a way to avoid one of the two requests. That fixed the issue for us as well. Therefore, the issue can be closed.

trim09 commented 3 years ago

@BenS89 thanks such detailed report. You are right. The possibility to register/subscribe with the same subscriptionId on the same sessionId twice was intentionally removed. It opened us a way for a huge performance boost. We though that we could afford it because of this statement on Stomp 1.2 specification: Within the same connection, different subscriptions MUST use different subscription identifiers.

It means it should work for you if you could increment subscriptionId "sub-0" to "sub-1" for the second subscription. Unfortunately, I am not sure how you could do it in JS snippet you provided.

trim09 commented 3 years ago

@BenS89 I came across this documentation and it says that you should be able to set an unique subscription ID this way: this.client.subscribe('/user/topic/unread', message => this.onMessage(message), { id: mysubid });

By the way, I wonder why the Stomp did a subscription on: this.client.send('/app/unread', headers, "{}"); I think it should just send that frame without subscribtion.

CrazyChris75 commented 3 years ago

Hi everyone, Wondering how this issue is closed?

A client subscribing to '/user/queue/objects' wants to receive user-specific messages - and this has been working for many years ... until now :(

SUBSCRIBE shown here:

2021-01-17 10_44_24-Orders - Brave

In DefaultSubscriptionRegistry i can see 2 calls to addSubscriptionInternal => SessionInfo.addSubscription

2021-01-17 11_04_54-n3s-foundation – DefaultSubscriptionRegistry class  Maven_ org springframework_s

and

2021-01-17 11_05_02-n3s-foundation – DefaultSubscriptionRegistry class  Maven_ org springframework_s

I guess Spring creates the 2nd call because /user destinations need special attention with the -userxxx suffix. However I don't get how the 2nd one would ever make it into the map of subscriptions with the new putIfAbsent?

I have several apps with different setups and everything works fine in a WAR containing Java server (with Spring Framework) and HTML/JS Client deployed to Tomcat. This is confusing because here I also see the 2 calls to addSubscription with the 2nd one not making it into the map due to putIfAbsent - but it works anyway?!?

BUT ... it simply won't work in a Spring Boot app where the client is NOT in the same WAR but instead is a react app on a different domain. Yes, the cross-domain things are all set up and general websocket messages to topics work fine but user-specific messages are simply not sent as if there are no subscriptions (nothing showing up in trace logs either).

How can this be used now? I have spent way to many hours now trying to figure this out because it worked just fine before the Spring upgrade.

Cheers, Chris

rstoyanchev commented 3 years ago

@CrazyChris75 this issue is about subscribing twice on the same session with an identical subscription id. That is not allowed by the protocol, a subscription id must be unique. Is this what you are raising as well? If not please do not comment further on this issue.

CrazyChris75 commented 3 years ago

@rstoyanchev As you can see in my screenshots I am subscribing ONCE, Spring framework calls SessionInfo.addSubscription twice (the 2nd one being useless?!) and user-specific server-to-client messages do not work anymore. I also think you are misreading the original issue since it is NOT about subscribing twice with the same subId but the fact that Spring framework calls "addSubscriptionInternal" twice.

rstoyanchev commented 3 years ago

Yes it's a little more indirect than that. In the original issue, the client subscribes to "/user/topic/unread" but the server maps "/user" to both the broker and the user destination message handler which is not expected:

    public void configureMessageBroker(MessageBrokerRegistry config) {
        config.enableSimpleBroker("/topic", "/user");
        config.setApplicationDestinationPrefixes("/app");
        config.setUserDestinationPrefix("/user");
    }

"/user" prefixed messages aren't supposed to go directly to the broker. They are meant to be transformed by the user destination message handler first, which will then send them on the broker channel.

cfecherolle commented 1 year ago

Thank you @rstoyanchev, your last answer sent me in the right direction: it seems that in my case, the problem was that destinations starting with /user were not treated as user-specific destinations and I now have to specify this in my config.

Indeed, before fixing, I had two consecutive subscription additions to the registry, thus one was actually not added because of the code change in Spring, which was mentioned earlier in this thread.

It used to work before because Spring was able to add more than one subscription for the same id (as explained in previous answers) and stopped working now because of the two consecutive subscriptions: the one that was actually doing the work was ignored, breaking the communication.

To fix it, I added this override to my WebSocketMessageBrokerConfigurer implementation:

    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        config.setUserDestinationPrefix("/user");
    }

By explicitly declaring the user destination prefix, the first subscription does not get wrongfully added, which allows the second one to be registered successfully. This fix restored functionality to what it was before I did the Spring upgrade, and my client can now receive messages again.

I hope this can help someone later.