spring-cloud / spring-cloud-gateway

An API Gateway built on Spring Framework and Spring Boot providing routing and more.
http://cloud.spring.io
Apache License 2.0
4.51k stars 3.31k forks

Memory LEAK, response logging (no modification) #3140

Closed jacekgajek closed 6 months ago

jacekgajek commented 10 months ago

We want to migrate from Zuul to Spring Gateway, but we experience memory problems on our low-traffic QA environment, which blocks us from deployment to Production. It's worth noting that all features worked perfectly with Zuul, so a hardware problem is unlikely.

[graph: memory utilization across the three cases below]

On the above graph of memory utilization there are three cases:

Case 3: Reverted to the old Zuul implementation, which stores responses to S3 without an issue.

Case 1: The following filter was added (full code below):

@Override
public @NonNull Mono<Void> writeWith(@NonNull Publisher<? extends DataBuffer> body) {
    log.debug("Joining Buffers...");
    return super.writeWith(DataBufferUtils.join(body)
            .map(this::cacheAndApply)
            .doOnDiscard(DataBuffer.class, DataBufferUtils::release));
}

@NonNull
private DataBuffer cacheAndApply(DataBuffer originalDataBuffer) {
    val dataBuffer = originalDataBuffer.retainedSlice(0, originalDataBuffer.readableByteCount());
    log.debug("Releasing buffer...");
    DataBufferUtils.release(dataBuffer);
    return originalDataBuffer;
}

Note that it doesn't really do anything: it just joins the buffers, creates a retained slice, and releases that slice.
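In reference-counting terms that filter is a no-op on the joined buffer: `retainedSlice` bumps the count 1 → 2 and the immediate `release` drops it back to 1, so the joined buffer is still owned by (and must be freed by) the downstream write. A toy stdlib model of that arithmetic (hypothetical `RefCounted` class, not the real Netty `ByteBuf` API):

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Toy model of Netty-style reference counting (not the real ByteBuf API). */
class RefCounted {
    final AtomicInteger refCnt = new AtomicInteger(1); // creator holds one reference
    volatile boolean freed = false;

    /** Shares the underlying memory and bumps the reference count. */
    RefCounted retainedSlice() {
        refCnt.incrementAndGet();
        return this;
    }

    /** Drops one reference; the memory is freed only at zero. */
    void release() {
        if (refCnt.decrementAndGet() == 0) {
            freed = true;
        }
    }

    public static void main(String[] args) {
        RefCounted joined = new RefCounted();      // refCnt == 1 after join
        RefCounted slice = joined.retainedSlice(); // refCnt == 2
        slice.release();                           // back to 1: nothing is freed yet
        System.out.println(joined.freed);          // false: the downstream writer still owns it
    }
}
```

So the slice/release pair cancels out, and whether the joined buffer ever reaches zero depends entirely on the downstream write (or the `doOnDiscard` hook) releasing it.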

Case 2: This scenario happened when we actually wanted to do something with the response: store it in an Amazon S3 bucket (AWS's cloud file-storage service). The service died with an OutOfMemoryError and was constantly restarted by AWS.

Reproduction

I was able to reproduce it locally: I gave the JVM enough memory to hold three ~30 MB responses and ran three requests at the same time, which was fine. Then, after sending even a single request at a time, I got an OutOfMemoryError:


2023-11-17 10:59:34.792 ERROR 134268 --- [or-http-epoll-8] o.s.w.s.adapter.HttpWebHandlerAdapter    : [522241a1-7] Error [java.lang.OutOfMemoryError: Direct buffer memory] for HTTP POST "/api/oicp/evsepull/v22/providers/DE*ICE/data-records", but ServerHttpResponse already committed (200 OK)
2023-11-17 10:59:34.793 ERROR 134268 --- [or-http-epoll-8] r.n.http.server.HttpServerOperations     : [522241a1-1, L:/127.0.0.1:8003 - R:/127.0.0.1:34024] Error finishing response. Closing connection

java.lang.OutOfMemoryError: Direct buffer memory
    at java.base/java.nio.Bits.reserveMemory(Bits.java:175)
    Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Error has been observed at the following site(s):
    *__checkpoint ⇢ org.springframework.cloud.gateway.filter.WeightCalculatorWebFilter [DefaultWebFilterChain]
    *__checkpoint ⇢ org.springframework.boot.actuate.metrics.web.reactive.server.MetricsWebFilter [DefaultWebFilterChain]
    *__checkpoint ⇢ HTTP POST "/api/..." [ExceptionHandlingWebHandler]
Original Stack Trace:
        at java.base/java.nio.Bits.reserveMemory(Bits.java:175)
        at java.base/java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:118)
        at java.base/java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:317)
        at io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:649)
        at io.netty.buffer.PoolArena$DirectArena.newUnpooledChunk(PoolArena.java:638)
        at io.netty.buffer.PoolArena.allocateHuge(PoolArena.java:214)
        at io.netty.buffer.PoolArena.allocate(PoolArena.java:141)
        at io.netty.buffer.PoolArena.allocate(PoolArena.java:126)
        at io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:396)
        at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:188)
        at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:179)
        at io.netty.channel.epoll.AbstractEpollChannel.newDirectBuffer0(AbstractEpollChannel.java:329)
        at io.netty.channel.epoll.AbstractEpollChannel.newDirectBuffer(AbstractEpollChannel.java:315)
        at io.netty.channel.epoll.AbstractEpollChannel.newDirectBuffer(AbstractEpollChannel.java:298)
        at io.netty.channel.epoll.AbstractEpollStreamChannel.filterOutboundMessage(AbstractEpollStreamChannel.java:521)
        at io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:868)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1367)
        at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
        at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:709)
        at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:792)
        at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:702)
        at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:697)
        at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.write(CombinedChannelDuplexHandler.java:521)
        at io.netty.handler.codec.MessageToMessageEncoder.writePromiseCombiner(MessageToMessageEncoder.java:140)
        at io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:120)
        at io.netty.channel.CombinedChannelDuplexHandler.write(CombinedChannelDuplexHandler.java:346)
        at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
        at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:709)
        at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:792)
        at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:702)
        at reactor.netty.http.server.HttpTrafficHandler.write(HttpTrafficHandler.java:314)
        at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
        at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:764)
        at io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1071)
        at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:174)
        at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:167)
        at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
        at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:391)
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.base/java.lang.Thread.run(Thread.java:829)
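That reproduction pattern (three concurrent ~30 MB responses fit, then one more request fails) is consistent with each joined buffer permanently consuming its share of the direct-memory budget instead of being returned to the pool. A toy arithmetic model of that hypothesis (plain Java, no Netty; the sizes are taken from the reproduction above):

```java
/** Toy model: a direct-memory budget of 3 x 30 MB where each response leaks its joined buffer. */
public class LeakBudget {
    static final long BUDGET = 3L * 30 * 1024 * 1024;   // fits three ~30 MB responses
    static final long RESPONSE = 30L * 1024 * 1024;     // one joined response body

    /** Returns the first request that cannot reserve direct memory, or -1 if all fit. */
    static int firstFailingRequest(int requests) {
        long reserved = 0;
        for (int i = 1; i <= requests; i++) {
            if (reserved + RESPONSE > BUDGET) {
                return i; // this is where Bits.reserveMemory would throw OutOfMemoryError
            }
            reserved += RESPONSE; // leaked: never released back to the budget
        }
        return -1;
    }

    public static void main(String[] args) {
        System.out.println("first OOM at request " + firstFailingRequest(4)); // request 4
    }
}
```

Under this model the first three requests exhaust the budget and the fourth fails, matching the observed "works three times, then OOMs on a single request" behavior.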

Full code for case (1)

package foobar;

import java.io.InputStream;

import org.reactivestreams.Publisher;
import org.springframework.cloud.gateway.filter.GatewayFilter;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.NettyWriteResponseFilter;
import org.springframework.cloud.gateway.filter.factory.AbstractGatewayFilterFactory;
import org.springframework.core.Ordered;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.http.server.reactive.ServerHttpResponseDecorator;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import lombok.val;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * Modification of {@link org.springframework.cloud.gateway.filter.factory.rewrite.ModifyResponseBodyGatewayFilterFactory}
 * which supports conditional processing of the response and doesn't allow modifying it.
 */
@Component
public class ProcessResponseBodyGatewayFilterFactory
        extends AbstractGatewayFilterFactory<ProcessResponseBodyGatewayFilterFactory.Config> {

    public ProcessResponseBodyGatewayFilterFactory() {
        super(Config.class);
    }

    public GatewayFilter apply(ConditionalRewriteFunction fun) {
        return new ModifyResponseGatewayFilter(new Config(fun));
    }

    @Override
    public GatewayFilter apply(Config config) {
        return new ModifyResponseGatewayFilter(config);
    }

    public interface ConditionalRewriteFunction {

        boolean shouldExecute(ServerWebExchange exchange);

        void apply(ServerWebExchange exchange, InputStream inputStream, long length, Runnable onComplete);
    }

    @Getter
    @AllArgsConstructor
    @Builder
    public static class Config {
        private ConditionalRewriteFunction consume;
    }

    public static class ModifyResponseGatewayFilter implements GatewayFilter, Ordered {

        private final Config config;

        public ModifyResponseGatewayFilter(Config config) {
            this.config = config;
        }

        @Override
        public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
            return chain.filter(exchange.mutate().response(new ModifiedServerHttpResponse(exchange, config)).build());
        }

        @Override
        public int getOrder() {
            return NettyWriteResponseFilter.WRITE_RESPONSE_FILTER_ORDER - 1;
        }
    }

    @Slf4j
    protected static class ModifiedServerHttpResponse extends ServerHttpResponseDecorator {

        private final ServerWebExchange exchange;

        private final Config config;

        public ModifiedServerHttpResponse(ServerWebExchange exchange, Config config) {
            super(exchange.getResponse());
            this.exchange = exchange;
            this.config = config;
        }

        @Override
        public @NonNull Mono<Void> writeWith(@NonNull Publisher<? extends DataBuffer> body) {
            log.debug("Joining Buffers...");
            return super.writeWith(DataBufferUtils.join(body)
                    .map(this::cacheAndApply)
                    .doOnDiscard(DataBuffer.class, DataBufferUtils::release));
        }

        @NonNull
        private DataBuffer cacheAndApply(DataBuffer originalDataBuffer) {
            val dataBuffer = originalDataBuffer.retainedSlice(0, originalDataBuffer.readableByteCount());
            log.debug("Releasing buffer...");
            DataBufferUtils.release(dataBuffer);
            return originalDataBuffer;
        }

        @Override
        public @NonNull Mono<Void> writeAndFlushWith(@NonNull Publisher<? extends Publisher<? extends DataBuffer>> body) {
            return writeWith(Flux.from(body).flatMapSequential(p -> p));
        }
    }
}

My goal is to save the response to S3. In Zuul this was achieved by caching each response in a byte array and sending it with the AWS S3 API. That approach had almost constant memory usage.
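In blocking terms, that Zuul-era approach copies each (possibly direct, pooled) buffer onto the heap exactly once, after which the original buffer can be released immediately; only the heap copy is handed to the S3 client. A minimal stdlib sketch of just that copy step (the actual S3 upload is omitted):

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class HeapCopy {
    /** Copies a buffer's readable bytes onto the heap so the (direct) source can be freed early. */
    static byte[] copyToHeap(ByteBuffer source) {
        byte[] heap = new byte[source.remaining()];
        source.duplicate().get(heap); // duplicate() so the source's position is untouched
        return heap;
    }

    public static void main(String[] args) {
        ByteBuffer body = ByteBuffer.allocateDirect(16);
        body.put("response body".getBytes(StandardCharsets.US_ASCII)).flip();

        byte[] cached = copyToHeap(body);
        // 'body' could now be released back to the pool; only 'cached' goes to the S3 client
        System.out.println(new String(cached, StandardCharsets.US_ASCII)); // response body
    }
}
```

The trade-off is one full heap copy per response, but direct memory is held only for the lifetime of the write, which is why memory usage stays near-constant.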

Also, what I really don't like is that DataBufferUtils.join(body) seems to allocate memory on the heap. I would prefer to share direct memory between two streams: one that sends the response and another that stores it in S3. They should share the memory, and only when both are closed should it be released.
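The "two streams share one buffer, freed only when both are closed" idea is ordinary reference counting: retain once per consumer, release on each close, free at zero. A stdlib sketch of that contract (hypothetical `SharedBuffer` type, not a Reactor or Netty API):

```java
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical shared buffer: freed only after every consumer has released it. */
class SharedBuffer {
    private final ByteBuffer memory;
    private final AtomicInteger refs;
    private volatile boolean freed = false;

    SharedBuffer(ByteBuffer memory, int consumers) {
        this.memory = memory;
        this.refs = new AtomicInteger(consumers); // e.g. 2: HTTP response + S3 upload
    }

    /** Each consumer reads through its own view; position/limit are independent. */
    ByteBuffer view() {
        return memory.asReadOnlyBuffer();
    }

    /** Called by each consumer when done; real code would return the memory to the pool. */
    void close() {
        if (refs.decrementAndGet() == 0) {
            freed = true;
        }
    }

    boolean isFreed() {
        return freed;
    }

    public static void main(String[] args) {
        SharedBuffer shared = new SharedBuffer(ByteBuffer.allocateDirect(64), 2);
        shared.close();                       // response stream done
        System.out.println(shared.isFreed()); // false: the S3 upload is still reading
        shared.close();                       // S3 upload done
        System.out.println(shared.isFreed()); // true: the memory can be reclaimed
    }
}
```

This is the semantics being asked for; whether the gateway's buffer pipeline can be wired this way without an intermediate heap join is exactly the open question of this issue.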

1120562641 commented 7 months ago

Has this problem been solved?

1120562641 commented 7 months ago

I use ModifyResponseBodyGatewayFilterFactory to log responses, and Netty reports a memory LEAK.

spencergibb commented 6 months ago

DataBufferUtils is a framework class, not specific to gateway. It doesn't look like there's anything specific to gateway in your custom code.

spring-cloud-issues commented 6 months ago

If you would like us to look at this issue, please provide the requested information. If the information is not provided within the next 7 days this issue will be closed.

spring-cloud-issues commented 6 months ago

Closing due to lack of requested feedback. If you would like us to look at this issue, please provide the requested information and we will re-open the issue.