spring-projects / spring-framework

Spring Framework
https://spring.io/projects/spring-framework
Apache License 2.0
56.35k stars 38.03k forks source link

spring websocket rabbitmq external relay in multiserver environment does not scale #30347

Closed fabioportieri closed 9 months ago

fabioportieri commented 1 year ago

In my app i have a frontend that connects via gateway to two instances of the same microservice, configured with an external rabbitmq broker, everything is good if i launch only one instance, the following scenario occurs when i deploy 2 instances:

  1. user login and make a websocket connection to the ms A

  2. user triggers a web rest endpoint , that gets load balanced into ms A, the ms fetch the simp session id from the websocket user registry, and send a message to the subscribed channel, no errors

  3. user client sees the websocket data as expected

  4. user triggers again the same web rest endpoint, that gets load balanced into ms B, the ms fetch the simp session id from the websocket user registry, and send a message to the subscribed channel, no errors

  5. the request never arrives to the broker or to the client/browser, the logs shows the following in StompBrokerRelayMessageHandler:

No TCP connection for session oxsyxbwp in GenericMessage [payload=byte[1094], headers={simpMessageType=MESSAGE, nativeHeaders={simpOrigDestination=[/user/exchange/amq.direct/get-notifications]}, contentType=application/json, simpSessionId=oxsyxbwp, simpDestination=/exchange/amq.direct/get-notifications-useroxsyxbwp}]

Expected: to be able to succesfully publish message for a user websocket subscription, even from the microservice instance that didn't create the websocket connection/session to the client, since i'm using an external broker, and spring is supposed to alert the other server that "own" the websocket session, as far as i understand

this is the relevant configuration

   @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        config.enableStompBrokerRelay("/topic", "/queue", "/exchange/")
            .setAutoStartup(Boolean.TRUE)
            .setSystemHeartbeatReceiveInterval(50000)
            .setSystemHeartbeatSendInterval(50000)
            .setRelayHost(applicationProperties.getWebsocket().getRelayhost())
            .setRelayPort(applicationProperties.getWebsocket().getRelayport())
            .setClientLogin(applicationProperties.getWebsocket().getLogin())
            .setSystemLogin(applicationProperties.getWebsocket().getLogin())
            .setSystemPasscode(applicationProperties.getWebsocket().getPasscode())
            .setClientPasscode(applicationProperties.getWebsocket().getPasscode())
            .setTcpClient(createTcpClient())

            .setUserDestinationBroadcast("/topic/unresolved.user.dest")
            .setUserRegistryBroadcast("/topic/registry.broadcast");

        config.setPathMatcher(new AntPathMatcher("."));
        config.setUserDestinationPrefix("/user");

    }
   private ReactorNettyTcpClient<byte[]> createTcpClient() {

        return new ReactorNettyTcpClient<>(applicationProperties.getWebsocket().getRelayhost(),
            applicationProperties.getWebsocket().getRelayport(),
            new StompReactorNettyCodec());
   }

this is how i publish messages:

public static final String CHANNEL_NAME = "/exchange/amq.direct/get-notifications";
public static final String NOTIFICATION_DEST = "/user/exchange/amq.direct/get-notifications";

 retrieveWebsocketSessionId(jhiUserId, users)
                .ifPresent(sessionId ->
                    // send to user
                    messagingTemplate.convertAndSendToUser(
                        sessionId,
                        CHANNEL_NAME,
                        notificationList,
                        simpMessageHelper.createHeaders(sessionId)
                    )
                );

    private Optional<String> retrieveWebsocketSessionId(String jhiUserId, Set<SimpUser> users) {
        for (SimpUser user : users) {
            if (user.getName().equals(jhiUserId)) {
                for (SimpSession session : user.getSessions()) {
                    // find if that session has our subscription associated
                    for (SimpSubscription sub : session.getSubscriptions()) {
                        if (sub.getDestination().equals(NOTIFICATION_DEST)) {
                            log.info("user {} is logged in. forwarding message to destination {} with session Id: {}", jhiUserId, sub.getDestination(), session.getId());
                            return Optional.of(session.getId());
                        }
                    }
                }
            }
        }
        log.info("user {} currently is not logged in or he hasn't a websocket connection yet. discard message", jhiUserId);
        return Optional.empty();
    }

thank you in advance for any pointers

p.s. version: spring-boot-starter-websocket\2.7.3\spring-boot-starter-websocket-2.7.3.pom

snicoll commented 9 months ago

I am trying to figure out if the broker is configured correctly but that's hard to know given that you're not showing the actual configuration. Can you please move that code snippet into an actual sample that we can run.

You can attach a zip to this issue or push the code to a separate GitHub repository.

spring-projects-issues commented 9 months ago

If you would like us to look at this issue, please provide the requested information. If the information is not provided within the next 7 days this issue will be closed.

fabioportieri commented 9 months ago

hello, this is my broker configuration, as a docker compose service:

  rabbitmq:
    image: rabbitmq:3-management
    hostname: rabbitmq
    container_name: rabbit-broker
    environment:
      - RABBITMQ_DEFAULT_USER=....
      - RABBITMQ_DEFAULT_PASS=....
    ports:
      - 5672:5672
      - 15672:15672
      - 15673:61613
    volumes:
      - "./rabbit_enabled_plugins:/etc/rabbitmq/enabled_plugins"
      - "./rabbitmq/10-defaults.conf:/etc/rabbitmq/conf.d/10-defaults.conf"
    networks:
      csg_net:
        aliases:
          - rabbitmq

10-defaults.conf:

loopback_users.guest = false

log.console = true

log.file.level = debug

tcp_listen_options.keepalive = true
heartbeat = 60

rabbit_enabled_plugins:

[rabbitmq_amqp1_0,rabbitmq_federation_management,rabbitmq_management,rabbitmq_web_dispatch,rabbitmq_management_agent,rabbitmq_stomp].

thank you for your assistance

p.s. so i guess at least my assumptions on the expected behaviour is correct?

snicoll commented 9 months ago

I apologize, I misread the report and thought that the problem was about a clustered rabbitMQ vs. single instance RabbitMQ.

I believe your assumptions are correct, yes. I'll follow-up internally.

rstoyanchev commented 9 months ago

Any messages sent from the server side goes over the "system" TCP connection that's used for broadcasting. Such messages do not have a session id header. As opposed to messages that originate from the client and their session id should have a corresponding TCP Connection.

I can't see exactly but the simpMessageHelper.createHeaders(sessionId) line is possibly the problem by setting a session id in the headers and making it look like the message is actually coming from the client, which it is not, and in that case we expect there to be a TCP connection already. Try making sure there is no session id set in the headers. That should make it work.

fabioportieri commented 9 months ago

Thanks for the response

a long time passed since i opened the issue so i'm not able to test it right away, anyway, i was under the assumption that the headers are required, in order to make messagingTemplate.convertAndSendToUser() work, so i'm worried that is not so simple as to just remove the headers

see also this: https://stackoverflow.com/questions/62456213/spring-websocket-convertandsendtouser-not-working-but-convertandsend-working or this: https://github.com/spring-projects/spring-framework/issues/24249

rstoyanchev commented 9 months ago

Apologies for the delay.

i was under the assumption that the headers are required, in order to make messagingTemplate.convertAndSendToUser() work

No, not really. This is why SimpMessagingTemplate has a dedicated convertAndSendToUser method that expresses the required inputs which are the user, the destination, and the payload. Nothing else is required. Specifically the simpSessionId header should only be present when there is an established session on that server, and that's set automatically at the WebSocket transport level. This explains the "No TCP connection" error message since for every established session there is a TCP connection to the broker.

https://stackoverflow.com/questions/62456213/spring-websocket-convertandsendtouser-not-working-but-convertandsend-working

This is completely unrelated other than that both have to do with user destinations, and the person who created the post wrote this at the bottom (instead of updating the post):

Finally figured out the problem. I did a silly mistake. I was filling the User DestinationPrefix as part of WebSocket config. but didn't set it up for the injected bean SimpMessaging template.

24249

Also unrelated, and closed as invalid due to usage error.

Closing for now as I don't see anything further we can do.