phutchins / logstash-input-mongodb

MongoDB input plugin for Logstash

Exit loop once all documents are read #19

Open snagajot opened 8 years ago

snagajot commented 8 years ago

@phutchins Currently, the plugin processes all the documents in the collection and then waits/listens indefinitely for new documents. I am trying to make the process exit once all the documents are read, so I tried this fix within the `get_cursor_for_collection` method:

    public
    def get_cursor_for_collection(mongodb, mongo_collection_name, last_id_object, batch_size)
      collection = mongodb.collection(mongo_collection_name)
      # Need to make this sort by date in object id then get the first of the series
      # db.events_20150320.find().limit(1).sort({ts:1})
      cursors = collection.find({:_id => {:$gt => last_id_object}}).limit(batch_size).to_a # store the batch as an array
      exit if cursors.empty? # if the batch array is empty, exit the process
      return collection.find({:_id => {:$gt => last_id_object}}).limit(batch_size)
    end

Although I was able to test this out, it doesn't work when I run it against my actual collection. It doesn't error out, but simply exits after a few seconds.

Is there a better way to do this?

Thanks!

phutchins commented 8 years ago

Hey @snagajot That is true, and I can definitely see the case where you would want this behavior. When I get a chance, I'll take a look at what you have here and see what the best way to do this would be. My initial thought is to simply add a configuration parameter that makes the process complete only one pass.

We should be able to add something here...

https://github.com/phutchins/logstash-input-mongodb/blob/master/lib/logstash/inputs/mongodb.rb#L346

... that exits the logstash process if the config parameter is set to only do one pass.

snagajot commented 8 years ago

@phutchins I agree with the config parameter. I can only see one issue with adding the exit at line 346, which is when the batch size is less than the total collection document count. In this case, the loop is necessary until all documents are read. Which is why i tried adding the exit to the get_cursor_for_collection method. Makes sense ?

phutchins commented 8 years ago

@snagajot yes, completely. We can do something like set a flag for each collection when it returns no more documents and if all of the tracked collections are flagged as empty, then exit.
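Roughly, that bookkeeping could look like the sketch below (hypothetical names, not plugin code): every watched collection starts unflagged, an empty batch flags it as drained, a non-empty batch clears the flag again (new documents may still arrive), and the input exits once all flags are set.

```ruby
# Hypothetical sketch of per-collection "drained" flags.
class DrainTracker
  def initialize(collection_names)
    # false = documents may remain, true = last batch came back empty
    @drained = {}
    collection_names.each { |name| @drained[name] = false }
  end

  # Call after fetching a batch for a collection.
  def record_batch(name, doc_count)
    @drained[name] = doc_count.zero?
  end

  # The input can exit once every tracked collection is drained.
  def all_drained?
    @drained.values.all?
  end
end
```

`all_drained?` would then gate the exit at the bottom of the polling loop instead of exiting from inside the cursor helper.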

snagajot commented 8 years ago

@phutchins @jatanpatel92 I went ahead and used the exit check in the `get_cursor_for_collection` method like this:

    def get_cursor_for_collection(mongodb, mongo_collection_name, last_id_object, batch_size)
      collection = mongodb.collection(mongo_collection_name)
      # Need to make this sort by date in object id then get the first of the series
      # db.events_20150320.find().limit(1).sort({ts:1})
      cursor_size = collection.find({:_id => {:$gt => last_id_object}}).limit(batch_size).to_a.size
      @logger.info("size of array: #{cursor_size}")
      exit if cursor_size == 0
      return collection.find({:_id => {:$gt => last_id_object}}).limit(batch_size)
    end

and below is my `run` method:

    def run(queue)
      sleep_min = 0.2
      sleep_max = 5
      sleeptime = sleep_min

      begin
        @logger.debug("Tailing MongoDB")
        @logger.debug("Collection data is: #{@collection_data}")
        loop do
          @collection_data.each do |index, collection|
            collection_name = collection[:name]
            @logger.debug("collection_data is: #{@collection_data}")
            last_id = @collection_data[index][:last_id]
            #@logger.debug("last_id is #{last_id}", :index => index, :collection => collection_name)
            # get batch of events starting at the last_place if it is set
            last_id_object = BSON::ObjectId(last_id)
            cursor = get_cursor_for_collection(@mongodb, collection_name, last_id_object, batch_size)
            #@logger.info(cursor.to_a.to_s)
            cursor.each do |doc|
              logdate = DateTime.parse(doc['_id'].generation_time.to_s)
              #event = LogStash::Event.new("message" => doc.to_h.to_json.to_s)
              @codec.decode(doc.to_h.to_json.to_s) do |event|
                event["logdate"] = logdate.iso8601
                decorate(event)
                queue << event
              end
              @collection_data[index][:last_id] = doc['_id'].to_s
            end
            # Store the last-seen doc in the database
            update_placeholder(@sqlitedb, since_table, collection_name, @collection_data[index][:last_id])
          end
          @logger.debug("Updating watch collections")
          @collection_data = update_watched_collections(@mongodb, @collection, @sqlitedb)

          # nothing found in that iteration
          # sleep a bit
          #@logger.debug("No new rows. Sleeping.", :time => sleeptime)
          sleeptime = [sleeptime * 2, sleep_max].min
          sleep(sleeptime)
          sleeptime = sleep_min
        end
      rescue LogStash::ShutdownSignal
        if @interrupted
          @logger.debug("Mongo Input shutting down")
        end
      end
    end # def run

But this led to a completely bizarre issue: running out of Java heap space. Here is my environment:

  1. MongoDB: 5 million documents
  2. Logstash mongo input `batch_size`: 15000

If I remove the exit code (the cursor size check), everything runs fine, but the loop is infinite. Am I doing something wrong that causes this memory leak? Is there a different method to exit the program? I am totally new to Ruby, so I would appreciate another set of eyes (expert, in this case) to figure this one out. Thanks in advance.
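One thing I suspect (just a guess): `collection.find(...).limit(batch_size).to_a` pulls the entire 15000-document batch into a Ruby array on every poll just to count it, and then the same query runs a second time to actually process the documents. An emptiness check that fetches at most one document should be far cheaper. A minimal sketch (the helper name is mine, assuming the mongo-ruby-driver style `find`/`limit`/`first` API):

```ruby
# Hypothetical helper, not plugin code: true when no documents remain
# after last_id_object. `limit(1).first` asks the server for at most one
# document instead of materializing a whole batch just to test emptiness.
def collection_exhausted?(collection, last_id_object)
  collection.find(:_id => { :$gt => last_id_object }).limit(1).first.nil?
end
```

The run loop could then `break` when this returns true after a pass, instead of calling `exit` from inside `get_cursor_for_collection`.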

jatanpatel92 commented 8 years ago

@snagajot: This plugin constantly checks MongoDB for real-time data. As a temporary solution, you could try running it as a service so the OS can manage the memory issues. I am also learning Ruby in order to find a solution. We need more eyes to find gems and rubies for now!

snagajot commented 8 years ago

@jatanpatel92 Can you elaborate on how you run it as a service?

snagajot commented 8 years ago

@jatanpatel92 I did a little reading on running Logstash as a service and it seemed a bit of overkill for my purpose. I think the simpler solution for my problem would be to just break the loop when all documents in a collection are read. If @phutchins could find a way to implement the exit logic without causing a memory leak (I still don't understand why or how), we will be golden.

snagajot commented 8 years ago

@jatanpatel92 @phutchins so I came up with an alternative way to exit the loop gracefully, dare I say a hack. In a nutshell, I added a new method that gets the last `_id` of the entire collection and stores it in a variable, which is then checked against the last `_id` of each batch to see whether it is the last document, exiting if so. Below are some code snippets:

  1. New method to get the last document id in a collection

         public
         def get_last_collection_id(mongodb, collection)
           mongo_collection = mongodb.collection(collection)
           last_entry = mongo_collection.find({}).sort('_id' => -1).limit(1).first
           last_entry_id = last_entry['_id'].to_s
           return last_entry_id
         end
  2. Store last document id in a variable @end_id in method "register"

           @end_id = get_last_collection_id(@mongodb, @collection)
  3. "Run" method where it checks and exits loop

         def run(queue)
           sleep_min = 0.2
           sleep_max = 5
           sleeptime = sleep_min

           begin
             @logger.debug("Tailing MongoDB")
             @logger.debug("Collection data is: #{@collection_data}")
             loop do
               @collection_data.each do |index, collection|
                 collection_name = collection[:name]
                 @logger.debug("collection_data is: #{@collection_data}")
                 last_id = @collection_data[index][:last_id]
                 last_id_object = BSON::ObjectId(last_id)
                 cursor = get_cursor_for_collection(@mongodb, collection_name, last_id_object, batch_size)
                 cursor.each do |doc|
                   @logger.info("Document is #{doc['_id'].to_s}")
                   logdate = DateTime.parse(doc['_id'].generation_time.to_s)
                   @codec.decode(doc.to_h.to_json.to_s) do |event|
                     event["logdate"] = logdate.iso8601
                     decorate(event)
                     queue << event
                   end
                   @collection_data[index][:last_id] = doc['_id'].to_s
                 end
                 # Store the last-seen doc in the database
                 update_placeholder(@sqlitedb, since_table, collection_name, @collection_data[index][:last_id])
               end
               @logger.debug("Updating watch collections")
               @collection_data = update_watched_collections(@mongodb, @collection, @sqlitedb)

               # Check if we reached the end of the collection and break the loop
               if @collection_data[@collection][:last_id] == @end_id
                 @logger.info("this is the last document in the collection")
                 sleep(15)
                 break
               end

               # nothing found in that iteration
               # sleep a bit
               #@logger.debug("No new rows. Sleeping.", :time => sleeptime)
               sleeptime = [sleeptime * 2, sleep_max].min
               sleep(sleeptime)
               sleeptime = sleep_min
             end
           rescue LogStash::ShutdownSignal
             if @interrupted
               @logger.debug("Mongo Input shutting down")
             end
           end
         end # def run

The performance has been good so far, no memory leaks. Let me know if you see anything wrong with this.

@phutchins this goes back to your initial suggestion of adding a config parameter which is doable too

phutchins commented 8 years ago

@snagajot This is awesome! If you're open to creating a pull request, I'd love to pull this into the plugin. We would have to add a config parameter to be able to switch this functionality on, keeping the existing functionality unless the parameter is added.

I'd call it 'one_pass' or something along those lines...
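For reference, the declaration could look something like this, using the standard Logstash plugin `config` DSL (the option name and wiring are a sketch, not existing plugin code):

```ruby
# Hypothetical, in lib/logstash/inputs/mongodb.rb alongside the other options:
# when true, stop after draining all collections instead of polling forever.
config :one_pass, :validate => :boolean, :default => false

# ...and inside the polling loop in `run`, once a full pass over the
# watched collections returns no new documents:
break if @one_pass
```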

Thanks!

hu985815621 commented 8 years ago

@phutchins Is this plugin used to import mongodb data into elasticsearch?

jatanpatel92 commented 8 years ago

Yes

marcinkubica commented 8 years ago

Um, wonder what happened to this? Why do I need it and it's not there ehh :)

phutchins commented 8 years ago

@marcinkubica Feel free to submit a PR with the needed changes :).

marcinkubica commented 8 years ago

I would if I were a Ruby dev. But I'm not :(


phutchins commented 8 years ago

@marcinkubica this is on my list... I'll get to it as soon as I can but there are a number of things in front of it. Shouldn't be too difficult as the code is pretty much already done by @snagajot above. Maybe he will submit a PR.

snagajot commented 8 years ago

@phutchins I will give it a shot. My real challenge will be adding the config param and realigning the code so both behaviors share a single code path. I am a newbie, so pardon me if I mess something up.

phutchins commented 8 years ago

@snagajot awesome, thanks! No worries on being a newbie, gotta learn somehow. I'm glad to work with you until we get it right...

marcinkubica commented 8 years ago

@phutchins @snagajot cheers guys. I'll manage with custom stuff for the time being (mongoexport piped to logstash stdin) but count me in as tester once you got anything out there. Just pop a line ;)