rstoyanchev / spring-websocket-portfolio

740 stars 443 forks source link

How to manage who is consuming from specific queue? #69

Open achmadns opened 8 years ago

achmadns commented 8 years ago

Dear @rstoyanchev , I created something like "How not to use SockJS". Now I need to create 'Authorization' feature so that user A can only subscribed into '/amq/queue/user.a', and user B can only subscribed into '/amq/queue/user.b' identified by 'access_token' upon handshake.

I tried to play around with ApplicationListener<SessionSubscribeEvent> and tried to implement what StompSubProtocolHandler.afterSessionEnded() did yet still no luck. It is based on stupid guess actually. Could you please give me pointer?

Executing this code:

final StompSubProtocolHandler handler = (StompSubProtocolHandler) event.getSource();
        event.getMessage().getHeaders().get("simpSessionId");
        publisher.publishEvent(new SessionDisconnectEvent(handler, new GenericMessage<byte[]>("".getBytes()),
                (String) event.getMessage().getHeaders().get("simpSessionId"), CloseStatus.POLICY_VIOLATION));

Got NPE:

java.lang.NullPointerException: null
    at org.springframework.web.socket.messaging.DefaultSimpUserRegistry.onApplicationEvent(DefaultSimpUserRegistry.java:76) ~[spring-websocket-4.3.0.RC2.jar:4.3.0.RC2]
    at org.springframework.messaging.simp.user.MultiServerUserRegistry.onApplicationEvent(MultiServerUserRegistry.java:107) ~[spring-messaging-4.3.0.RC2.jar:4.3.0.RC2]
    at org.springframework.context.event.SimpleApplicationEventMulticaster.invokeListener(SimpleApplicationEventMulticaster.java:166) ~[spring-context-4.3.0.RC2.jar:4.3.0.RC2]
    at org.springframework.context.event.SimpleApplicationEventMulticaster.multicastEvent(SimpleApplicationEventMulticaster.java:138) ~[spring-context-4.3.0.RC2.jar:4.3.0.RC2]
    at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:382) ~[spring-context-4.3.0.RC2.jar:4.3.0.RC2]
    at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:336) ~[spring-context-4.3.0.RC2.jar:4.3.0.RC2]
    at com.firstwap.stream.server.listener.SessionSubscribeEventListener.onApplicationEvent(SessionSubscribeEventListener.java:31) ~[classes/:na]
    at com.firstwap.stream.server.listener.SessionSubscribeEventListener.onApplicationEvent(SessionSubscribeEventListener.java:16) ~[classes/:na]
    at org.springframework.context.event.SimpleApplicationEventMulticaster.invokeListener(SimpleApplicationEventMulticaster.java:166) ~[spring-context-4.3.0.RC2.jar:4.3.0.RC2]
    at org.springframework.context.event.SimpleApplicationEventMulticaster.multicastEvent(SimpleApplicationEventMulticaster.java:138) ~[spring-context-4.3.0.RC2.jar:4.3.0.RC2]
    at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:382) ~[spring-context-4.3.0.RC2.jar:4.3.0.RC2]
    at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:336) ~[spring-context-4.3.0.RC2.jar:4.3.0.RC2]
    at org.springframework.web.socket.messaging.StompSubProtocolHandler.publishEvent(StompSubProtocolHandler.java:365) [spring-websocket-4.3.0.RC2.jar:4.3.0.RC2]
    at org.springframework.web.socket.messaging.StompSubProtocolHandler.handleMessageFromClient(StompSubProtocolHandler.java:287) [spring-websocket-4.3.0.RC2.jar:4.3.0.RC2]
    at org.springframework.web.socket.messaging.SubProtocolWebSocketHandler.handleMessage(SubProtocolWebSocketHandler.java:307) [spring-websocket-4.3.0.RC2.jar:4.3.0.RC2]
    at org.springframework.web.socket.handler.WebSocketHandlerDecorator.handleMessage(WebSocketHandlerDecorator.java:75) [spring-websocket-4.3.0.RC2.jar:4.3.0.RC2]
    at org.springframework.web.socket.handler.LoggingWebSocketHandlerDecorator.handleMessage(LoggingWebSocketHandlerDecorator.java:56) [spring-websocket-4.3.0.RC2.jar:4.3.0.RC2]
    at org.springframework.web.socket.handler.ExceptionWebSocketHandlerDecorator.handleMessage(ExceptionWebSocketHandlerDecorator.java:58) [spring-websocket-4.3.0.RC2.jar:4.3.0.RC2]
    at org.springframework.web.socket.adapter.jetty.JettyWebSocketHandlerAdapter.onWebSocketText(JettyWebSocketHandlerAdapter.java:83) [spring-websocket-4.3.0.RC2.jar:4.3.0.RC2]
    at sun.reflect.GeneratedMethodAccessor59.invoke(Unknown Source) ~[na:na]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_92]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_92]
    at org.eclipse.jetty.websocket.common.events.annotated.CallableMethod.call(CallableMethod.java:71) [websocket-common-9.3.9.v20160517.jar:9.3.9.v20160517]
    at org.eclipse.jetty.websocket.common.events.annotated.OptionalSessionCallableMethod.call(OptionalSessionCallableMethod.java:72) [websocket-common-9.3.9.v20160517.jar:9.3.9.v20160517]
    at org.eclipse.jetty.websocket.common.events.JettyAnnotatedEventDriver.onTextMessage(JettyAnnotatedEventDriver.java:234) [websocket-common-9.3.9.v20160517.jar:9.3.9.v20160517]
    at org.eclipse.jetty.websocket.common.message.SimpleTextMessage.messageComplete(SimpleTextMessage.java:69) [websocket-common-9.3.9.v20160517.jar:9.3.9.v20160517]
    at org.eclipse.jetty.websocket.common.events.AbstractEventDriver.appendMessage(AbstractEventDriver.java:66) [websocket-common-9.3.9.v20160517.jar:9.3.9.v20160517]
    at org.eclipse.jetty.websocket.common.events.JettyAnnotatedEventDriver.onTextFrame(JettyAnnotatedEventDriver.java:226) [websocket-common-9.3.9.v20160517.jar:9.3.9.v20160517]
    at org.eclipse.jetty.websocket.common.events.AbstractEventDriver.incomingFrame(AbstractEventDriver.java:162) [websocket-common-9.3.9.v20160517.jar:9.3.9.v20160517]
    at org.eclipse.jetty.websocket.common.WebSocketSession.incomingFrame(WebSocketSession.java:367) [websocket-common-9.3.9.v20160517.jar:9.3.9.v20160517]
    at org.eclipse.jetty.websocket.common.extensions.AbstractExtension.nextIncomingFrame(AbstractExtension.java:176) [websocket-common-9.3.9.v20160517.jar:9.3.9.v20160517]
    at org.eclipse.jetty.websocket.common.extensions.compress.PerMessageDeflateExtension.nextIncomingFrame(PerMessageDeflateExtension.java:105) [websocket-common-9.3.9.v20160517.jar:9.3.9.v20160517]
    at org.eclipse.jetty.websocket.common.extensions.compress.CompressExtension.forwardIncoming(CompressExtension.java:142) [websocket-common-9.3.9.v20160517.jar:9.3.9.v20160517]
    at org.eclipse.jetty.websocket.common.extensions.compress.PerMessageDeflateExtension.incomingFrame(PerMessageDeflateExtension.java:85) [websocket-common-9.3.9.v20160517.jar:9.3.9.v20160517]
    at org.eclipse.jetty.websocket.common.extensions.ExtensionStack.incomingFrame(ExtensionStack.java:220) [websocket-common-9.3.9.v20160517.jar:9.3.9.v20160517]
    at org.eclipse.jetty.websocket.common.Parser.notifyFrame(Parser.java:220) [websocket-common-9.3.9.v20160517.jar:9.3.9.v20160517]
    at org.eclipse.jetty.websocket.common.Parser.parse(Parser.java:256) [websocket-common-9.3.9.v20160517.jar:9.3.9.v20160517]
    at org.eclipse.jetty.websocket.common.io.AbstractWebSocketConnection.readParse(AbstractWebSocketConnection.java:675) [websocket-common-9.3.9.v20160517.jar:9.3.9.v20160517]
    at org.eclipse.jetty.websocket.common.io.AbstractWebSocketConnection.onFillable(AbstractWebSocketConnection.java:505) [websocket-common-9.3.9.v20160517.jar:9.3.9.v20160517]
    at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:273) [jetty-io-9.3.9.v20160517.jar:9.3.9.v20160517]
    at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:95) [jetty-io-9.3.9.v20160517.jar:9.3.9.v20160517]
    at org.eclipse.jetty.io.SelectChannelEndPoint$2.run(SelectChannelEndPoint.java:93) [jetty-io-9.3.9.v20160517.jar:9.3.9.v20160517]
    at org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.executeProduceConsume(ExecuteProduceConsume.java:303) [jetty-util-9.3.9.v20160517.jar:9.3.9.v20160517]
    at org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.produceConsume(ExecuteProduceConsume.java:148) [jetty-util-9.3.9.v20160517.jar:9.3.9.v20160517]
    at org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.run(ExecuteProduceConsume.java:136) [jetty-util-9.3.9.v20160517.jar:9.3.9.v20160517]
    at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:671) [jetty-util-9.3.9.v20160517.jar:9.3.9.v20160517]
    at org.eclipse.jetty.util.thread.QueuedThreadPool$2.run(QueuedThreadPool.java:589) [jetty-util-9.3.9.v20160517.jar:9.3.9.v20160517]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_92]

Thanks in advance.

rstoyanchev commented 8 years ago

And you are aware that Spring Security already has this feautre? http://docs.spring.io/spring-security/site/docs/current/reference/html/websocket.html

achmadns commented 8 years ago

Thank you for your quick response. I just get to know that feature. Unfortunately I still can't find my solution. After a subscription is filtered only for any principal that has role 'USER', I need to add additional step, to make sure one to one relationship between queue and user . Let's take an example, subscription is success if the destination contains the username that is being subscribed to achieve specific queue per user and allow only specific user accordingly. Here is the illustration:

@Override
    protected void configureInbound(MessageSecurityMetadataSourceRegistry messages) {
        messages
                .nullDestMatcher().authenticated()
                .simpDestMatchers("/queueu/**").hasRole("USER").addInterceptor(
                new MessageSubscriptionInterceptor(){
                    public void intercept(SessionSubscribeEvent subscription, WebSocketHandler wsHandler, Principal principal, Map<String, Object> attributes){
                        final String destination = (String) subscription.getMessage().getHeaders().get("destination");
                        if(!destination.contains(principal.getName().toLowerCase())){
                            subscription.cancel("UNAUTHORIZED ACCESS!");
                        }
                    }
                }
        )
                .simpTypeMatchers(MESSAGE, SUBSCRIBE).denyAll()
                .anyMessage().denyAll();
    }

Anyway, thank you. Spring team makes developer's life easier.

ghost commented 8 years ago

Regarding your need "additional step, to make sure one to one relationship between queue and user", I originally thought our product needed to enforce one to one, in order to avoid collisions, however I learned that session IDs can handle that. See "User Destinations" section of the Websockets user guide. http://docs.spring.io/spring/docs/current/spring-framework-reference/html/websocket.html

     For example, a client might subscribe to the destination "/user/queue/position-updates". This destination will be handled by the UserDestinationMessageHandler and transformed into a destination unique to the user session, e.g. "/queue/position-updates-user123". This provides the convenience of subscribing to a generically named destination while at the same time ensuring no collisions with other users subscribing to the same destination so that each user can receive unique stock position updates.