akka / alpakka

Alpakka is a Reactive Enterprise Integration library for Java and Scala, based on Reactive Streams and Akka.
https://doc.akka.io/libraries/alpakka/current/

JmsProducer memory leak #1837

Closed konaleksey123 closed 5 years ago

konaleksey123 commented 5 years ago

Hello.

I am using akka-stream-alpakka-jms-1.1.0 and send messages to a queue at a high rate. A simplified version of my application looks like this:

  Source
    .repeat("message")
    .mapAsyncUnordered(10) { msg =>
      for {
        _ <- Source
          .single(msg)
          .runWith(
            JmsProducer.textSink(
              JmsProducerSettings(jmsProducerSettings, ibmConnFactory)
                .withQueue("queue1")
            )
          )
        // Do some other business logic. For example, save message send time in database
      } yield ()
    }
    .runWith(Sink.ignore)

All JMS connections and sessions are closed, but there is still a memory leak. I noticed that some IBM objects remain in an actor's buffer and are never garbage collected.

Please help me to resolve the issue.

memory leak

2m commented 5 years ago

Hi,

in the code example you have provided you materialize the stream ten times for every incoming message. That should not result in a memory leak, of course, but nevertheless it is a sub-optimal way of using the Akka Streams API.

If you want to have the message sent 10 times, duplicate that message in a stream, attach a single Sink, and materialize the stream only once.

As to the memory leak, my hunch is that those resources would be eventually released, but as the stream is being started constantly, some amount of resources is always held up.
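The difference between materializing a sink per message and materializing one sink for the whole stream can be made visible with a small counting sketch. It does not use JMS at all: `countingSink` is a hypothetical stand-in for `JmsProducer.textSink` that only counts how often it is materialized, and the object and method names are made up for illustration. It assumes Akka 2.6 or later, where an implicit `ActorSystem` also provides the materializer.

```scala
import java.util.concurrent.atomic.AtomicInteger

import akka.Done
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object MaterializationCount {

  // Hypothetical stand-in for JmsProducer.textSink: the function passed to
  // mapMaterializedValue runs once for every materialization of the sink.
  private def countingSink(counter: AtomicInteger): Sink[String, Future[Done]] =
    Sink.ignore.mapMaterializedValue { done =>
      counter.incrementAndGet()
      done
    }

  // Returns (materializations with one sink per message,
  //          materializations with one sink for the whole stream).
  def run(messages: Int): (Int, Int) = {
    implicit val system: ActorSystem = ActorSystem("materialization-count")
    try {
      val perMessage = new AtomicInteger(0)
      val shared = new AtomicInteger(0)

      // Pattern from the issue: a new inner stream (and sink) per message.
      val perMessageDone = Source(1 to messages)
        .map(_.toString)
        .mapAsyncUnordered(10)(msg => Source.single(msg).runWith(countingSink(perMessage)))
        .runWith(Sink.ignore)

      // Suggested pattern: attach a single sink and materialize once.
      val sharedDone = Source(1 to messages)
        .map(_.toString)
        .runWith(countingSink(shared))

      Await.result(perMessageDone.zip(sharedDone), 30.seconds)
      (perMessage.get, shared.get)
    } finally {
      Await.result(system.terminate(), 30.seconds)
    }
  }
}
```

With a real JMS sink, each of those per-message materializations would set up its own connection and session, which is why the single-materialization form is preferable.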

konaleksey123 commented 5 years ago

@2m I don't want to duplicate the message; it is just a simple example to show a leak in JmsProducer.

In my actual application I have this code:

    JmsConsumer
      .ackSource(
        JmsConsumerSettings(jmsConsumerSettings, ibmConnFactory)
          .withQueue("topic")
      )
      .mapAsyncUnordered(10) { envelope =>
        for {
          message <- Future(envelope.message.getBody(classOf[String]))
          _ <- Source.single(message).runWith(JmsProducer.textSink(
              JmsProducerSettings(jmsProducerSettings, ibmConnFactory)
                .withQueue("queue1")
            ))
          // Do some other business logic. For example, save message send time in database
        } yield envelope.acknowledge()
      }
      .runWith(Sink.ignore)

and config:

alpakka.jms {
  consumer {
    session-count = 10
    buffer-size = 0
  }
}

I need a certain throughput, so I want to consume messages in parallel in 10 sessions, send each one to another queue, save something in the database, and then ack the message.

Maybe something is wrong in my code?

andreas-schroeder commented 5 years ago

Hi @konaleksey123, what's happening in your code is that for every message, a fresh textSink gets created. There is no reuse of JMS sessions or connections behind the scenes, so every time a textSink gets created, all the JMS machinery is kicked off and resources are allocated. So in your case, the number of parallel sessions is controlled by .mapAsyncUnordered(10). To avoid these JmsProducer allocations, try to use something along the lines of

JmsConsumer
  .ackSource(
      JmsConsumerSettings(jmsConsumerSettings, ibmConnFactory)
        .withQueue("topic"))
  .map(m => JmsTextMessage(m.message.getBody(classOf[String])).withPassThrough(m))
  .via(JmsProducer.flexiFlow(
      JmsProducerSettings(jmsProducerSettings, ibmConnFactory)
        .withQueue("queue1")))
  .mapAsyncUnordered(10) { m =>
    Future {
      // Do some other business logic. For example, save message send time in database
      m.passThrough.acknowledge()
    }
  }
  .runWith(Sink.ignore)

konaleksey123 commented 5 years ago

Hi @andreas-schroeder, thanks for the explanation. I saw JmsProducer.flexiFlow in the docs and had already written this solution as a workaround, because it reuses the session/connection. Before that I used similar code with AMQP endpoints and did not have the same problem there, hence my question.

WellingR commented 4 years ago

@2m @andreas-schroeder This issue was closed by the reporter because he found a workaround. I agree that the flexiFlow producer is the better usage pattern; however, I really do observe a memory leak.

I have used code similar to what was reported in the issue, but I limit the number of messages to 6000. After that the stream terminates, but it does NOT free up the memory. The test application uses about 150MB of memory in this case, while it is idle.

A heap analysis shows that exactly 6000 ActiveMQConnections are in memory, including 6000 instances of akka.stream.alpakka.jms.impl.JmsConnector$$anon$1 and 6000 akka.stream.alpakka.jms.impl.JmsProducerStage$$anon$2 instances.

It is not yet clear to me why this is happening, but it appears that something is not being closed or freed properly.

ennru commented 4 years ago

Did you notice https://github.com/akka/alpakka/pull/2048 which was released as part of Alpakka JMS 2.0.0-M2?

WellingR commented 4 years ago

I had not noticed this fix; however, I also ran my tests on master (commit 1805f78), which includes the fixes of 2.0.0-M2.

Looking at the data in my heap dump, it does seem like all sessions and connections are properly closed. However, for some reason there is still some reference to them which prevents them from being garbage collected.

andreas-schroeder commented 4 years ago

Hi @WellingR, I've been reading along and finally found some time to dig into it as well. After giving the debugger a shot, I found that the objects you mentioned as having 6000 each are the graph stage logic ( akka.stream.alpakka.jms.impl.JmsProducerStage$$anon$2 ) and I think akka.stream.alpakka.jms.impl.JmsConnector$$anon$1 is the JMS exception listener, though I can't confirm that 100% right now.

Given that the number of graph stage logic objects corresponds to the number of produced messages, can you please double-check that

1) your code doesn't create a stream per message, and 2) that there are no references kept to the stream?

If you are sure, I will proceed with further investigation.

WellingR commented 4 years ago

I used the following code to reproduce the issue https://gist.github.com/WellingR/35023510c042622b5186723ad16f73aa

  1. I do create a stream per message being published. This is definitely suboptimal, and I see a huge performance difference when using flexiFlow; however, I still think that these objects should be garbage collected.
  2. Unless I am missing something, the code does not keep any reference to the stream.

andreas-schroeder commented 4 years ago

Hi @WellingR, thanks for providing the code. I was indeed able to reproduce your issue while running ActiveMQ in the same JVM provided by JmsSpec.withServer, as the in-memory messages in the broker keep a reference to the connection that holds a reference to the exception listener, and this references the whole stage (which it needs to).

When I run a JMS consumer in parallel that consumes from the queue, these messages are freed and so are the graph stage logic objects (akka.stream.alpakka.jms.impl.JmsProducerStage$$anon$2).

val consumerSettings = JmsConsumerSettings(system, connectionFactory).withQueue("queue1")
val (control, done) = JmsConsumer.textSource(consumerSettings).toMat(Sink.ignore)(Keep.both).run()

Can you try and see if you can observe the same effect?

Still, a smaller amount of memory does leak, and that needs to be fixed. It is due to the pre-materialization for connection status done here: the queue doesn't get GC-ed if the connection status isn't consumed (and it does become reclaimable once the connection status source gets consumed). So thank you for reporting this issue and keeping at it!

WellingR commented 4 years ago

I am running ActiveMQ in a Docker container, so starting a consumer based on the code above made no noticeable difference in terms of memory usage.

I also tried consuming the connectorState source, and this does make a huge difference. It seems that the memory is freed when this source is consumed.

So all that needs to be fixed is that we make sure that the pre-materialized connectionStateQueue is somehow drained, even if it was never consumed.
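For anyone hitting this before a fix is available, consuming the connector state explicitly might look roughly like the sketch below. It assumes that `JmsProducer.flow` materializes a `JmsProducerStatus` whose `connectorState` is a `Source[JmsConnectorState, NotUsed]`, and that Akka 2.6 or later is in use so the implicit `ActorSystem` provides the materializer. `ConnectorStateDrain`, `drain` and `sendOne` are hypothetical names, and this is not the fix that went into the library.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.alpakka.jms.{JmsProducerSettings, JmsTextMessage}
import akka.stream.alpakka.jms.scaladsl.{JmsProducer, JmsProducerStatus}
import akka.stream.scaladsl.{Keep, Sink, Source}

import scala.concurrent.Future

object ConnectorStateDrain {

  // Consumes and discards every connector state update, so the
  // pre-materialized state queue has a consumer and can be released.
  def drain(status: JmsProducerStatus)(implicit system: ActorSystem): Future[Done] =
    status.connectorState.runWith(Sink.ignore)

  // Sends a single text message and drains the connector state of the
  // producer that sent it. Still one stream per message, as in the issue;
  // the returned future completes when the sending stream is done.
  def sendOne(text: String, settings: JmsProducerSettings)(
      implicit system: ActorSystem
  ): Future[Done] = {
    val (status, sent) = Source
      .single(JmsTextMessage(text))
      .viaMat(JmsProducer.flow[JmsTextMessage](settings))(Keep.right)
      .toMat(Sink.ignore)(Keep.both)
      .run()
    drain(status)
    sent
  }
}
```

Reusing one producer via flexiFlow, as suggested earlier in the thread, remains the better approach; this only shows where the connector state source would be consumed in the per-message pattern.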

andreas-schroeder commented 4 years ago

I see, interesting. While I saw the potential for a leak there, I didn't observe it. I already submitted a fix for the leak yesterday here: https://github.com/akka/alpakka/pull/2067