graphql-java / graphql-java

GraphQL Java implementation
https://graphql-java.com
MIT License
6.12k stars 1.12k forks source link

CompletionStageMappingPublisher is swallowing Jackson serialization errors #3326

Open grodriguezatlassian opened 1 year ago

grodriguezatlassian commented 1 year ago

Describe the bug We encountered a bug that prevented submitting subscription data when the data could not be correctly serialized. This error was logged when we increased logging in another package. Upon stepping through the code, we found that the CompletionStageMappingPublisher only captures RuntimeExceptions, but more types of exceptions should be caught and handled.

To Reproduce This package is a test application that contains a demo subscriber that generates exceptions to websockets.

subscription-test (1).zip

Here's the TestFetcher from that package. It creates exceptions by submitting an unsupported date format. This demonstrates the issue, which is linked to an older version of graphql-java but is present in the last version as well.

package subscriptiontest.datafetcher;

import com.netflix.graphql.dgs.DgsComponent;
import com.netflix.graphql.dgs.DgsQuery;
import com.netflix.graphql.dgs.DgsSubscription;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.observables.ConnectableObservable;
import java.time.OffsetDateTime;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Flow;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
import org.reactivestreams.Publisher;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@DgsComponent
@Slf4j
@Component
public class TestDataFetcher implements ObservableOnSubscribe<GQLData> {

    private final Flowable<GQLData> publisher;
    private ObservableEmitter<GQLData> emitter;

    public TestDataFetcher() {
        final Observable<GQLData> gqlDataObservable = Observable.create(this);
        final ConnectableObservable<GQLData> connectableObservable = gqlDataObservable.share().publish();
        connectableObservable.connect();
        publisher = connectableObservable.toFlowable(BackpressureStrategy.BUFFER);
    }

    @Override
    public void subscribe(@NotNull ObservableEmitter<GQLData> observableEmitter) {
        emitter = observableEmitter;
    }

    @DgsQuery
    public GQLData gqldata() {
        GQLData gqlData = new GQLData("Here's data", new HashMap<>());
        Map fields = gqlData.getFields();
        fields.put("date", OffsetDateTime.now());
        return gqlData;
    }

    @DgsSubscription
    public Publisher<GQLData> gqlDataSub() {
        return publisher
                .map(filter -> {
                    log.info("Publishing new data");
                    GQLData gqlData = new GQLData("Here's subscription data", new HashMap<>());
                    Map fields = gqlData.getFields();
                    fields.put("date", OffsetDateTime.now());
                    return gqlData;
                })
                .doOnTerminate(() -> log.info("doOnTerminate"))
                .doOnComplete(() -> log.info("doOnComplete"))
                .doAfterTerminate(() -> log.info("doAfterTerminate"))
                .doOnError(e -> log.error("doOnError {}", e.getMessage(), e))
                .doFinally(() -> log.info("doFinally"))
                .doOnCancel(() -> log.info("doOnCancel"));
    }

    @Scheduled(fixedRate = 15 * 1000)
    public void emitSubData() {
        log.info("Emitting data");
        if (emitter != null) {
            emitter.onNext(GQLData.builder().build());
        }
    }

}

Here's what we think the problematic code in CompletionStageMappingPublisher is

        public void onNext(U u) {
            if (!this.onCompleteOrErrorRunCalled.get()) {
                try {
                    CompletionStage<D> completionStage = (CompletionStage)CompletionStageMappingPublisher.this.mapper.apply(u);
                    this.offerToInFlightQ(completionStage);
                    completionStage.whenComplete(this.whenNextFinished(completionStage));
               //Jackson Exceptions can be IOException or RuntimeException and these do not overlap
                } catch (RuntimeException var3) {
                    this.handleThrowable(var3);
                }

            }
        }

Here is the exception we encountered, which is of parent type IOException. InvalidDefinitionException

Here's is the stacktrack example to show how graphql-java and dgs-graphql-websockets interact.

_writeValueAndClose:4721, ObjectMapper (com.fasterxml.jackson.databind)
writeValueAsBytes:3987, ObjectMapper (com.fasterxml.jackson.databind)
onNext:180, WebsocketGraphQLTransportWSProtocolHandler$handleSubscription$1 (com.netflix.graphql.dgs.subscriptions.websockets)
onNext:167, WebsocketGraphQLTransportWSProtocolHandler$handleSubscription$1 (com.netflix.graphql.dgs.subscriptions.websockets)
lambda$whenNextFinished$0:97, CompletionStageMappingPublisher$CompletionStageSubscriber (graphql.execution.reactive)
accept:-1, CompletionStageMappingPublisher$CompletionStageSubscriber$$Lambda$6240/0x0000000801d0d608 (graphql.execution.reactive)
uniWhenComplete:863, CompletableFuture (java.util.concurrent)
uniWhenCompleteStage:887, CompletableFuture (java.util.concurrent)
whenComplete:2325, CompletableFuture (java.util.concurrent)
whenComplete:144, CompletableFuture (java.util.concurrent)
onNext:85, CompletionStageMappingPublisher$CompletionStageSubscriber (graphql.execution.reactive)
onNext:46, HalfSerializer (io.reactivex.rxjava3.internal.util)
onNext:97, StrictSubscriber (io.reactivex.rxjava3.internal.subscribers)
onNext:69, FlowableMap$MapSubscriber (io.reactivex.rxjava3.internal.operators.flowable)
onNext:80, FlowableDoOnLifecycle$SubscriptionLambdaSubscriber (io.reactivex.rxjava3.internal.operators.flowable)
onNext:80, FlowableDoOnLifecycle$SubscriptionLambdaSubscriber (io.reactivex.rxjava3.internal.operators.flowable)
tryOnNext:75, FlowableFilter$FilterSubscriber (io.reactivex.rxjava3.internal.operators.flowable)
onNext:53, FlowableFilter$FilterSubscriber (io.reactivex.rxjava3.internal.operators.flowable)
drain:186, FlowableOnBackpressureBuffer$BackpressureBufferSubscriber (io.reactivex.rxjava3.internal.operators.flowable)
onNext:111, FlowableOnBackpressureBuffer$BackpressureBufferSubscriber (io.reactivex.rxjava3.internal.operators.flowable)
onNext:56, FlowableFromObservable$SubscriberObserver (io.reactivex.rxjava3.internal.operators.flowable)
onNext:180, ObservablePublish$PublishConnection (io.reactivex.rxjava3.internal.operators.observable)
onNext:200, ObservableRefCount$RefCountObserver (io.reactivex.rxjava3.internal.operators.observable)
onNext:180, ObservablePublish$PublishConnection (io.reactivex.rxjava3.internal.operators.observable)
onNext:101, ObservableDoOnEach$DoOnEachObserver (io.reactivex.rxjava3.internal.operators.observable)
onNext:67, ObservableCreate$CreateEmitter (io.reactivex.rxjava3.internal.operators.observable)
onNext:171, ObservableCreate$SerializedEmitter (io.reactivex.rxjava3.internal.operators.observable)
notifySubscribers:220, QueueSubscriptionService (com.service.messaging)
github-actions[bot] commented 11 months ago

Hello, this issue has been inactive for 60 days, so we're marking it as stale. If you would like to continue this discussion, please comment within the next 30 days or we'll close the issue.

github-actions[bot] commented 10 months ago

Hello, as this issue has been inactive for 90 days, we're closing the issue. If you would like to resume the discussion, please create a new issue.

dondonz commented 10 months ago

Reopening because it's worth investigating