Takusei / elastic

Elasticsearch for multiple purposes

Logstash drops first mongo document #1

Open · ghstahl opened this issue 2 months ago

ghstahl commented 2 months ago

I successfully built this project and modified the mongodb.conf to point to my collection.

input {
  mongodb {
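    # logstash-input-mongodb input; placeholder_db_dir is where the plugin keeps its
    # "since" bookmark (see the "init placeholder" log line further down)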
    uri => "mongodb://mongodb:27017/usage"
    collection => "usage_normal"
    placeholder_db_dir => "/usr/share/logstash/data/"
    batch_size => 5000
  }
}

filter {
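  # Keep the Mongo _id for use as the Elasticsearch document id, then drop it from the event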
  mutate {
    copy => { "_id" => "[@metadata][_id]"}
    remove_field => ["_id"]
  }
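  # Rewrite the Ruby-hash-style log_entry string (BSON::ObjectId(...), "=>", " UTC" timestamps)
  # into valid JSON and parse it into parsed_log_entry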
  ruby {
    code => '
      log_entry = event.get("log_entry")
      if log_entry
        log_entry.gsub!("BSON::ObjectId(", "\"")
        log_entry.gsub!(")", "\"")
        log_entry.gsub!("=>", ":")
        log_entry.gsub!(" UTC", "Z")
        log_entry.gsub!("1970-01-01 00:00:02Z", "\"1970-01-01T00:00:02Z\"")
        log_entry.gsub!("1970-01-01 00:00:03Z", "\"1970-01-01T00:00:03Z\"")
        log_entry.gsub!("1970-01-01 00:00:01Z", "\"1970-01-01T00:00:01Z\"")
        parsed_log_entry = JSON.parse(log_entry)
        event.set("parsed_log_entry", parsed_log_entry)
      end
    '
  }
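  # Flatten the [parsed_log_entry][metadata] key/value array into [metadata][<key>] fields,
  # concatenating values that share the same key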
  ruby {
    code => '
      if event.get("[parsed_log_entry][metadata]")
        metadata_hash = {}
        event.get("[parsed_log_entry][metadata]").each { |hash|
          key = hash["key"]
          value = hash["value"]
          if metadata_hash[key]
            metadata_hash[key] += ", " + value
          else
            metadata_hash[key] = value
          end
        }
        metadata_hash.each { |key, value|
          event.set("[metadata][#{key}]", value)
        }
        event.remove("[parsed_log_entry][metadata]")
      end
      event.remove("[parsed_log_entry]")
    '
  }
}

output {
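  # Use the Mongo _id preserved in @metadata as the Elasticsearch document id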
  elasticsearch {
    index => "usage_normal-%{+YYYY.MM.dd}"
    document_id => "%{[@metadata][_id]}"
    hosts => ["https://es01:9200"]
    user => "elastic"
    password => "password"
    ssl_enabled => true
    cacert => "/usr/share/logstash/certs/ca/ca.crt"
  }
}

My collection (usage_normal) contains 3 entries:

{
    "_id" : ObjectId("66bcebfacd60385a7de52423"),
    "meta" : {
        "orgId" : "430f0845-9289-4b13-8760-111d1e96e03b",
        "bucketId" : "2024-08-14T17:00:00Z"
    },
    "date" : ISODate("1970-01-01T00:00:01.000+0000"),
    "count" : NumberLong(1),
    "source" : "mapped://source/one",
    "idempotencyKey" : "IDPKEY_000",
    "metadata" : [
        {
            "key" : "a",
            "value" : "a-value"
        },
        {
            "key" : "b",
            "value" : "b-value"
        }
    ]
}
{
    "_id" : ObjectId("66bced5acd60385a7de5242a"),
    "meta" : {
        "orgId" : "430f0845-9289-4b13-8760-111d1e96e03b",
        "bucketId" : "2024-08-14T17:00:00Z"
    },
    "date" : ISODate("1970-01-01T00:00:02.000+0000"),
    "count" : NumberLong(2),
    "source" : "mapped://source/one",
    "idempotencyKey" : "IDPKEY_001",
    "metadata" : [
        {
            "key" : "a",
            "value" : "a-value"
        },
        {
            "key" : "b",
            "value" : "b-value"
        }
    ]
}
{
    "_id" : ObjectId("66bced5bcd60385a7de5242b"),
    "meta" : {
        "orgId" : "430f0845-9289-4b13-8760-111d1e96e03b",
        "bucketId" : "2024-08-14T17:00:00Z"
    },
    "date" : ISODate("1970-01-01T00:00:03.000+0000"),
    "count" : NumberLong(3),
    "source" : "mapped://source/one",
    "idempotencyKey" : "IDPKEY_002",
    "metadata" : [
        {
            "key" : "a",
            "value" : "a-value"
        },
        {
            "key" : "b",
            "value" : "b-value"
        }
    ]
}

This could be a mongo plugin problem. What happens is that the first item in the collection is dropped, no matter how many documents are in it (1..n).

If there is only 1, then nothing gets into Elasticsearch.

Here you can see that only 2 of the 3 documents got in (screenshot omitted).

The last logs in Logstash reference the document that doesn't make it:

2024-08-14 13:19:41 [INFO ] 2024-08-14 19:19:41.128 [[suumo_pipeline]-pipeline-manager] mongodb - Registering MongoDB input
2024-08-14 13:19:41 [INFO ] 2024-08-14 19:19:41.576 [[suumo_pipeline]-pipeline-manager] mongodb - init placeholder for logstash_since_usage_normal: {"_id"=>BSON::ObjectId('66bcebfacd60385a7de52423'), "meta"=>{"orgId"=>"430f0845-9289-4b13-8760-111d1e96e03b", "bucketId"=>"2024-08-14T17:00:00Z"}, "date"=>1970-01-01 00:00:01 UTC, "count"=>1, "source"=>"mapped://source/one", "idempotencyKey"=>"IDPKEY_000", "metadata"=>[{"key"=>"a", "value"=>"a-value"}, {"key"=>"b", "value"=>"b-value"}]}
2024-08-14 13:19:41 [INFO ] 2024-08-14 19:19:41.577 [[suumo_pipeline]-pipeline-manager] javapipeline - Pipeline started {"pipeline.id"=>"suumo_pipeline"}
2024-08-14 13:19:41 [INFO ] 2024-08-14 19:19:41.589 [Agent thread] agent - Pipelines running {:count=>1, :running_pipelines=>[:suumo_pipeline], :non_running_pipelines=>[]}

It looks to me like it uses the first one to establish the index and then throws it away.
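
If the plugin really is seeding its "since" placeholder from that first document (as the "init placeholder" log line above suggests), the behaviour would look roughly like the sketch below. This is only an illustration of that guess using the Ruby Mongo driver, not the plugin's actual code; the URI and collection names just mirror my config.

require 'mongo'

# Illustration only: a "since"-placeholder design that skips the first document.
client     = Mongo::Client.new('mongodb://mongodb:27017/usage')
collection = client[:usage_normal]

# Init: the placeholder is seeded from the FIRST document's _id ...
first_doc = collection.find.sort('_id' => 1).limit(1).first
since_id  = first_doc['_id']   # e.g. BSON::ObjectId('66bcebfacd60385a7de52423')

# ... and every later fetch only asks for _id strictly greater than that placeholder,
# so the first document is never turned into an event.
collection.find('_id' => { '$gt' => since_id }).each do |doc|
  puts doc
end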

Takusei commented 2 months ago

Hi @ghstahl, thanks for reaching out about this PoC repo.

From what I have checked, there seems to be no official MongoDB input plugin for Elastic. Referring to the plugin I used, I found this: https://github.com/phutchins/logstash-input-mongodb/pull/60

It seems to be a plugin issue, as you said. Please let me know if you find a better plugin that solves this.