reactor / reactor-netty

TCP/HTTP/UDP/QUIC client/server with Reactor over Netty
https://projectreactor.io
Apache License 2.0
2.54k stars 633 forks source link

Memory is leaking when the response body is not consumed #910

Closed saad14092 closed 4 years ago

saad14092 commented 4 years ago

Expected Behavior

No LEAK error

Actual Behavior

Resource leak error in the logs with no apparent reason

Steps to Reproduce

Issue occurs randomly, no clear way of reproducing

Below is all the code that uses webClient and/or reactor/netty stuff that might cause the issue :

public class InotWebClient {

    private static final Logger LOG = LoggerFactory.getLogger(InotWebClient.class);

    private String authMethod;
    private String authUrl;
    private String refreshtoken;
    private String tenantName;
    private String clientId;
    private String clientPassword;

    private WebClient webClient;    
    private WebClient authWebClient;    
    private String accessToken;

    private String baseUrl;

    public InotWebClient(ReactorClientHttpConnector httpConnector, String baseUrl, String authMethod, String authUrl, String refreshtoken, 
            String tenantName, String clientId, String clientPassword) {    
        this.baseUrl = baseUrl;
        this.authUrl = authUrl;
        this.authMethod = authMethod;
        this.refreshtoken = refreshtoken;
        this.tenantName = tenantName;
        this.clientId = clientId;
        this.clientPassword = clientPassword;

        this.authWebClient = WebClient.builder().clientConnector(httpConnector)
                .defaultHeaders(headers -> {
                    headers.add(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE);
                })          
                .build();

         this.webClient = WebClient.builder().baseUrl(this.baseUrl)
                    .clientConnector(httpConnector)
                    .defaultHeaders(headers -> {
                        headers.add("X-UIPATH-TenantName", this.tenantName);
                        headers.add(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_UTF8_VALUE);
                    })
                    .filter((request, next) -> just(bearer(request, this.accessToken)).flatMap(next::exchange))
                    .filter((request, next) -> next.exchange(request).flatMap(
                            clientResponse -> HttpStatus.UNAUTHORIZED.equals(clientResponse.statusCode())
                                              ? refreshToken().map(token -> bearer(request, token)).flatMap(next::exchange)
                                              : just(clientResponse)))
                    .build();
    }

    private Mono<String> refreshToken() {
        Mono<String> refreshTokenMono = Mono.empty();
        if(AuthMethod.oauth.name().equals(this.authMethod)) {
            RefreshToken refreshToken = new RefreshToken(clientId, this.refreshtoken);
            LOG.info("Refreshing OAuth token : {}", refreshToken);
            refreshTokenMono = authWebClient.post().uri(authUrl)
                     .bodyValue(refreshToken)
                     .retrieve()
                     .onStatus(HttpStatus::is4xxClientError, cr -> Mono.error(new InotApiException(REFRESH_TOKEN, "Client Error http status " + cr.rawStatusCode())))
                     .onStatus(HttpStatus::is5xxServerError, cr -> Mono.error(new InotApiException(REFRESH_TOKEN, "Server Error http status " + cr.rawStatusCode())))
                     .bodyToMono(AccessToken.class)
                     .flatMap(at -> {
                         this.accessToken = at.getAccessToken();
                         return just(at.getAccessToken());
                     });
        } else if(AuthMethod.basic.name().equals(this.authMethod)) {
            LOG.info("Getting new authentication token for tenant {}", tenantName);
            BasicAuthRequest basicAuth = new BasicAuthRequest(tenantName, clientId, clientPassword);
            refreshTokenMono = authWebClient.post().uri(authUrl)
                     .bodyValue(basicAuth)
                     .retrieve()
                     .onStatus(HttpStatus::is4xxClientError, cr -> Mono.error(new InotApiException(REFRESH_TOKEN, "Client Error http status " + cr.rawStatusCode())))
                     .onStatus(HttpStatus::is5xxServerError, cr -> Mono.error(new InotApiException(REFRESH_TOKEN, "Server Error http status " + cr.rawStatusCode())))
                     .bodyToMono(BasicAuthResponse.class)
                     .log()
                     .flatMap(at -> {
                         this.accessToken = at.getResult();
                         return just(at.getResult());
                     });
        }
        return refreshTokenMono;
    }

    private ClientRequest bearer(ClientRequest request, String authToken) {
        return ClientRequest.from(request)
                    .headers(headers -> headers.setBearerAuth(authToken))
                    .build();
    }

    public Mono<String> createQueueItem(QueueItem item) {
        return webClient.post()
                 .uri(uriBuilder -> uriBuilder.path("/some/uri").build())
                 .bodyValue(item)
                 .retrieve()
                 .onStatus(HttpStatus::is4xxClientError, cr -> Mono.error(new InotApiException(CREATE_QUEUE_ITEM, item.getItemData().getName(), cr.rawStatusCode())))
                 .onStatus(HttpStatus::is5xxServerError, cr -> Mono.error(new InotApiException(CREATE_QUEUE_ITEM, item.getItemData().getName(), cr.rawStatusCode())))
                 .bodyToMono(String.class)
                 .timeout(Duration.ofMillis(10000))
                 .retry(3, ex -> ex instanceof TimeoutException);
    }

    public Mono<JobResponse> startJob(StartInfo startInfo) {
        return webClient.post()
                 .uri(uriBuilder -> uriBuilder.path("/some/uri").build())
                 .bodyValue(startInfo)
                 .retrieve()
                 .onStatus(HttpStatus::is4xxClientError, cr -> Mono.error(new InotApiException(START_JOB, startInfo.getReleaseKey(), cr.rawStatusCode())))
                 .onStatus(HttpStatus::is5xxServerError, cr -> Mono.error(new InotApiException(START_JOB, startInfo.getReleaseKey(), cr.rawStatusCode())))
                 .bodyToMono(JobResponse.class)
                 .timeout(Duration.ofMillis(10000))
                 .retry(3, ex -> ex instanceof TimeoutException);
    }

}

Your Environment

this is the stacktrace that I'm sure you're familiar with

ERROR io.netty.util.ResourceLeakDetector.reportTracedLeak - LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information. Recent access records: Created at: io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:349) io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:187) io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:178) io.netty.buffer.AbstractByteBufAllocator.buffer(AbstractByteBufAllocator.java:115) io.netty.handler.ssl.SslHandler.allocate(SslHandler.java:2146) io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1327) io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1227) io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1274) io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:503) io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:442) io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:281) io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374) io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360) io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352) io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1422) io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374) io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360) io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:931) io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:792) io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:502) io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:407) io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1050) io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) java.base/java.lang.Thread.run(Thread.java:834)

violetagg commented 4 years ago

@saad14092 Please run with -Dio.netty.leakDetection.level=paranoid Also enable logging logging.level.reactor.netty=DEBUG, thus we will have more information in the stack traces with the leaked memory.

saad14092 commented 4 years ago

@violetagg Thank you for your fast reply. I will do as you've requested. It will take some time sincewe're talking about production here, I'll have to do it this evening and check if I got the error tomorrow with better logs.

In the meantime, can you already tell from the code I've submitted what might be the reason of this leak ?

Thank you

violetagg commented 4 years ago

Do you release the response body in the code below when HttpStatus.UNAUTHORIZED.equals(clientResponse.statusCode())

.filter((request, next) -> next.exchange(request).flatMap(
        clientResponse -> HttpStatus.UNAUTHORIZED.equals(clientResponse.statusCode())
                              ? refreshToken().map(token -> bearer(request, token)).flatMap(next::exchange)
                              : just(clientResponse)))

Please take a closer look at the note here https://docs.spring.io/spring/docs/current/spring-framework-reference/web-reactive.html#webflux-client-exchange

saad14092 commented 4 years ago

@violetagg No, I don't call any type of bodyTo* in this filter, but may I just point out that the "exchange" in my code belongs to the ExchangeFunction interface : org.springframework.web.reactive.function.client.ExchangeFunction.exchange(ClientRequest)

As opposed to the one that the docs mention which belongs to WebClient : org.springframework.web.reactive.function.client.WebClient.RequestHeadersSpec.exchange()

violetagg commented 4 years ago

@saad14092 Isn't it an alternative of the one provided by WebClient?

https://docs.spring.io/spring-framework/docs/current/javadoc-api/org/springframework/web/reactive/function/client/ExchangeFunction.html

saad14092 commented 4 years ago

@violetagg Indeed. That's what the documentation says. I'll try and release the body there too, and with some luck I may be able to reproduce the issue in my dev environment.

Thank you for your help. I'll reply once I've got something new

rstoyanchev commented 4 years ago

I think Violeta is correct, assuming the 401 response has a body. As the filter intercepts and swallows the response, replacing it with another, it has to ensure the first response content is consumed. BTW couldn't you simplify the filter logic a little like this:

this.webClient = WebClient.builder().baseUrl(this.baseUrl)
        .clientConnector(httpConnector)
        .defaultHeaders(headers -> {
            headers.add("X-UIPATH-TenantName", this.tenantName);
            headers.add(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_UTF8_VALUE);
        })
        .filter((request, next) -> next.exchange(bearer(request, this.accessToken))
                .flatMap(response -> {
                    if (!HttpStatus.UNAUTHORIZED.equals(response.statusCode())) {
                        return just(response);
                    }
                    return response.bodyToMono(Void.class).then(
                            refreshToken().map(token -> bearer(request, token)).flatMap(next::exchange));
                }))
        .build();
saad14092 commented 4 years ago

@rstoyanchev Thank you for the suggestion ! I have succeeded in reproducing the problem once, but the test is not consistent yet. I'll try to do as you've advised and see if it makes a difference.

saad14092 commented 4 years ago

@violetagg @rstoyanchev Thank you guys for your help. Indeed once I've added the bodyToMono(Void.class) to the Filter. The Leak error dissapeared.

This issue should probably be tagged for/stackoverflow since it's not really a bug.

Thank you again

rstoyanchev commented 4 years ago

@saad14092 I was just reminded that we recently added response.releaseBody which you can use instead of bodyToMono(Void.class). The difference is that the former actually consumes and releases any possible data buffers that come through without closing the connection while the latter expects no data to come through and if any does come in, it closes the connection. Unless you expect issues with an unexpected large response, or a response that takes a long time to complete, the former is better since it doesn't force the connection to be closed.