Netflix / dgs-framework

GraphQL for Java with Spring Boot made easy.
https://netflix.github.io/dgs
Apache License 2.0
3.07k stars 295 forks source link

bug: Complete event not sent for SSE Subscriptions #1435

Closed yogeshdengle closed 2 months ago

yogeshdengle commented 1 year ago

Expected behavior

For SSE Subscriptions on the complete of the events the server sends a 'complete' event. Currently once the Publisher sent to the DGS framework as part of the SSE Subscription finishes there is no indication on the subscription that the server is done sending the data.

Actual behavior

the graphql-sse library (https://github.com/enisdenjo/graphql-sse) which is very commonly used in the graphql ecosystem expects a 'complete' event to be sent to indicate that the server is done sending events. The spec is outlined here: https://github.com/graphql/graphql-over-http/blob/d51ae80d62b5fd8802a3383793f01bdf306e8290/rfcs/GraphQLOverSSE.md#complete-event-1

This causes the library to retry the subscription as it think it disconnected from the server. The "complete" event is used by the library to understand that this is a clean closure of the connection and not something unintended.

Steps to reproduce

Create a subscription that has a publisher that just sends out 3 events. Expectation is that after the 3 events we get a "complete" event.

Ancient-Dragon commented 1 year ago

@yogeshdengle if you would like a java implementation (and can't upgrade to spring-boot 3 straight away you can just use this rest controller and remove the dgs sse package. (FYI this was my testing code so probably has some unnecessary imports and logs etc)


package com.example.demo.services;

import com.example.demo.models.GraphqlServerSentEvent;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.netflix.graphql.dgs.DgsQueryExecutor;
import com.netflix.graphql.types.subscription.Error;
import com.netflix.graphql.types.subscription.QueryPayload;
import com.netflix.graphql.types.subscription.SSEDataPayload;
import graphql.ExecutionResult;
import graphql.InvalidSyntaxError;
import graphql.language.Document;
import graphql.language.OperationDefinition;
import graphql.parser.InvalidSyntaxException;
import graphql.parser.Parser;
import graphql.validation.ValidationError;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.MediaType;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.server.ServerErrorException;
import org.springframework.web.server.ServerWebInputException;
import org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Schedulers;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;

@RestController
public class DgsSSEControllerOriginal {

    DgsQueryExecutor dgsQueryExecutor;
    @Value("${dgs.graphql.sse.pollPeriod:12000}")
    Long pollPeriod;
    private Logger logger = LoggerFactory.getLogger(getClass().getName());
    private final ObjectMapper mapper = new ObjectMapper();
    private static final String NEXT_EVENT = "next";
    private static final String COMPLETE_EVENT = "complete";

    @Autowired
    public DgsSSEControllerOriginal(DgsQueryExecutor dgsQueryExecutor) {
        this.dgsQueryExecutor = dgsQueryExecutor;
    }

    @PostMapping(path = "${dgs.graphql.sse.path:/subscriptions}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent<String>> handlePost(@RequestBody String body) {
        try {
            return handleSubscription(body);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    private Flux<ServerSentEvent<String>> handleSubscription(String query) throws IOException {
        QueryPayload queryPayload;
        try {
            queryPayload = mapper.readValue(query, QueryPayload.class);
        } catch (Exception ex) {
            throw new ServerWebInputException("Error parsing query: " + ex.getMessage());
        }

        if (!isSubscriptionQuery(queryPayload.getQuery())) {
            throw new ServerWebInputException("Invalid query. operation type is not a subscription");
        }

        ExecutionResult executionResult = dgsQueryExecutor.execute(queryPayload.getQuery(), queryPayload.getVariables());
        if (!executionResult.getErrors().isEmpty()) {
            String errorMessage;
            if (executionResult.getErrors().stream().anyMatch(error -> error instanceof ValidationError || error instanceof InvalidSyntaxError)) {
                errorMessage = "Subscription query failed to validate: " + executionResult.getErrors().stream().map(Object::toString).collect(Collectors.joining());
            } else {
                errorMessage = "Error executing subscription query: " + executionResult.getErrors().stream().map(Object::toString).collect(Collectors.joining());
            }
            logger.error(errorMessage);
            throw new ServerWebInputException(errorMessage);
        }

        Publisher<ExecutionResult> publisher;
        try {
            publisher = executionResult.getData();
        } catch (ClassCastException exc) {
            logger.error("Invalid return type for subscription datafetcher. A subscription datafetcher must return a Publisher<ExecutionResult>. The query was {}", query, exc);
            throw new ServerErrorException("Invalid return type for subscription datafetcher. Was a non-subscription query send to the subscription endpoint?", exc);
        }

        String subscriptionId = queryPayload.getKey().isEmpty() ? UUID.randomUUID().toString() : queryPayload.getKey();
        Flux<ServerSentEvent<String>> resultPublisher = Flux.from(publisher)
                .map(it -> {
                    SSEDataPayload payload = new SSEDataPayload(it.getData(), it.getErrors(), subscriptionId, "SUBSCRIPTION_DATA");
                    try {
                        return ServerSentEvent.builder(mapper.writeValueAsString(payload)).id(UUID.randomUUID().toString()).event(NEXT_EVENT).build();
                    } catch (JsonProcessingException e) {
                        throw new RuntimeException(e);
                    }
                })
                .onErrorResume(exc -> {
                    logger.warn("An exception occurred on subscription {}", subscriptionId, exc);
                    String errorMessage = exc.getMessage() != null ? exc.getMessage() : "An exception occurred";
                    SSEDataPayload payload = new SSEDataPayload(null, Collections.singletonList(new Error(errorMessage)), subscriptionId, "SUBSCRIPTION_DATA");
                    try {
                        return Flux.just(
                                ServerSentEvent.builder(mapper.writeValueAsString(payload)).id(UUID.randomUUID().toString()).event(NEXT_EVENT).build()
                        );
                    } catch (JsonProcessingException e) {
                        throw new RuntimeException(e);
                    }
                });

        Sinks.Many<ServerSentEvent<String>> sink = Sinks.many().unicast().onBackpressureBuffer();
        List<Disposable> disposables = new ArrayList<>();
        Disposable dis = resultPublisher.doOnNext(sink::tryEmitNext)
                .doFinally(signalType -> {
                    sink.tryEmitNext(ServerSentEvent.builder("").id(UUID.randomUUID().toString()).event(COMPLETE_EVENT).build());
                    sink.tryEmitComplete();
                })
                .subscribeOn(Schedulers.boundedElastic())
                .subscribe();
        disposables.add(dis);
        if (pollPeriod != 0) {
            Disposable poller = Flux.interval(Duration.ZERO, Duration.ofMillis(pollPeriod))
                    .map(l -> {
                        sink.tryEmitNext(ServerSentEvent.builder("").data(null).comment("").build());
                        return l;
                    })
                    .subscribeOn(Schedulers.boundedElastic())
                    .subscribe();
            disposables.add(poller);
        }
        return sink.asFlux().doOnNext(it -> logger.info("sending data: {}", it)).doFinally((it) -> disposables.forEach(Disposable::dispose));
    }

    private boolean isSubscriptionQuery(String query) {
        Document document;
        try {
            document = new Parser().parseDocument(query);
        } catch (InvalidSyntaxException exc) {
            return false;
        }
        List<OperationDefinition> definitions = document.getDefinitionsOfType(OperationDefinition.class);
        return !definitions.isEmpty() && definitions.stream().allMatch(def -> def.getOperation() == OperationDefinition.Operation.SUBSCRIPTION);
    }

}
yogeshdengle commented 1 year ago

Thank you, we are planning a Spring boot 3 upgrade and will use it from there.

paulbakker commented 2 months ago

Closing issues that are stale and/or are no longer relevant after the DGS/Spring Graphql integration.