eclipse-vertx / vert.x

Vert.x is a tool-kit for building reactive applications on the JVM
http://vertx.io
Other
14.2k stars 2.05k forks source link

JsonParser continuing processing after exception causes issues for RxJava 3 Adapters #4623

Open DemonicTutor opened 1 year ago

DemonicTutor commented 1 year ago

Version

this commit which fixed related issue: https://github.com/eclipse-vertx/vert.x/issues/4338

Context

We use JsonParser with HttpServerRequest bodies to process streams but also with the RxJava Adapters.

I think there are 2 issues present:

  1. the stream wants to cancel / unsubscribe on the first exception (removing the exception handler) this causes a NPE exception here: https://github.com/eclipse-vertx/vert.x/blob/d461719c56f82a4d3c6fd2c6261c650203037ed9/src/main/java/io/vertx/core/parsetools/impl/JsonParserImpl.java#L325-L327

  2. if a exception occurs JsonParser first emits successful events and then the exception including jsonevents AFTER the exception https://github.com/eclipse-vertx/vert.x/blob/d461719c56f82a4d3c6fd2c6261c650203037ed9/src/main/java/io/vertx/core/parsetools/impl/JsonParserImpl.java#L161-L162

I could not write a clean reproducer/test-case for this yet. When consuming a JsonArray containing larger JsonObjects we transform them into DTOs. The json was malformed {goodobj},THISISBAD{goodobj} and instead of the JsonParserException terminating the processing JsonParser emitted only a part of the second {goodobj} leading to a wrong DTO.

instead of emitting the json-event for the entire {goodobj} we just got "someproperty":true from inside said obj

Steps to reproduce

import com.fasterxml.jackson.core.JsonParseException;
import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.Flowable;
import io.vertx.core.http.HttpClientOptions;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpServerOptions;
import io.vertx.core.parsetools.JsonParser;
import io.vertx.junit5.VertxExtension;
import io.vertx.junit5.VertxTestContext;
import io.vertx.rxjava3.CompletableHelper;
import io.vertx.rxjava3.core.Vertx;
import io.vertx.rxjava3.core.buffer.Buffer;
import io.vertx.rxjava3.core.http.HttpClient;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

@ExtendWith(VertxExtension.class)
class JsonParserTest {

  private static final HttpServerOptions serverOptions = new HttpServerOptions()
    .setPort(8080)
    .setHost("localhost");
  private static final HttpClientOptions clientOptions = new HttpClientOptions()
    .setDefaultHost("localhost")
    .setDefaultPort(8080);

  private Vertx vertx;
  private HttpClient client;

  @BeforeEach
  void beforeEach(final io.vertx.core.Vertx vertx) {
    this.vertx = Vertx.newInstance(vertx);
    this.client = this.vertx.createHttpClient(clientOptions);
  }

  @Test
  void shouldFailSingleBadJsonEvent(final VertxTestContext context) {
    final var checkpoint = context.checkpoint(2);
    vertx.getOrCreateContext().exceptionHandler(context::failNow);
    rxTest(
      Flowable.fromArray(
        Buffer.buffer("[b]")
      ),
      (status, actual) -> context.verify(() -> {
        Assertions.assertEquals(200, status);
        Assertions.assertEquals(List.of(),
          actual
        );
      }),
      cause -> {
        checkpoint.flag();
        context.verify(() -> Assertions.assertEquals(JsonParseException.class, cause.getClass()));
      }
    )
      .subscribe(CompletableHelper.toObserver(context.succeeding(nothing -> checkpoint.flag())));
  }

  @Test
  void shouldFailBadJsonEventInbetween(final VertxTestContext context) {
    final var checkpoint = context.checkpoint(2);
    vertx.getOrCreateContext().exceptionHandler(context::failNow);
    rxTest(
      Flowable.fromArray(
        Buffer.buffer("["),
        Buffer.buffer("1,"),
        Buffer.buffer("2,"),
        Buffer.buffer("3,"),
        Buffer.buffer("b,"),
        Buffer.buffer("4,"),
        Buffer.buffer("5,"),
        Buffer.buffer("6,"),
        Buffer.buffer("7,"),
        Buffer.buffer("8,"),
        Buffer.buffer("9,"),
        Buffer.buffer("]")
      ),
      (status, actual) -> context.verify(() -> {
        Assertions.assertEquals(200, status);
        Assertions.assertEquals(List.of(
            Buffer.buffer("["),
            Buffer.buffer("1"),
            Buffer.buffer("2"),
            Buffer.buffer("3")
          ),
          actual
        );
      }),
      cause -> {
        checkpoint.flag();
        context.verify(() -> Assertions.assertEquals(JsonParseException.class, cause.getClass()));
      }
    )
      .subscribe(CompletableHelper.toObserver(context.succeeding(nothing -> checkpoint.flag())));
  }

  private Completable rxTest(final Flowable<Buffer> given,
                             final BiConsumer<Integer, List<Buffer>> actual,
                             final Consumer<Throwable> exception) {
    return Completable.concatArray(
      vertx.createHttpServer(serverOptions)
        .requestHandler(
          request -> io.vertx.rxjava3.core.parsetools.JsonParser.newInstance(JsonParser.newParser(request.getDelegate()))
            .toFlowable()
            .doOnSubscribe(dis -> request.getDelegate().response().setChunked(true))
            .map(event -> switch (event.type()) {
              case START_OBJECT -> Buffer.buffer("{");
              case END_OBJECT -> Buffer.buffer("}");
              case START_ARRAY -> Buffer.buffer("[");
              case END_ARRAY -> Buffer.buffer("]");
              case VALUE -> Buffer.buffer(event.value().toString());
            })
            .concatMapCompletable(event -> request.response().rxWrite(event))
            .doOnError(exception::accept)
            .onErrorComplete()
            .andThen(request.response().rxEnd())
            .subscribe()
        )
        .rxListen()
        .ignoreElement(),
      client.rxRequest(HttpMethod.POST, "/path")
        .flatMap(request -> request.rxSend(given))
        .flatMap(response -> response.toObservable().toList().doOnSuccess(body -> actual.accept(response.statusCode(), body)))
        .ignoreElement()
    );
  }
}

Extra

DemonicTutor commented 1 year ago

I managed to reproduce the behavior and i would not expect {"e":false} to be emitted.

But this also seems to be a issue with Jackson's Parser that acts strangely for this input.

Expected :<[{"a":1,"b":[],"c":{"e":false}}, {"a":1,"b":[],"c":{"e":false}}, {"a":1,"b":[],"c":{"e":false}}, {"a":1,"b":[],"c":{"e":false}}]>
Actual   :<[{"a":1,"b":[],"c":{"e":false}}, {"a":1,"b":[],"c":{"e":false}}, {"a":1,"b":[],"c":{"e":false}}, {"a":1,"b":[],"c":{"e":false}}, {"e":false}]>
import com.fasterxml.jackson.core.JsonParseException;
import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.Flowable;
import io.vertx.core.Handler;
import io.vertx.core.http.HttpClientOptions;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpServerOptions;
import io.vertx.core.parsetools.JsonEvent;
import io.vertx.core.parsetools.JsonParser;
import io.vertx.core.streams.ReadStream;
import io.vertx.junit5.VertxExtension;
import io.vertx.junit5.VertxTestContext;
import io.vertx.rxjava3.CompletableHelper;
import io.vertx.rxjava3.FlowableHelper;
import io.vertx.rxjava3.core.Vertx;
import io.vertx.rxjava3.core.buffer.Buffer;
import io.vertx.rxjava3.core.http.HttpClient;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

import java.util.List;
import java.util.Objects;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

@ExtendWith(VertxExtension.class)
class JsonParserTest {

  private static final HttpServerOptions serverOptions = new HttpServerOptions()
    .setPort(8080)
    .setHost("localhost");
  private static final HttpClientOptions clientOptions = new HttpClientOptions()
    .setDefaultHost("localhost")
    .setDefaultPort(8080);

  private Vertx vertx;
  private HttpClient client;

  @BeforeEach
  void beforeEach(final io.vertx.core.Vertx vertx) {
    this.vertx = Vertx.newInstance(vertx);
    this.client = this.vertx.createHttpClient(clientOptions);
  }

  @Test
  void shouldFailBadJsonEventInbetween(final VertxTestContext context) {
    final var checkpoint = context.checkpoint(2);
    vertx.getOrCreateContext().exceptionHandler(context::failNow);
    rxTest(
      Flowable.fromArray(
        Buffer.buffer("["),
        Buffer.buffer("{\"a\":1,\"b\":[],\"c\":{\"e\":false}}"),
        Buffer.buffer(",{\"a\":1,\"b\":[],\"c\":{\"e\":false}}"),
        Buffer.buffer(",{\"a\":1,\"b\":[],\"c\":{\"e\":false}}"),
        Buffer.buffer(",{\"a\":1,\"b\":[],\"c\":{\"e\":false}}"),
        Buffer.buffer(",MALFORMED{\"a\":1,\"b\":[],\"c\":{\"e\":false}}"),
        Buffer.buffer("]")
      ),
      (status, actual) -> context.verify(() -> {
        Assertions.assertEquals(200, status);
        Assertions.assertEquals(List.of(
            Buffer.buffer("{\"a\":1,\"b\":[],\"c\":{\"e\":false}}"),
            Buffer.buffer("{\"a\":1,\"b\":[],\"c\":{\"e\":false}}"),
            Buffer.buffer("{\"a\":1,\"b\":[],\"c\":{\"e\":false}}"),
            Buffer.buffer("{\"a\":1,\"b\":[],\"c\":{\"e\":false}}")
            //, Buffer.buffer("{\"e\":false}")
          ),
          actual
        );
      }),
      cause -> {
        checkpoint.flag();
        context.verify(() -> Assertions.assertEquals(JsonParseException.class, cause.getClass()));
      }
    )
      .subscribe(CompletableHelper.toObserver(context.succeeding(nothing -> checkpoint.flag())));
  }

  private Completable rxTest(final Flowable<Buffer> given,
                             final BiConsumer<Integer, List<Buffer>> actual,
                             final Consumer<Throwable> exception) {
    final class BugfixJsonParserAdapter implements ReadStream<JsonEvent> {

      private final ReadStream<JsonEvent> upstream;

      private BugfixJsonParserAdapter(final ReadStream<io.vertx.core.parsetools.JsonEvent> upstream) {
        this.upstream = upstream;
      }

      @Override
      public ReadStream<JsonEvent> exceptionHandler(final Handler<Throwable> handler) {
        // FIXME: java.lang.NullPointerException: Cannot invoke "io.vertx.core.Handler.handle(Object)" because "this.exceptionHandler" is null
        upstream.exceptionHandler(Objects.requireNonNullElse(handler, cause -> { }));
        return this;
      }

      @Override
      public ReadStream<JsonEvent> handler(final Handler<JsonEvent> handler) {
        upstream.handler(handler);
        return this;
      }

      @Override
      public ReadStream<JsonEvent> endHandler(final Handler<Void> handler) {
        upstream.endHandler(handler);
        return this;
      }

      @Override
      public ReadStream<JsonEvent> pause() {
        upstream.pause();
        return this;
      }

      @Override
      public ReadStream<JsonEvent> resume() {
        upstream.resume();
        return this;
      }

      @Override
      public ReadStream<JsonEvent> fetch(long amount) {
        upstream.fetch(amount);
        return this;
      }
    }
    return Completable.concatArray(
      vertx.createHttpServer(serverOptions)
        .requestHandler(
          request -> FlowableHelper.toFlowable(new BugfixJsonParserAdapter(JsonParser.newParser(request.getDelegate()).objectValueMode()))
            .doOnSubscribe(dis -> request.getDelegate().response().setChunked(true))
            .filter(JsonEvent::isObject)
            .map(event -> switch (event.type()) {
              case START_OBJECT -> Buffer.buffer("{");
              case END_OBJECT -> Buffer.buffer("}");
              case START_ARRAY -> Buffer.buffer("[");
              case END_ARRAY -> Buffer.buffer("]");
              case VALUE -> Buffer.buffer(event.value().toString());
            })
            .concatMapCompletable(event -> request.response().rxWrite(event))
            .doOnError(exception::accept)
            .onErrorComplete()
            .andThen(request.response().rxEnd())
            .subscribe()
        )
        .rxListen()
        .ignoreElement(),
      client.rxRequest(HttpMethod.POST, "/path")
        .flatMap(request -> request.rxSend(given))
        .flatMap(response -> response.toObservable().toList().doOnSuccess(body -> actual.accept(response.statusCode(), body)))
        .ignoreElement()
    );
  }
}
vietj commented 1 year ago

exception handler does not seem fatal here, since the parser can recover

vietj commented 1 year ago

I think the reproducers don't need HTTP server ?

DemonicTutor commented 1 year ago

what do you mean regarding exception handler?

if there are 2 exceptions for example the first one thrown will cause the ReadStreamSubscriber to remove the ExceptionHandler Then the second exception will cause a NPE because now the exceptionHandler is null.

yeah - i just used it to get a ReadStream