reactor / reactor-netty

TCP/HTTP/UDP/QUIC client/server with Reactor over Netty
https://projectreactor.io
Apache License 2.0

Downloading large file causes memory leak in Linux but not in Windows #1757

Closed walfen closed 3 years ago

walfen commented 3 years ago

Expected Behavior

Downloading files shouldn't cause memory leaks on Linux.

Actual Behavior

Downloading large files (200 MB, 500 MB, 2 GB) in a Spring Boot application works properly on a Windows machine. However, it doesn't work on Linux with the same configuration parameters. It's important to notice that if logging of response bodies is enabled, the error disappears on the Linux machine too. It seems to be related to the speed at which the response content is processed.

Steps to Reproduce


    private void download(String url, Path filePath) {

        Flux<DataBuffer> dataBuffers = this.getWebClient()
            .get()
            .uri(uriBuilder -> uriBuilder.path(url).build())
            .retrieve()
            .bodyToFlux(DataBuffer.class);

        DataBufferUtils.write(dataBuffers, filePath, StandardOpenOption.CREATE_NEW).block();
    }

Response body logs are enabled in the following way:


customWebClientBuilder.clientConnector(this.createLogConnector());

private ClientHttpConnector createLogConnector() {
    HttpClient httpClient = HttpClient.create().wiretap(this.getClass().getCanonicalName(), LogLevel.DEBUG, AdvancedByteBufFormat.TEXTUAL);
    return new ReactorClientHttpConnector(httpClient);
}

The error logs:


2021-07-29 08:53:01,884 [reactor-http-nio-3] ERROR reactor.core.publisher.Operators.error:314 - Operator called default onErrorDropped
reactor.netty.ReactorNetty$InternalNettyException: java.lang.OutOfMemoryError: Java heap space
    Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Error has been observed at the following site(s):
    |_ checkpoint ⇢ Body from GET https://xxxxx [DefaultClientResponse]
    |_ checkpoint ⇢ Body from GET https://xxxxx [DefaultClientResponse]
Stack trace:
Caused by: java.lang.OutOfMemoryError: Java heap space

Your Environment

Windows

Linux (AWS)

violetagg commented 3 years ago

@walfen Are you able to reproduce that with vanilla Reactor Netty?

walfen commented 3 years ago

No, I have only tried it with Spring Boot.

violetagg commented 3 years ago

@walfen Do you have any configuration for switching from direct memory to heap, or are you using everything as it is by default?

bclozel commented 3 years ago

As described in the Javadoc, DataBufferUtils#write does not release the DataBuffer instances.

Something like this could solve the problem:

DataBufferUtils.write(dataBuffers, filePath, StandardOpenOption.CREATE_NEW).map(DataBufferUtils::release).then().block();

I'm not sure why the out-of-memory error sometimes happens, sometimes doesn't, or happens earlier or later, but I think that configuring the Netty leak detector would trigger interesting warnings.
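For reference, the Netty leak detector mentioned here is typically enabled with a JVM system property at startup (the jar name below is a placeholder, not from the thread):

```shell
# Enable Netty's most verbose buffer leak detection ("paranoid" checks every buffer).
# "app.jar" is a placeholder for the actual Spring Boot artifact.
java -Dio.netty.leakDetection.level=paranoid -jar app.jar
```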

walfen commented 3 years ago

Thank you for your answers.

@violetagg I use everything as default. @bclozel I'm going to enable the Netty leak detector, and I will try the DataBufferUtils::release.

walfen commented 3 years ago

I'm analyzing the code of DataBufferUtils.write(Publisher<DataBuffer> source, Path destination, OpenOption... options) and it seems that it already releases the data buffers:


public static Mono<Void> write(Publisher<DataBuffer> source, Path destination, OpenOption... options) {
    Assert.notNull(source, "Source must not be null");
    Assert.notNull(destination, "Destination must not be null");

    Set<OpenOption> optionSet = checkWriteOptions(options);

    return Mono.create(sink -> {
        try {
            AsynchronousFileChannel channel = AsynchronousFileChannel.open(destination, optionSet, null);
            sink.onDispose(() -> closeChannel(channel));
            write(source, channel).subscribe(DataBufferUtils::release,
                    sink::error,
                    sink::success);
        }
        catch (IOException ex) {
            sink.error(ex);
        }
    });
}

bclozel commented 3 years ago

@walfen Sorry, I looked at the wrong implementation variant. You're right this one releases the buffers.

walfen commented 3 years ago

I have increased the log level and I can see this error:

2021-07-29 13:05:16,013 [reactor-http-nio-3] DEBUG reactor.netty.channel.FluxReceive.debug:249 - [id:67121406-1, L:/172.31.35.81:46376 - R:developer.api.autodesk.com/34.252.150.18:443] FluxReceive{pending=11, cancelled=false, inboundDone=true, inboundError=reactor.netty.ReactorNetty$InternalNettyException: java.lang.OutOfMemoryError: Java heap space}: Only one connection receive subscriber allowed.

bclozel commented 3 years ago

So to summarize, something is allocating a lot on the heap, and somehow slowing down the process leaves enough breathing room for the GC to do its work. This is the first time I'm seeing such a pattern, so it doesn't ring a bell on the Spring Boot side.

Maybe the best option here would be to connect a profiler to the JVM and take a memory snapshot while the app is dealing with traffic. There you should see what's allocating so much.
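One way to take such a snapshot, assuming the JDK tools are installed on the instance, is a heap dump with `jmap`, which can then be opened in a profiler such as VisualVM or Eclipse MAT:

```shell
# List running JVMs to find the application's pid.
jps -l

# Dump only live (reachable) objects from that JVM to a binary hprof file.
jmap -dump:live,format=b,file=heap.hprof <pid>
```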

walfen commented 3 years ago

@bclozel Thank you for your answer. I have done more tests, and now it also fails with response-body debug logging enabled. I have attached some screenshots showing the memory snapshots (1 - before the download, 2 - during/after the download, 3 - objects). Memory usage always seems to stay below the maximum, but the byte arrays grow very fast during the download.

[Screenshots: start, downloading, downloading-objects]

The application configuration has the following properties. Maybe there is some misconfiguration that causes the problem.


spring:
  servlet:
    multipart:
      max-file-size: 100MB
      max-request-size: 200MB
  codec:
    max-in-memory-size: 10MB

On the other hand, two months ago we changed the AWS instances from t3 (x86_64) to t4g (arm64). The applications then needed a long time to start, and we had to install Haveged (more info in https://stackoverflow.com/questions/7554831/faster-random-generator-in-tomcat-7), which solved the problem. Is it possible that there is some kind of incompatibility between this kind of Linux instance and Netty/Reactor Netty?

bclozel commented 3 years ago

I'm not aware of any incompatibility here.

I think we've extended the scope of this issue way too much. Unless you've configured a custom DataBufferFactory in your application, Netty or Reactor Netty do not allocate buffers on the heap, so they should be out of the picture here. With that in mind, I think @violetagg can close this issue.

This seems to be a broader performance issue in your application, with lots of moving pieces involved. I guess your next step is to track down what's allocating all those HashMaps on the heap. We initially only had WebClient and files on disk in the context of this issue, and now things seem to point to the Servlet application parsing large multipart requests and possibly buffering them as objects on the heap.

Your initial investigation may have pointed to WebClient and Reactor Netty, but at this point it might just be that you found a way to limit its speed and let the GC catch up with the allocation. Please reach out on this issue if you've found that the allocation happens in Reactor Netty - otherwise I'm afraid we can't spend much time helping on this performance problem.

Thanks!

violetagg commented 3 years ago

@walfen You can safely ignore io.netty.buffer.PoolChunk, Reactor Netty/Netty uses memory pooling by default. More information here: https://github.com/netty/netty/issues/10717
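If pooled arena memory is hard to tell apart from a genuine leak in the profiler, one diagnostic option (an editor's suggestion, not something proposed in the thread) is to temporarily switch Netty to the unpooled allocator so each allocation shows up individually:

```shell
# Disable Netty's pooled ByteBuf allocator for diagnostic runs only.
# "app.jar" is a placeholder for the actual Spring Boot artifact.
java -Dio.netty.allocator.type=unpooled -jar app.jar
```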

walfen commented 3 years ago

@bclozel @violetagg Thank you again for your replies, and sorry for continuing with this topic, but I have simplified the application to a single API endpoint that receives a very simple JSON payload, downloads a file from a remote API and stores it in the temp folder, and I still get the error on Linux. I can't see anything in my code that could cause this error.

Here is the controller. It is the only entry point of the application (no other process is started, apart from a request to obtain an access token when the application starts):

Controller

import javax.annotation.Resource;

import org.modelmapper.ModelMapper;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.security.access.annotation.Secured;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.bind.annotation.RestController;

import com.xxx.yyy.sync.controllers.dtos.CopyDto;
import com.xxx.yyy.sync.controllers.helpers.CreateOperation;
import com.xxx.yyy.sync.entities.Copy;
import com.xxx.yyy.sync.entities.types.SyncStatusType;
import com.xxx.yyy.sync.services.FileService;

@RestController
@RequestMapping("/files")
public class FileController {

    @Resource
    private ModelMapper mapper;

    @Resource
    private FileService fileService;

    @Secured("ROLE_APPLICATION")
    @ResponseStatus(HttpStatus.CREATED)
    @RequestMapping(path = "/copies", method = RequestMethod.POST, consumes = MediaType.APPLICATION_JSON_VALUE, produces = MediaType.APPLICATION_JSON_VALUE)
    public SyncStatusType copy(@RequestBody @Validated(CreateOperation.class) CopyDto copyDto) {
        Copy copy = mapper.map(copyDto, Copy.class);

        return fileService.copy(copy);
    }

}

The service where the error happens:

Service


import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.UUID;

import javax.annotation.Resource;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.stereotype.Service;
import org.springframework.web.util.UriComponents;
import org.springframework.web.util.UriComponentsBuilder;

import com.xxx.yyy.sync.entities.Copy;
import com.xxx.yyy.sync.entities.types.SyncStatusType;
import com.xxx.yyy.sync.security.AuthService;
import com.xxx.yyy.sync.services.Bim360Service;
import com.xxx.yyy.sync.services.FileService;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Service
public class FileServiceImpl implements FileService {

    private static final Logger LOGGER = LoggerFactory.getLogger(FileServiceImpl.class);

    @Resource
    private AuthService authService;

    @Resource
    private Bim360Service bim360Service;

    @Override
    public SyncStatusType copy(Copy copy) {
        LOGGER.info("{}: copy file in folder - sourceFileId: {}, targetFolderId: {},", authService.getLogId(), copy.getSource().getId(), copy.getTarget().getFolderId());

        String bim360AccessToken = bim360Service.getAccessToken(authService.getPrincipal()); // get access token from principal
        UriComponents storageUri = UriComponentsBuilder.fromUriString(copy.getSource().getStorageLink()).build();
        Path tempFilePath = Path.of(System.getProperty("java.io.tmpdir"), UUID.randomUUID().toString());

        Flux<DataBuffer> dataBuffers = bim360Service.getWebClient()
            .get()
            .uri(uriBuilder -> uriBuilder
                .path(storageUri.getPath())
                .queryParams(storageUri.getQueryParams())
                .build())
            .headers(h -> h.setBearerAuth(bim360AccessToken))
            .retrieve()
            .bodyToFlux(DataBuffer.class);

        DataBufferUtils.write(dataBuffers, tempFilePath, StandardOpenOption.CREATE_NEW).then(Mono.just(tempFilePath)).block();

        return SyncStatusType.SUCCESSFUL;
    }

}

I have tested with files of 200 MB and 500 MB and the error is still there:


Caused by: java.lang.OutOfMemoryError: Java heap space
2021-07-29 18:29:18,471 [reactor-http-nio-1] WARN  io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught:311 - An exception 'java.lang.OutOfMemoryError: Java heap space' [enable DEBUG level for full stacktrace] was thrown by a user handler's exceptionCaught() method while handling the following exception:
java.lang.OutOfMemoryError: Java heap space
2021-07-29 18:29:25,882 [http-nio-80-exec-1] INFO  com.xxx.yyy.sync.services.impl.FileServiceImpl.copy:55 - preconnector-api|null: copy file in folder - sourceFileId: urn:adsk.wipprod:dm.lineage:sFXPdkpFTiellVwEYhMrRA, targetFolderId: urn:adsk.wipprod:fs.folder:co.UfkT9spuS4KI6rDjOSG-Fw,
2021-07-29 18:29:28,355 [reactor-http-nio-2] ERROR reactor.core.publisher.Operators.error:314 - Operator called default onErrorDropped
reactor.netty.ReactorNetty$InternalNettyException: java.lang.OutOfMemoryError: Java heap space
        Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Error has been observed at the following site(s):
        |_ checkpoint ⇢ Body from GET https://developer.api.autodesk.com/oss/v2/buckets/wip.dm.prod/objects/4af05896-f0e5-498d-8c41-966b0a52fd04.txt?scopes=b360project.c5b923cf-abf6-435b-9427-c69a1d363195,O2tenant.14805406 [DefaultClientResponse]
        |_ checkpoint ⇢ Body from GET https://developer.api.autodesk.com/oss/v2/buckets/wip.dm.prod/objects/4af05896-f0e5-498d-8c41-966b0a52fd04.txt?scopes=b360project.c5b923cf-abf6-435b-9427-c69a1d363195,O2tenant.14805406 [DefaultClientResponse]
Stack trace:
        Suppressed: java.lang.Exception: #block terminated with an error
                at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:99)
                at reactor.core.publisher.Mono.block(Mono.java:1703)
violetagg commented 3 years ago

@walfen Can you enable DEBUG for this logger so that we can see the stack trace:

WARN  io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught:311 - An exception 'java.lang.OutOfMemoryError: Java heap space' [enable DEBUG level for full stacktrace]
bclozel commented 3 years ago

I've tried with a reduced sample like this on macOS and had no issue with a 1GB file.

@Component
public class DownloadRunner implements ApplicationRunner {

    private final WebClient webclient;

    private Log logger = LogFactory.getLog(DownloadRunner.class);

    public DownloadRunner(WebClient.Builder builder) {
        this.webclient = builder.build();
    }

    @Override
    public void run(ApplicationArguments args) throws Exception {
        Path tempFilePath = Path.of(System.getProperty("java.io.tmpdir"), UUID.randomUUID().toString());
        logger.info("path is:" + tempFilePath);
        Flux<DataBuffer> dataBuffers = this.webclient
                .get()
                .uri("http://example.org/large-file.zip")
                .retrieve()
                .bodyToFlux(DataBuffer.class);

        DataBufferUtils.write(dataBuffers, tempFilePath, StandardOpenOption.CREATE_NEW).block();
    }
}
walfen commented 3 years ago

I have tried your DownloadRunner component with the url http://ipv4.download.thinkbroadband.com/512MB.zip, and it works on both Windows and Linux.

Then, in my code, I replaced the API url I was calling with http://ipv4.download.thinkbroadband.com/512MB.zip, and it also works on both Windows and Linux.

However, I still have the problem when I call the original API url: it works on Windows but fails on Linux with the memory exception. So it seems the error only happens when calling this API on Linux. The API endpoint is https://forge.autodesk.com/en/docs/data/v2/reference/http/buckets-:bucketKey-objects-:objectName-GET/

I don't know why it works on Windows and not on Linux, nor why it throws a memory exception. But from my side, you can close this issue because I think it's a very rare case. I will call the API endpoint in small chunks and I hope that will work; I wanted to avoid this solution because this API limits the allowed number of API requests per minute.
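The chunked workaround mentioned above could look roughly like the following sketch. Only the range arithmetic is shown here; the 10 MB chunk size is an arbitrary assumption, and actually fetching each chunk with an HTTP `Range` header would reuse the WebClient code already shown in this thread.

```java
import java.util.ArrayList;
import java.util.List;

public class RangePlanner {

    // Inclusive byte range [start, end], as used by the HTTP Range header.
    record ByteRange(long start, long end) {
        String toHeaderValue() {
            return "bytes=" + start + "-" + end;
        }
    }

    // Split a file of totalSize bytes into consecutive chunks of at most chunkSize bytes.
    static List<ByteRange> plan(long totalSize, long chunkSize) {
        List<ByteRange> ranges = new ArrayList<>();
        for (long start = 0; start < totalSize; start += chunkSize) {
            long end = Math.min(start + chunkSize, totalSize) - 1;
            ranges.add(new ByteRange(start, end));
        }
        return ranges;
    }

    public static void main(String[] args) {
        // A 512 MB file in 10 MB chunks -> 52 ranges, the last one shorter.
        List<ByteRange> ranges = plan(512L * 1024 * 1024, 10L * 1024 * 1024);
        System.out.println(ranges.size());
        System.out.println(ranges.get(0).toHeaderValue());
        // Each range would then be requested roughly as:
        // webclient.get().header("Range", range.toHeaderValue())...
    }
}
```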

@bclozel @violetagg Thank you very much for all your help.

bclozel commented 3 years ago

Wait, that's a problem. Can you help us craft a minimal sample with the code I provided but using that URL? Do we need credentials to that service or some special setup?

If this fails with that URL only there might be an issue with HTTP parsing or something very specific in Netty, Reactor Netty or Spring.

walfen commented 3 years ago

Yes, you need credentials to access that service. You have to create an account and follow the whole configuration process. The following link has a tutorial covering the entire process (it's long and complex): https://forge.autodesk.com/en/docs/data/v2/tutorials/download-file/

Regarding the sample: below is a basic one (it may have some errors because I have taken code from different places):

Download file


...

@Service
public class FileServiceImpl implements FileService {

    private final WebClient webclient;

    public FileServiceImpl(WebClient.Builder builder) {
        this.webclient = builder.baseUrl("https://developer.api.autodesk.com").build();
    }

    @Override
    // the storage url is retrieved from the BIM360 API (see tutorial)
    public boolean download(String storageUrl) {

        // get access token
        String accessToken = this.getAccessToken();

        UriComponents storageUri = UriComponentsBuilder.fromUriString(storageUrl).build();
        Path tempFilePath = Path.of(System.getProperty("java.io.tmpdir"), UUID.randomUUID().toString());

        Flux<DataBuffer> dataBuffers = this.webclient
            .get()
            .uri(uriBuilder -> uriBuilder
                .path(storageUri.getPath())
                .queryParams(storageUri.getQueryParams())
                .build())
            .headers(h -> h.setBearerAuth(accessToken))
            .retrieve()
            .bodyToFlux(DataBuffer.class);

        DataBufferUtils.write(dataBuffers, tempFilePath, StandardOpenOption.CREATE_NEW).then(Mono.just(tempFilePath)).block();

        return true;
    }

    private String getAccessToken() {
        return webclient
            .post()
            .uri("/authentication/v1/authenticate")
            .headers(h -> h.setContentType(MediaType.APPLICATION_FORM_URLENCODED))
            .body(BodyInserters
                .fromFormData("client_id", XXX)
                .with("client_secret", YYY)
                .with("grant_type", "client_credentials")
                .with("scope", "data:read data:create data:write account:read account:write user-profile:read user:read user:write"))
            .retrieve()
            .bodyToMono(Bim360AccessTokenDto.class)
            .map(Bim360AccessTokenDto::getAccessToken)
            .block();
    }

}

It's failing on an AWS EC2 instance of this type: ubuntu-focal-20.04-arm64-server-20210429

violetagg commented 3 years ago

@walfen @bclozel How should we proceed here? We need a complete minimal sample that reproduces the problem (with vanilla Reactor Netty if possible).

bclozel commented 3 years ago

@violetagg as @walfen underlined, reproducing the issue with the Autodesk API seems like a lot of work, and I can't spend that much time on this right now (not only do we need to follow the tutorial, we also need to set up a complete environment to have a project resource available). I'm not sure there is a way to reproduce this particular problem in a much simpler fashion. What if wire logging is enabled while getting the response - would that help?

Without a reduced, contained repro, I don't think we can make progress on this.

violetagg commented 3 years ago

What if wire logging is enabled while getting the response - would that help?

This will show us what content is coming, not where the memory is going.

@walfen @bclozel OK. I'm closing this one.