sematext / logagent-js

Extensible log shipper with input/output plugins, buffering, parsing, data masking, and small memory/CPU footprint
https://sematext.com/logagent
Apache License 2.0

Implement Elasticsearch Index API #52

Closed · otisg closed this 6 years ago

otisg commented 7 years ago

So one can easily feed Logagent from other data sources that can output to Elasticsearch.

fbalicchia commented 7 years ago

Hi,

Could you please explain in more detail what you mean? From the referenced issue, what I understand is that you would prefer to use the ES bulk API instead of implementing a lumberjack-protocol server (input).

Am I wrong? IMHO it could be useful to have a lumberjack protocol input, because then we could forward traffic arriving from Beats agents. WDYT?

megastef commented 7 years ago

We thought of implementing the Elasticsearch bulk API as an input (all "beats" support Elasticsearch bulk indexing): https://www.elastic.co/guide/en/elasticsearch/reference/5.5/docs-bulk.html

Logagent could be a central service that receives logs from N servers before forwarding them to Sematext Cloud or any other Elasticsearch. The disk buffer would make Logagent a reliable mediator and log parser for simple log collectors like FileBeat or rsyslog, which are deployed on edge devices.
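
For illustration, here is a hypothetical snippet (not part of Logagent itself; it assumes a Logagent instance already listening on port 9200) showing how any client capable of Elasticsearch bulk indexing could point at Logagent instead of Elasticsearch:

var http = require('http');

// Two NDJSON lines per document: the action/metadata line, then the document itself
var body = JSON.stringify({ index: { _index: 'test', _type: 'type1', _id: '1' } }) + '\n' +
  JSON.stringify({ field1: 'value1' }) + '\n';

var req = http.request({
  host: 'localhost',
  port: 9200,
  path: '/_bulk',
  method: 'POST',
  headers: { 'Content-Type': 'application/x-ndjson' }
}, function (res) {
  res.pipe(process.stdout); // prints the bulk acknowledgement
});
req.end(body);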

fbalicchia commented 7 years ago

Hi, I started to work on this issue and I've completed the input part (please see input). My idea is, after some checks on the input side (NDJSON validation, etc.), to emit the message body and forward it to the output side. I'd like to avoid using esclient.js, even though I saw there is already a dependency on it, in favor of the Logsene endpoint, but I haven't yet checked whether that is possible:

var elasticsearch = require('elasticsearch');
var client = new elasticsearch.Client({ host: 'localhost:9090', log: 'debug' });

client.bulk({ body: data.msg }); // data.msg holds the raw bulk payload

WDYT ?

megastef commented 7 years ago

Hi @fbalicchia - I think you are on the wrong track.

logagent-input-elasticsearch-http should behave like an Elasticsearch server for bulk indexing. So tools like FileBeat could ship data to Logagent, where the smart things happen ;)

This means:

  1. Start an HTTP server on the default port 9200
  2. Handle POST requests to '/_bulk'
  3. Read the body, which has 2 lines per document. The first line is an instruction like "index" (or delete/update) with metadata such as the index name and type. The second line is the actual document, which needs to be indexed. Here is an example for 2 documents:
    { "index" : { "_index" : "test", "_type" : "type1", "_id" : "1" } }
    { "field1" : "value1" }
    { "index" : { "_index" : "test", "_type" : "type1", "_id" : "2" } }
    { "field1" : "value2" }

    We would support only the index operation in the beginning (no delete/update).

  4. Emit objects with this structure to the Logagent eventEmitter: { "field1" : "value1", "_index" : "test", "_type" : "type1", "_id" : "1" }. Create a context object with {source: 'input-elasticsearch-http', index: 'test'} attached to the generated event (the ES output plugin uses the context.index field).
  5. Generate the HTTP response for Elasticsearch clients - this only needs to match the expected format. We can't actually say that indexing will succeed through the whole processing chain (that would require tracking status, and output filters might drop some docs - so I'm not sure it makes sense to care about it). Therefore we have to create a "fake" response to acknowledge the reception of the documents (see the sketch below):
    "took": 30,
    "errors": false,
    "items": [
      {
         "index": {
            "_index": "test",
            "_type": "type1",
            "_id": "1",
            "_version": 1,
            "result": "created",
            "_shards": {
               "total": 2,
               "successful": 1,
               "failed": 0
            },
            "created": true,
            "status": 201
         }
      }
    ]

The Elasticsearch output plugin (or others) could interpret the _index and _type fields (and they do it already, as far as I remember; if not, we can adjust it ...).
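
To make steps 1-5 concrete, here is a minimal sketch, assuming the usual Logagent plugin shape of a constructor taking (config, eventEmitter) plus start()/stop(); the 'data.parsed' event name and the config.port field are illustrative assumptions, not the final API:

var http = require('http');

// Hypothetical plugin skeleton - a sketch, not the final implementation
function InputElasticsearchHttp (config, eventEmitter) {
  this.config = config;
  this.eventEmitter = eventEmitter;
}

InputElasticsearchHttp.prototype.start = function () {
  var self = this;
  this.server = http.createServer(function (req, res) {
    if (req.method !== 'POST' || req.url.indexOf('/_bulk') === -1) {
      res.statusCode = 404;
      return res.end();
    }
    var body = '';
    req.on('data', function (chunk) { body += chunk; });
    req.on('end', function () {
      var lines = body.split('\n').filter(function (l) { return l.trim(); });
      var items = [];
      // Assumes index actions only (2 lines per document); delete actions
      // have no document line and would break this simple pairing.
      for (var i = 0; i + 1 < lines.length; i += 2) {
        var action = JSON.parse(lines[i]);
        var doc = JSON.parse(lines[i + 1]);
        if (!action.index) continue;
        doc._index = action.index._index;
        doc._type = action.index._type;
        doc._id = action.index._id;
        var context = { source: 'input-elasticsearch-http', index: action.index._index };
        self.eventEmitter.emit('data.parsed', doc, context);
        items.push({ index: { _index: doc._index, _type: doc._type, _id: doc._id, _version: 1, result: 'created', created: true, status: 201 } });
      }
      // The "fake" acknowledgement, matching the bulk response format
      res.setHeader('Content-Type', 'application/json');
      res.end(JSON.stringify({ took: 0, errors: false, items: items }));
    });
  });
  this.server.listen(this.config.port || 9200);
};

InputElasticsearchHttp.prototype.stop = function (cb) {
  this.server.close(cb);
};

module.exports = InputElasticsearchHttp;

Because documents pass through the full parser/output pipeline (and may be disk-buffered) after being emitted, the acknowledgement above can only confirm reception, not successful indexing downstream.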

Once done, this test should work:

$ logagent --config elasticsearch-input.http.yml 
$ cat requests
{ "index" : { "_index" : "test", "_type" : "type1", "_id" : "1" } }
{ "field1" : "value1" }
$ curl -s -H "Content-Type: application/x-ndjson" -XPOST localhost:9200/_bulk --data-binary "@requests"; echo
{"took":7, "errors": false, "items":[{"index":{"_index":"test","_type":"type1","_id":"1","_version":1,"result":"created","forced_refresh":false}}]}

megastef commented 6 years ago

Closing. Done, see https://sematext.com/docs/logagent/input-plugin-elasticsearch-http/