ReactiveX / RxNetty

Reactive Extension (Rx) Adaptor for Netty
Apache License 2.0
1.38k stars 255 forks source link

Spike in Response times : Rxnetty 0.4.x #588

Open JanakiramanChandrasekran opened 7 years ago

JanakiramanChandrasekran commented 7 years ago

I see the spike in response times and its gradually growing. Seems like some settings which i am missing. Can you help me to refer that?

private final HttpClient<ByteBuf, ByteBuf> httpClient;

@Inject
public ReactiveNettyClientImpl(@Value("${gateway.host}") final String host,
        @Value("${gateway.port}") final int port) throws NoSuchAlgorithmException {
    this.httpClient = RxNetty.<ByteBuf, ByteBuf> newHttpClientBuilder(host, port)
            .pipelineConfigurator(
                    new PipelineConfiguratorComposite<HttpClientResponse<ByteBuf>, HttpClientRequest<ByteBuf>>(
                            new HttpClientPipelineConfigurator<ByteBuf, ByteBuf>(), gzipPipelineConfigurator))
            .channelOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, 2000)
            .config(new RxClient.ClientConfig.Builder().readTimeout(5000, TimeUnit.MILLISECONDS).build())
            .withSslEngineFactory(new ClientSslFactory(SSLContext.getDefault())).build();
}

/**
 * Configurator so that we can support setting the "Accept-Encoding: gzip, deflate" header.
 */
private PipelineConfigurator<HttpClientResponse<ByteBuf>, HttpClientRequest<ByteBuf>> gzipPipelineConfigurator = new PipelineConfigurator<HttpClientResponse<ByteBuf>, HttpClientRequest<ByteBuf>>() {
    public void configureNewPipeline(final ChannelPipeline pipeline) {
        final ChannelHandler handlers = new HttpContentDecompressor();
        pipeline.addLast(handlers);
    }
};

public Observable<Node> xmlWithStatus(final String request, final String uri, final String soapNS) {
    HttpClientRequest<ByteBuf> clientReq = HttpClientRequest.createPost(uri)
            .withHeader("Accept-Encoding", "gzip, deflate").withHeader("Content-Type", "text/xml;charset=UTF-8")
            .withHeader("SOAPAction", soapNS)
            .withHeader(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE).withContent(request);
    return httpClient.submit(clientReq).timeout(5000, TimeUnit.MILLISECONDS).doOnError(error -> {
        System.out.println(error);
    }).flatMap(ResponseHelper.readEntireResponse).doOnError(error -> {
        System.out.println(error);
    }).map(p -> {
        if (p.getLeft().intValue() != 200) {
            throwChassisException(null);
        }
        return toXml(p.getRight());
    });
}

public void shutdown() {
    if (httpClient != null) {
        httpClient.shutdown();
    }
}