graphql-java-generator / graphql-maven-plugin-project

graphql-maven-plugin is a Maven plugin for GraphQL, based on graphql-java. It speeds up development of both the client and the server by generating the Java code. This is especially useful in a contract-first approach, because it removes the need to write the boilerplate code.
https://graphql-maven-plugin-project.graphql-java-generator.com
MIT License

Multiple subscriptions not working correctly #72

Closed: woolclew closed this issue 3 years ago

woolclew commented 3 years ago

Hello,

I use the graphql-java-generator with Maven to generate the source code for both the client and the server.

I use the following versions:

com.graphql-java-generator.graphql-java-runtime 1.12.2
com.graphql-java-generator.graphql-java-client-dependencies 1.12.2
com.graphql-java-generator.graphql-java-server-dependencies 1.12.2
io.reactivex.rxjava2.rxjava 2.2.20

Queries and mutations work well, but I have a problem with subscriptions. If only one client subscribes, everything works. If more than one client subscribes, only the last client is notified correctly.

After some investigation, it seems that there is a bug on the server side, in the generated file 'WebSocketHandler.java'. In this file, the field subscriptionRef (type: org.reactivestreams.Subscription) seems to be overwritten by each new subscription, so only the last subscription works. The earlier clients are not notified correctly.

Maybe someone can have a look at this.

By the way. This is a cool project. Please keep working on it!!!!

Best regards.... Alex

etienne-sf commented 3 years ago

woolclew wrote: "By the way. This is a cool project. Please keep working on it!!!!"

Thanks for that! :) (I spend a lot of time on this, so this kind of comment is always nice.)

Don't worry about that. I'll keep working on this one. I maintained the last project I managed for 10 years, and I only stopped when the technology was no longer supported. Which, I guess, is a long way off for GraphQL!

Thanks for the report and analysis. I'll check that.

Etienne

etienne-sf commented 3 years ago

I confirm that there is an issue: when I run the current test in the forum client project twice, the second run fails.

But if I execute two subscriptions in the same test, it works properly.

I wonder whether this is linked to the way the subscription is closed. I'll keep looking into this one...

etienne-sf commented 3 years ago

Hmm, there is no direct link with the way the subscription is closed.

And about the WebSocketHandler: this class is instantiated only once, when Spring calls registerWebSocketHandlers. So the subscriptionRef field is created once, and it cannot be reassigned, as it is final.

So, yes, there is a bug somewhere. But I haven't managed to write a test that reproduces it yet.

Can you give me more details on how you found (and how you reproduce) this bug?

woolclew commented 3 years ago

Hello Etienne,

How to reproduce the problem: subscribe two clients and trigger TWO events on the server side for these subscriptions. Both clients should be notified twice, but that does not happen.

About the WebSocketHandler: you are right. The WebSocketHandler is a singleton and subscriptionRef is final, so subscriptionRef is only created once. But its content is overwritten for every subscription. Look at the onSubscribe method below.

publisher.subscribe(new Subscriber<ExecutionResult>() {

    @Override
    public void onSubscribe(Subscription s) {
        log.debug("Executing subscription");
        subscriptionRef.set(s);
        request(1);
    }

....
}
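
To make the effect concrete, here is a minimal stand-alone simulation. It is only a sketch, not the generated code. It assumes that the elided request(n) helper forwards to subscriptionRef.get().request(n), and it uses java.util.concurrent.Flow.Subscription (Java 9+) as a stand-in for org.reactivestreams.Subscription. All class and method names (SharedSubscriptionRefDemo, ClientSubscription, Client, run) are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical simulation; none of these names come from the generated code. */
class SharedSubscriptionRefDemo {

    /** Minimal subscription that only tracks the demand of one client. */
    static final class ClientSubscription implements Flow.Subscription {
        long demand;
        boolean cancelled;

        @Override
        public void request(long n) {
            demand += n;
        }

        @Override
        public void cancel() {
            cancelled = true;
        }
    }

    /** One simulated client: counts the notifications it receives. */
    static final class Client {
        final ClientSubscription subscription = new ClientSubscription();
        int received;
    }

    /**
     * Publishes 'events' events to 'clientCount' clients and returns how many each received.
     * When 'shared' is true, every client stores its subscription in the same reference,
     * as a singleton handler with a single subscriptionRef field would (assumption).
     */
    static int[] run(int clientCount, int events, boolean shared) {
        AtomicReference<Flow.Subscription> sharedRef = new AtomicReference<>();
        List<Client> clients = new ArrayList<>();
        List<AtomicReference<Flow.Subscription>> refs = new ArrayList<>();

        // onSubscribe: store the subscription, then request the first item through the reference.
        for (int i = 0; i < clientCount; i++) {
            Client client = new Client();
            AtomicReference<Flow.Subscription> ref = shared ? sharedRef : new AtomicReference<>();
            ref.set(client.subscription);
            ref.get().request(1);
            clients.add(client);
            refs.add(ref);
        }

        // Publisher side: deliver each event only to clients that still have demand.
        for (int e = 0; e < events; e++) {
            for (int i = 0; i < clientCount; i++) {
                Client client = clients.get(i);
                if (client.subscription.demand > 0) {
                    client.subscription.demand--;
                    client.received++;
                    // onNext: request the next item through the stored reference.
                    refs.get(i).get().request(1);
                }
            }
        }
        return clients.stream().mapToInt(c -> c.received).toArray();
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run(2, 2, true)));  // shared reference: prints [1, 2]
        System.out.println(Arrays.toString(run(2, 2, false))); // one reference per client: prints [2, 2]
    }
}
```

Under these assumptions, the first client's follow-up request lands on the second client's subscription, so the first client never asks for its second event. That matches the observed behaviour: only the last subscriber keeps being notified.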

I am not very familiar with WebSockets and the rxjava2 stuff. As I understand it, the WebSocketHandler handles every client connection: for every client, the methods 'afterConnectionEstablished' and 'afterConnectionClosed' are called. So the WebSocketHandler should be able to handle more than one subscription reference.

I built a workaround for our system by changing the WebSocketHandler: it now manages multiple subscription references, keyed by session. Maybe the code below explains it a little better.

public class WebSocketHandler extends TextWebSocketHandler {

    private static final Logger log = LoggerFactory.getLogger(WebSocketHandler.class);

    GraphQLProvider graphQLProvider;

    // Key (String) is SessionId
    private final HashMap<String, Subscription> subscriptionRef = new HashMap<>();

    public WebSocketHandler(GraphQLProvider graphQLProvider) {
        this.graphQLProvider = graphQLProvider;
    }

    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
        log.info("Websocket connection established: {}", session.getId());
    }

    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
        synchronized(subscriptionRef) {
            Subscription subscription = subscriptionRef.remove(session.getId());
            if (subscription != null) {
                subscription.cancel();
            } else {
                log.error("Failed to find subscription");
            }
        }
        log.info("Websocket connection closed: {} / Memorized session: {}", session.getId(), subscriptionRef.size());
    }

    @Override
    protected void handleTextMessage(WebSocketSession webSocketSession, TextMessage message) throws Exception {
        String graphqlQuery = message.getPayload();
        log.trace("Websocket said {}", graphqlQuery);

        QueryParameters parameters = QueryParameters.from(graphqlQuery);

        ExecutionInput executionInput = ExecutionInput.newExecutionInput().query(parameters.getQuery())
                .variables(parameters.getVariables()).operationName(parameters.getOperationName()).build();

        Instrumentation instrumentation = new ChainedInstrumentation(singletonList(new TracingInstrumentation()));

        //
        // In order to have subscriptions in graphql-java you MUST use the
        // SubscriptionExecutionStrategy strategy.
        //
        GraphQL graphQL = GraphQL.newGraphQL(graphQLProvider.getGraphQLSchema()).instrumentation(instrumentation)
                .build();

        ExecutionResult executionResult = graphQL.execute(executionInput);

        Publisher<ExecutionResult> publisher = executionResult.getData();

        publisher.subscribe(new Subscriber<ExecutionResult>() {

            private final String sessionId = webSocketSession.getId();
            private Subscription subscription;

            @Override
            public void onSubscribe(Subscription s) {
                log.debug("Executing subscription");
                synchronized(subscriptionRef) {
                    subscriptionRef.put(sessionId, s);
                }
                subscription = s;
                subscription.request(1);
            }

            @Override
            public void onNext(ExecutionResult er) {
                log.debug("Sending new notification");
                try {
                    Object data = er.getData();
                    String json = JsonKit.toJsonString(data);
                    webSocketSession.sendMessage(new TextMessage(json));
                } catch (IOException e) {
                    e.printStackTrace();
                }
                subscription.request(1);
            }

            @Override
            public void onError(Throwable t) {
                log.error("Subscription threw an exception", t);
                try {
                    webSocketSession.close();
                } catch (IOException e) {
                    log.error("Unable to close websocket session", e);
                }
            }

            @Override
            public void onComplete() {
                log.info("Subscription complete");
                try {
                    webSocketSession.close();
                } catch (IOException e) {
                    log.error("Unable to close websocket session", e);
                }
            }
        });
    }

}
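
The per-session bookkeeping in this workaround could also be pulled out into a small helper. The following is only a sketch under stated assumptions, not the fix shipped by the plugin. The class SessionSubscriptionRegistry and its methods are hypothetical, java.util.concurrent.Flow.Subscription (Java 9+) stands in for org.reactivestreams.Subscription, and a ConcurrentHashMap replaces the synchronized HashMap. Cancelling an earlier subscription when the same session registers a new one is a design choice made for this sketch.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Flow;

/** Hypothetical helper, not part of the generated code: one subscription per session id. */
class SessionSubscriptionRegistry {

    private final ConcurrentMap<String, Flow.Subscription> bySessionId = new ConcurrentHashMap<>();

    /** Stores the subscription for a session and cancels any earlier one for the same session. */
    void register(String sessionId, Flow.Subscription subscription) {
        Flow.Subscription previous = bySessionId.put(sessionId, subscription);
        if (previous != null) {
            previous.cancel();
        }
    }

    /** Cancels and forgets the session's subscription; returns false if none was stored. */
    boolean close(String sessionId) {
        Flow.Subscription subscription = bySessionId.remove(sessionId);
        if (subscription == null) {
            return false;
        }
        subscription.cancel();
        return true;
    }

    /** Number of sessions that currently have a stored subscription. */
    int size() {
        return bySessionId.size();
    }
}
```

In a handler like the one above, register would be called from onSubscribe and close from afterConnectionClosed, each with the WebSocket session id.
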

etienne-sf commented 3 years ago

You can check release 1.12.3, published today.

woolclew commented 3 years ago

Thanks, works like a charm!!

Alex

etienne-sf commented 3 years ago

Great :)