zendesk / ruby-kafka

A Ruby client library for Apache Kafka
http://www.rubydoc.info/gems/ruby-kafka
Apache License 2.0

Memory leak while consuming messages #726

Closed jimymodi closed 5 years ago

jimymodi commented 5 years ago
Steps to reproduce
require_relative './../config/initializers/app_initializer'

kafka = Kafka.new(
  Config.kafka['brokers']
)

consumer = kafka.consumer(
  offset_commit_threshold: 100,
  group_id: Config.kafka['group_id']
)

# It's possible to subscribe to multiple topics by calling `subscribe`
# repeatedly.
consumer.subscribe(Config.kafka['topic'])

# Stop the consumer when the SIGTERM signal is sent to the process.
# It's better to shut down gracefully than to kill the process.
trap("TERM") { consumer.stop }

report = MemoryProfiler.report do
  inv = 0
  consumer.each_message do |message|
    if inv < 200000
      # Print the process memory every 10,000 messages.
      if inv % 10000 == 0
        mb = GetProcessMem.new.mb
        puts "#{ inv } - MEMORY USAGE(MB): #{ mb.round }"
      end
      inv = inv + 1
    else
      break
    end
  end
end
report.pretty_print

Expected outcome

Memory should not be accumulating when consuming new messages.

Actual outcome

Memory consumption keeps increasing as new messages are consumed.

0 - MEMORY USAGE(MB): 222
10000 - MEMORY USAGE(MB): 481
20000 - MEMORY USAGE(MB): 656
30000 - MEMORY USAGE(MB): 873
40000 - MEMORY USAGE(MB): 1102
50000 - MEMORY USAGE(MB): 1208
60000 - MEMORY USAGE(MB): 1457
70000 - MEMORY USAGE(MB): 1579
80000 - MEMORY USAGE(MB): 1867
90000 - MEMORY USAGE(MB): 1979
100000 - MEMORY USAGE(MB): 2226
110000 - MEMORY USAGE(MB): 2472
120000 - MEMORY USAGE(MB): 2578
130000 - MEMORY USAGE(MB): 2613
140000 - MEMORY USAGE(MB): 2659

mensfeld commented 5 years ago

Hmmm. That is interesting. I haven't observed that in our production systems. Debugging...

mensfeld commented 5 years ago

Hey @jimymodi, I gave it a shot with a setup like yours and I don't see the problem:

require 'kafka'
require 'memory_profiler'
require 'get_process_mem'

kafka = Kafka.new(
  'kafka://127.0.0.1'
)

consumer = kafka.consumer(
  offset_commit_threshold: 100,
  group_id: 'dadadada',
  fetcher_max_queue_size: 2
)

# It's possible to subscribe to multiple topics by calling `subscribe`
# repeatedly.
consumer.subscribe('test')

inv = 0
consumer.each_message do |message|
  if inv < 200000
    # Print the process memory every 10,000 messages.
    if inv % 10000 == 0
      mb = GetProcessMem.new.mb
      puts "#{ inv } - MEMORY USAGE(MB): #{ mb.round }"
    end
    inv = inv + 1
  else
    break
  end
end

0 - MEMORY USAGE(MB): 34
10000 - MEMORY USAGE(MB): 41
20000 - MEMORY USAGE(MB): 40
30000 - MEMORY USAGE(MB): 40
40000 - MEMORY USAGE(MB): 40
50000 - MEMORY USAGE(MB): 40
60000 - MEMORY USAGE(MB): 40
70000 - MEMORY USAGE(MB): 40
80000 - MEMORY USAGE(MB): 42
90000 - MEMORY USAGE(MB): 42
100000 - MEMORY USAGE(MB): 42
110000 - MEMORY USAGE(MB): 42
120000 - MEMORY USAGE(MB): 42
130000 - MEMORY USAGE(MB): 42
140000 - MEMORY USAGE(MB): 42

It seems that the profiler itself is aggregating a lot of data, which causes memory to bloat. Also keep in mind that, because of prefetching, ruby-kafka will load a significant amount of data up front unless you tune the fetcher_max_queue_size setting for your setup.
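
To illustrate why the queue size matters, here is a minimal sketch of a bounded prefetch buffer built on Ruby's standard SizedQueue. It is not ruby-kafka's actual fetcher; the method name peak_buffered_batches and the "batch-N" strings are made up for the example. The point is only that a background thread fetching ahead of a slow consumer can never hold more batches in memory than the queue's capacity.

```ruby
# Illustration only: NOT ruby-kafka's fetcher implementation.
# A background thread "prefetches" batches ahead of the consumer. The
# queue capacity caps how many batches can sit in memory at the same time.
def peak_buffered_batches(max_queue_size:, total_batches:)
  queue = SizedQueue.new(max_queue_size)
  peak = 0

  # Hypothetical fetcher thread: pushes batches as fast as it can.
  fetcher = Thread.new do
    total_batches.times do |i|
      queue.push("batch-#{i}") # blocks while the queue is full
    end
    queue.push(:done)
  end

  # Consumer: slower than the fetcher, so the buffer tends to fill up.
  loop do
    peak = [peak, queue.size].max
    item = queue.pop
    break if item == :done
    sleep 0.001 # simulate per-batch processing time
  end

  fetcher.join
  peak
end

puts "cap 2:   peak buffered = #{peak_buffered_batches(max_queue_size: 2, total_batches: 50)}"
puts "cap 100: peak buffered = #{peak_buffered_batches(max_queue_size: 100, total_batches: 50)}"
```

With a small cap the fetcher blocks almost immediately, while with a large cap it can run far ahead of the consumer. If each batch is large, that difference is what shows up as process memory.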

I feel that this issue can be safely closed as it is invalid.

jimymodi commented 5 years ago

Ah! Setting a lower fetcher_max_queue_size solved the problem. Thanks @mensfeld.

Shouldn't we change the default of 100 to something lower?

mensfeld commented 5 years ago

@jimymodi It is changed in the Karafka framework: https://github.com/karafka/karafka/blob/master/lib/karafka/setup/config.rb#L86

@dasch seems to have a lot of memory available ;)