spring-projects / spring-integration

Spring Integration provides an extension of the Spring programming model to support the well-known Enterprise Integration Patterns (EIP)
http://projects.spring.io/spring-integration/
Apache License 2.0
1.54k stars 1.11k forks source link

Memory leak with Spring Cloud Stream - metrics #2733

Closed sabbyanandan closed 5 years ago

sabbyanandan commented 5 years ago

From @mafa73 on February 4, 2019 19:10

I've upgraded to Spring Boot 2.1.2 and Spring Cloud Stream - Cloud dependencies Greenwich.RELEASE. After upgrading my application run for a few hours in production until it got an OutOfMemoryException. Didn't change any code, just the dependencies. When reverting to Spring Boot 2.0.8 and Spring Cloud Stream - Finchley.SR2 the memory leak has not occurred.

The application consumes from 8 Kafka topics and produces to one topic using the Kafka binder.

Dependency management:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-dependencies</artifactId>
    <version>2.1.2.RELEASE</version>
    <type>pom</type>
    <scope>import</scope>
</dependency>

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-dependencies</artifactId>
    <version>Greenwich.RELEASE</version>
    <type>pom</type>
    <scope>import</scope>
</dependency>

Runtime is Java 11 - OpenJDK

Part of application.yml

spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: kafka-server:port
      bindings:
        created-material-in:
          destination: created-material-event
        created-program-in:
          destination: created-program-event
        updated-material-in:
          destination: updated-material-event
        updated-program-in:
          destination: updated-program-event
        deleted-material-in:
          destination: deleted-material-event
        deleted-program-in:
          destination: deleted-program-event
        process-schedule-in:
          destination: schedule-event
        remotetask-in:
          destination: task-in
        remotetask-out:
          destination: task-out
management:
  metrics:
    export:
      influx:
        enabled: True

Apart from the memory leak the application run without problems, no errors detected.

Made a heap dump and analysed it in YourKit. Most of the memory is populated with a Spring Integration Metric - org.springframework.integration.support.management.micrometer.MicrometerMetricsCaptor$MicroTimer. Can the error be the the MicroTimer has no equals or hashCode methods?

Attached are two files from YourKit. The first is the objects consuming most of the memory. yourkit-objects

And the second file with one of the ConcurrentHashMap$Node[] instance opened. yourkit-objectexplorer

Copied from original issue: spring-cloud/spring-cloud-stream#1599

sabbyanandan commented 5 years ago

From @olegz on February 4, 2019 19:32

Consider raising this issue with micrometer. Spring Cloud Stream doesn't interact with micrometer in any way - that is; there is no direct dependency between SCSt and micrometer, so while I do recognize that there may me an issue, I don't see how it could be addressed here.

sabbyanandan commented 5 years ago

From @mafa73 on February 4, 2019 19:39

I guess either micrometer or spring integration. The objects in memory are instances of org.springframework.integration.support.management.micrometer.MicrometerMetricsCaptor$MicroTimer. Hard to know which is at fault

sabbyanandan commented 5 years ago

From @olegz on February 4, 2019 19:49

Yeah, perhaps someone from SI team can look at it as well @garyrussell @artembilan

sabbyanandan commented 5 years ago

From @artembilan on February 4, 2019 20:27

Do you observe some exception during messages processing?

I only see the place where we are leaking, when an exception is thrown from sending message downstream:

...
    return sent;
}
catch (Exception e) {
    if (countsAreEnabled && !metricsProcessed) {
        if (sample != null) {
            sample.stop(buildSendTimer(false, e.getClass().getSimpleName()));
        }
...

This way we register a new MicroTimer whenever an exception is caught in that place...

Not sure what we can do unless really have equals() and hashCode() to distinguish one timer from another by the exception type...

Moving to Spring Integration though, since there is really nothing to do from Spring Cloud Stream level.

garyrussell commented 5 years ago

Registering a meter is idempotent.

mafa73 commented 5 years ago

No I haven't observed any exceptions. I'll take a look tomorrow, but I know there aren't many errors. Might be some downstream timeouts, but not many. Most errors are handled and logged without throwing an Exception to the channel. Hystrix Exceptions are retried by moving back the offset and the message is processed again.

mafa73 commented 5 years ago

I have observed exceptions. The exceptions are handled by a custom RetryTemplate. It tries first 3 times and then delegate to a Kafka specific error handler that either just commit the message or moves back the offset (so that it will be retried again in next poll) after a configurable time. I haven't seen any Exceptions for the case where the offset is moved, but many where the message is just committed (due to bad payload from another system). It is the same behaviour as when using the earlier version.

My guess is that an equals/hashCode on the MicroTime would solve this issue?

Many of the metrics looks like this in memory: metricwithtags

garyrussell commented 5 years ago

Yes; you are correct; while the creation of Timer is idempotent, we added code to keep track of them so they can be destroyed when a dynamically created channel is later removed. Those references to MeterFacade are kept in a set.

I believe the facades need to delegate their hashcode/equals to the underlying Meter (Timer in this case).