spring-cloud / spring-cloud-gateway

An API Gateway built on Spring Framework and Spring Boot providing routing and more.
http://cloud.spring.io
Apache License 2.0
4.51k stars 3.31k forks source link

CacheRequestBodyGatewayFilterFactory cause memory leak #2672

Closed explorer0113 closed 1 year ago

explorer0113 commented 2 years ago

usage

application.yml :

server:
  port: 8080

spring:
  application:
    name: body-leak
  cloud:
    gateway:
      routes:
        - id: cache_request_body_route
          uri: no://route
          predicates:
            - Path=/cache
          filters:
            - name: CacheRequestBody
              args:
                bodyClass: java.lang.String

break point in RemoveCachedBodyFilter...

스크린샷 2022-07-13 오후 8 48 45

but cached attribute instanceof PooledDataBuffer is false

so it is not released

explorer0113 commented 2 years ago

i think there is no code to release body's buffer in CacheRequestBodyGatewayFilterFactory

        return ServerWebExchangeUtils.cacheRequestBodyAndRequest(exchange, (serverHttpRequest) -> {
            final ServerRequest serverRequest = ServerRequest
                    .create(exchange.mutate().request(serverHttpRequest).build(), messageReaders);
            return serverRequest.bodyToMono((config.getBodyClass())).doOnNext(objectValue -> {
                exchange.getAttributes().put(ServerWebExchangeUtils.CACHED_REQUEST_BODY_ATTR, objectValue);
            }).then(Mono.defer(() -> {
                ServerHttpRequest cachedRequest = exchange
                        .getAttribute(CACHED_SERVER_HTTP_REQUEST_DECORATOR_ATTR);
                Assert.notNull(cachedRequest, "cache request shouldn't be null");
                exchange.getAttributes().remove(CACHED_SERVER_HTTP_REQUEST_DECORATOR_ATTR);
                return chain.filter(exchange.mutate().request(cachedRequest).build());
            }));
        });

and it occurs memory leak when load test

i tried load test with locust

from locust import HttpUser, task

bigBody = "big" * 30000

class HelloWorldUser(HttpUser):
    @task
    def hello_world(self):
        self.client.get(
            url ="/cache",
            json={"body":bigBody}
        )

leak message:

2022-07-13 21:23:38.566 ERROR 20682 --- [ctor-http-nio-9] io.netty.util.ResourceLeakDetector       : LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records: 
Created at:
    io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:403)
    io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:188)
    io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:179)
    io.netty.buffer.AbstractByteBufAllocator.ioBuffer(AbstractByteBufAllocator.java:140)
    io.netty.channel.DefaultMaxMessagesRecvByteBufAllocator$MaxMessageHandle.allocate(DefaultMaxMessagesRecvByteBufAllocator.java:120)
    io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:150)
    io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
    io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
    io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
    io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
    io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
    io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    java.base/java.lang.Thread.run(Thread.java:833)
tuomoa commented 2 years ago

There is discussion about this in https://github.com/spring-cloud/spring-cloud-gateway/issues/1587. You should probably also use RemoveCachedBodyFilter in your use case.

explorer0113 commented 2 years ago

The filter doesn't work well.

public class RemoveCachedBodyFilter implements GlobalFilter, Ordered {

    private static final Log log = LogFactory.getLog(RemoveCachedBodyFilter.class);

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        return chain.filter(exchange).doFinally(s -> {
            Object attribute = exchange.getAttributes().remove(CACHED_REQUEST_BODY_ATTR);
            if (attribute != null && attribute instanceof PooledDataBuffer) {
                PooledDataBuffer dataBuffer = (PooledDataBuffer) attribute;
                if (dataBuffer.isAllocated()) {
                    if (log.isTraceEnabled()) {
                        log.trace("releasing cached body in exchange attribute");
                    }
                    dataBuffer.release();
                }
            }
        });
    }

    @Override
    public int getOrder() {
        return HIGHEST_PRECEDENCE;
    }

}

it checks the type but CACHED_REQUEST_BODY_ATTR could not be PooledDataBuffer type

And the root problem is GC occurs before BufferByte release

first in ServerWebExchangeUtils.cacheRequestBodyAndRequest,

        if (dataBuffer.readableByteCount() > 0) {
            if (log.isTraceEnabled()) {
                log.trace("retaining body in exchange attribute");
            }
            exchange.getAttributes().put(CACHED_REQUEST_BODY_ATTR, dataBuffer);
        }

and, over write CACHED_REQUEST_BODY_ATTR

return serverRequest.bodyToMono((config.getBodyClass())).doOnNext(objectValue -> {
                exchange.getAttributes().put(ServerWebExchangeUtils.CACHED_REQUEST_BODY_ATTR, objectValue);
            })

old object in CACHED_REQUEST_BODY_ATTR has gone without release i think this is root problem @tuomoa

tianshuang commented 2 years ago

@WellBeing-Man How to reproduce this problem? Can you give the complete Python test code? I can't reproduce the problem, if the bodyClass is String, then the dataBuffer will be released in org.springframework.core.codec.StringDecoder#decode:

@Override
public String decode(DataBuffer dataBuffer, ResolvableType elementType,
    @Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {

    Charset charset = getCharset(mimeType);
    CharBuffer charBuffer = charset.decode(dataBuffer.asByteBuffer());
    DataBufferUtils.release(dataBuffer);
    String value = charBuffer.toString();
    LogFormatUtils.traceDebug(logger, traceOn -> {
        String formatted = LogFormatUtils.formatValue(value, !traceOn);
        return Hints.getLogPrefix(hints) + "Decoded " + formatted;
    });
    return value;
}

Reference: spring-framework/StringDecoder.java at main · spring-projects/spring-framework · GitHub

tianshuang commented 2 years ago

In your screenshot, the attribute is of type String, no need to release it.

explorer0113 commented 2 years ago

I misunderstand at first... sorry as you say, the attribute is of type String, no need to release it.

But i think the problem is here https://github.com/spring-cloud/spring-cloud-gateway/issues/2672#issuecomment-1185307421

explorer0113 commented 2 years ago

https://github.com/WellBeing-Man/gateway-cache-leak here is reproducing code @tianshuang

Layfolk-zcy commented 2 years ago

The filter doesn't work well.

public class RemoveCachedBodyFilter implements GlobalFilter, Ordered {

  private static final Log log = LogFactory.getLog(RemoveCachedBodyFilter.class);

  @Override
  public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
      return chain.filter(exchange).doFinally(s -> {
          Object attribute = exchange.getAttributes().remove(CACHED_REQUEST_BODY_ATTR);
          if (attribute != null && attribute instanceof PooledDataBuffer) {
              PooledDataBuffer dataBuffer = (PooledDataBuffer) attribute;
              if (dataBuffer.isAllocated()) {
                  if (log.isTraceEnabled()) {
                      log.trace("releasing cached body in exchange attribute");
                  }
                  dataBuffer.release();
              }
          }
      });
  }

  @Override
  public int getOrder() {
      return HIGHEST_PRECEDENCE;
  }

}

it checks the type but CACHED_REQUEST_BODY_ATTR could not be PooledDataBuffer type

And the root problem is GC occurs before BufferByte release

first in ServerWebExchangeUtils.cacheRequestBodyAndRequest,

      if (dataBuffer.readableByteCount() > 0) {
          if (log.isTraceEnabled()) {
              log.trace("retaining body in exchange attribute");
          }
          exchange.getAttributes().put(CACHED_REQUEST_BODY_ATTR, dataBuffer);
      }

and, over write CACHED_REQUEST_BODY_ATTR

return serverRequest.bodyToMono((config.getBodyClass())).doOnNext(objectValue -> {
              exchange.getAttributes().put(ServerWebExchangeUtils.CACHED_REQUEST_BODY_ATTR, objectValue);
          })

old object in CACHED_REQUEST_BODY_ATTR has gone without release i think this is root problem

GlobalFilter doesn't work well. How implement a filter to intercept all request. Thank you!

explorer0113 commented 2 years ago

sorry I have changed the code @Layfolk-zcy

wuxudong commented 2 years ago

It is a bug, check pr #2483 . Unfortunately, it is still not merged after 6 months.

pengqun commented 1 year ago

I had trouble with this memory leaking problem too, using the latest spring cloud gateway version: 2022.0.0

For anyone who is looking for solutions, here is my quick workaround to replace 'CacheRequestBody' filter without causing memory leaks (verified in my project environment):

  1. When building routes, send a EnableBodyCachingEvent for each route needing access to cached request body (which will be received by framework's AdaptCachedBodyGlobalFilter and do the caching stuffs in runtime):

    val routeId = "xxx"
    builder.routes().route(routeId) { ... }
    applicationContext.publishEvent(EnableBodyCachingEvent(this, routeId))
  2. In your custom filters, accessing the cached request body like this:

    val cachedBodyBuffer = exchange.attributes[ServerWebExchangeUtils.CACHED_REQUEST_BODY_ATTR] as DataBuffer?
    val cachedBody = cachedBodyBuffer?.let { StandardCharsets.UTF_8.decode(it.toByteBuffer()).toString() }

Now you got the full cached request body in String (or you can parse it into other types according to its content-type).

spencergibb commented 1 year ago

Closing in favor of #2842