logstash-plugins / logstash-input-file


Fix ReadFile handler to consider the value stored in sincedb on plugin restart #307

Closed andsel closed 2 years ago

andsel commented 2 years ago

Release notes

Fixes read mode so that, when sincedb already stores a position for a file that was not completely consumed, reading resumes from that position.

What does this PR do?

Updates the file pointer of a read-mode file to the maximum of the bytes already read and the position stored in sincedb for the same file. This way, when the pipeline is restarted, the plugin recovers from the last known position instead of restarting from the beginning and reprocessing already-processed lines.
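A minimal sketch of the resume logic in plain Ruby (the method name and shape are illustrative only, not the plugin's actual code):

# Illustrative sketch: resume a read-mode file from whichever position is
# further along -- the bytes already read in this session, or the position
# sincedb recorded for the same file on a previous run.
def resume_position(bytes_read, sincedb_position)
  [bytes_read, sincedb_position.to_i].max  # nil.to_i == 0 when sincedb has no entry
end

resume_position(0, 1024)    #=> 1024 (restart: skip the already-processed bytes)
resume_position(2048, 1024) #=> 2048 (in-session progress is ahead of sincedb)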

Why is it important/What is the impact to the user?

When a pipeline with a file input in read mode is restarted, this lets the plugin recover from where it left off, provided that information is present in the sincedb store.

Checklist

Author's Checklist

Pipeline definition used for manual verification (the sleep filter throttles throughput so the pipeline can be stopped and restarted mid-file):

- pipeline.id: SDH_650
  pipeline.workers: 1
  pipeline.batch.size: 5
  config.string: |
    input {
        file {
            path => "/home/andrea/workspace/logstash_configs/file_input_sdh650/sample_fixture.csv"
            sincedb_path => "/home/andrea/workspace/logstash_configs/file_input_sdh650/sincedb"
            mode => "read"
            start_position => "beginning"
        }
    }

    filter {
        csv {
            separator => ","
            columns => ["id", "host", "fqdn", "IP", "mac", "role", "type", "make", "model", "oid", "fid", "time"]
            remove_field => ["path", "host", "message", "@version" ]   
        }
        sleep {
            time => 1
            every => 10
        }
    }

    output {
        elasticsearch { 
            index => "650" 
            hosts => "http://localhost:9200"
            user => "elastic"
            password => "changeme"
        }
        stdout { codec => dots }
    }

Some curl-style requests to set up the ES output index (and delete it between runs), plus an aggregation query to verify the result:

PUT /650
{
  "mappings": {
    "properties": {
      "id":    { "type": "keyword" },
      "host":  { "type": "text" },
      "fqdn":  { "type": "text" },
      "IP":    { "type": "text" },
      "mac":   { "type": "text" },
      "role":  { "type": "keyword" },
      "type":  { "type": "keyword" },
      "make":  { "type": "text" },
      "model": { "type": "text" },
      "oid":   { "type": "text" },
      "fid":   { "type": "text" },
      "time":  { "type": "text" }
    }
  }
}
DELETE 650

GET 650/_search
{
  "aggs": {
    "types": {
      "terms": { "field": "type" }
    }
  }
}

The expectation is to have 2 buckets of equal size. Without the fix, one bucket contains more documents, which means some rows were reprocessed on a pipeline reload.
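For reference, the aggregation response should contain two buckets with equal doc_count, along these lines (the key values and counts here are hypothetical and depend on the fixture):

{
  "aggregations": {
    "types": {
      "buckets": [
        { "key": "typeA", "doc_count": 50 },
        { "key": "typeB", "doc_count": 50 }
      ]
    }
  }
}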

How to test this PR locally

Follow the steps in #290

Related issues

Use cases

Screenshots

Logs

andsel commented 2 years ago

Holding this PR until CI is back to green on main; then rebase and ask for review again.