eclipse-vertx / vert.x

Vert.x is a tool-kit for building reactive applications on the JVM
http://vertx.io
Other
14.31k stars 2.08k forks source link

"java.lang.IllegalStateException: Request has already been read" when call pump.start for HttpServerRequest #4547

Closed shuitai closed 1 year ago

shuitai commented 1 year ago

`package vertx;

import io.vertx.core.Future; import io.vertx.core.Promise; import io.vertx.core.Vertx; import io.vertx.core.http.*; import io.vertx.core.streams.Pump; import io.vertx.ext.web.Router; import io.vertx.ext.web.handler.LoggerFormat; import io.vertx.ext.web.handler.LoggerHandler; import lombok.extern.slf4j.Slf4j;

import java.net.MalformedURLException;

@Slf4j public class VertxHttpClient {

private static Vertx vertx = Vertx.vertx();

public static Future<HttpServer> createServer(Router router) {
    Promise<HttpServer> promise = Promise.promise();
    HttpServerOptions options = new HttpServerOptions().setLogActivity(true);
    vertx.createHttpServer(options).requestHandler(router).listen(8888, "0.0.0.0", promise);
    log.info("createServer!");
    return promise.future();
}

public static void main(String[] args) throws MalformedURLException {
    HttpClientOptions options = new HttpClientOptions().setLogActivity(true);
    HttpClient httpClient = vertx.createHttpClient(options);
    Router router = Router.router(vertx);
    router.route().handler(LoggerHandler.create(LoggerFormat.SHORT));
    router.get("/indicators").handler(ctx -> {
        HttpServerRequest rctRequest = ctx.request();
        HttpServerResponse rctResponse = ctx.response();

        RequestOptions requestOptions = new RequestOptions();
        requestOptions.setMethod(HttpMethod.GET).setAbsoluteURI("http://10.157.66.69:3030/indicators");
        // rctRequest.pause();
        httpClient.request(requestOptions).onSuccess(request -> {
            request.response().onSuccess(r -> {
                r.body().onSuccess(b -> rctResponse.end(b));
            }).onFailure(err -> {
                log.info("fail1: " + err.toString());
                rctResponse.setStatusCode(500);
                rctResponse.end();
            });

            // rctRequest.resume();
            request.setChunked(true);
            Pump reqPump = Pump.pump(rctRequest, request);
            reqPump.start();

            rctRequest.exceptionHandler(e -> {
                log.error("exception:" + e);
                reqPump.stop();
                request.end();
            });

            rctRequest.endHandler(end -> {
                log.error("rctRequest ended1!");
                request.end();
            });
        }).onFailure(err -> {
            // rctRequest.resume();
            log.info("fail2: " + err.toString());
        });
    });

    Future<HttpServer> httpServerFuture = createServer(router);

    httpServerFuture.onComplete(r -> {
        log.info("http start successful!");
    }).onFailure(err -> {
        log.info("http start failure!");
    });
}

} `

=============

previously, Vertx3 doesn't have the issue, but upgrade to vertx4, the issue happens.

java.lang.IllegalStateException: Request has already been read at io.vertx.core.http.impl.Http1xServerRequest.checkEnded(Http1xServerRequest.java:655) ~[vertx-core-4.1.8.jar:4.1.8] at io.vertx.core.http.impl.Http1xServerRequest.handler(Http1xServerRequest.java:293) ~[vertx-core-4.1.8.jar:4.1.8] at io.vertx.ext.web.impl.HttpServerRequestWrapper.handler(HttpServerRequestWrapper.java:103) ~[vertx-web-4.1.8.jar:4.1.8] at io.vertx.ext.web.impl.HttpServerRequestWrapper.handler(HttpServerRequestWrapper.java:22) ~[vertx-web-4.1.8.jar:4.1.8] at io.vertx.core.streams.impl.PumpImpl.start(PumpImpl.java:86) ~[vertx-core-4.1.8.jar:4.1.8] at io.vertx.core.streams.impl.PumpImpl.start(PumpImpl.java:39) ~[vertx-core-4.1.8.jar:4.1.8] at vertx.VertxHttpClient.lambda$null$5(VertxHttpClient.java:52) ~[classes/:?] at io.vertx.core.impl.future.FutureImpl$1.onSuccess(FutureImpl.java:91) ~[vertx-core-4.1.8.jar:4.1.8]

if add pause and resume, it could work, but the performance is worse than vertx3 without pause and resume. So what is the problem? or is there better way to implement the function?

shuitai commented 1 year ago

@tobias @lance @pledbrook @adietish @vietj

Who could help me?

tsegismont commented 1 year ago

Indeed, you must pause the request before doing an async call otherwise the request body will be fully read before you create the pump.

In Vert.x 4 you cand use pipe on a ReadStream instead of creating a Pump.

What do you mean with performance is worse?

shuitai commented 1 year ago

@tsegismont

  1. This change doesn't highlight in vertx3 to 4 migration document.
  2. In our api testing with Jmeter, the TPS and throughput are worse than vertx3 (about 10%).