sematext / logagent-js

Extensible log shipper with input/output plugins, buffering, parsing, data masking, and small memory/CPU footprint
https://sematext.com/logagent
Apache License 2.0

Aggregate or down-sample events #40

Open erik-stephens opened 8 years ago

erik-stephens commented 8 years ago

In order to relieve pressure downstream and to avoid collation complexity, it would be nice if I could aggregate logs within logagent-js. I'll try to explain more with an example. I would like logs such as:

{message: 'foo happened', field1: 'type1', metric1: 42, metric2: 1}
{message: 'foo happened', field1: 'type2', metric1: 24, metric2: 2}
{message: 'foo happened', field1: 'type1', metric1: 40, metric2: 3}

to be user-definably aggregated with something like the following before shipping them:

function aggregateFoo(pattern, buffer) {
  if (buffer.foo === undefined) {
    buffer.foo = {};
  }
  if (buffer.foo[pattern.field1] === undefined) {
    buffer.foo[pattern.field1] = {metric1: [], metric2: [], first: Date.now()};
  }
  var agg = buffer.foo[pattern.field1];
  agg.metric1.push(pattern.metric1);
  agg.metric2.push(pattern.metric2);
  agg.last = Date.now();
  // Could aggregate in N-minute minimum buckets
  if (agg.last - agg.first > 5 * 60 * 1000) {
    shipFoo(agg);
    delete buffer.foo[pattern.field1];
  }
  // Could aggregate once N samples collected
  else if (agg.metric1.length > 100) {
    shipFoo(agg);
    delete buffer.foo[pattern.field1];
  }
}

function shipFoo(agg) {
  // Demonstrating how users may want to define the aggregated event.
  var event = {};
  event.message = 'foo happened ' + agg.metric1.length + ' times';
  event.ts = agg.last;
  event.window = agg.last - agg.first;
  event.metric1 = avg(agg.metric1);
  event.metric2 = max(agg.metric2);
  ship(event);
}

Is this already on the road map, and if not, is it in the scope of what you would like for logagent-js? I may have the green light to hack my way through this. If it could be generally useful to others, let's discuss further here on how best to approach it.

megastef commented 8 years ago

+1 nice!

otisg commented 8 years ago

Looks useful at first glance. One may want to know the exact time window from which these aggregations were derived (so 'from' and 'to' timestamps may need to be tracked and simply shipped as fields).

Logagent now has support for pluggable inputs and outputs. Would this be an output, or would it be better to be able to allow pluggable "processors" (the stuff between inputs and outputs)?

megastef commented 8 years ago

My questions:

  1. Is this a parser feature to aggregate messages? Then it would work for all tools using the parser (including Docker Agent). No need to change the current workflow.
  2. Is this a log shipper feature? Then it needs to be a pluggable "processor". Currently we have only input and output plugins. Inputs emit "data.raw" events, LA does parsing and then emits "data.parsed" events. Output plugins listen to "data.parsed". A "filter/processor plugin" needs to run before "data.parsed" is emitted -> a small change in the framework (see the sketch below).
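
As a rough sketch of option 2 (using Node's EventEmitter; parse() and the processors array are placeholders for illustration, not the real internals), such a hook would sit between "data.raw" and "data.parsed" like this:

var EventEmitter = require('events').EventEmitter
var bus = new EventEmitter()
var processors = []   // user-registered functions: (context, event) -> event or null

function parse (context, line) {            // stand-in for the existing parser step
  return { message: line }
}

bus.on('data.raw', function (context, line) {
  var event = parse(context, line)
  for (var i = 0; i < processors.length; i++) {
    event = processors[i](context, event)
    if (!event) return                      // a processor may drop the event
  }
  bus.emit('data.parsed', context, event)   // output plugins keep listening here
})
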
erik-stephens commented 8 years ago

@otisg, I think the stuff between. Plus, there's state to manage, which probably makes this a bit more complicated than an input or output. Regarding which fields to ship, my hope is that could be completely defined by the user since I expect many different use cases.

@megastef, not sure I know enough to answer. Now that #22 is complete, I think I can define my filter() to drop the event if my transform() marked it as aggregated. I think all I may need is some temporary buffer space to store the relevant bits of the event. Does it make sense to use the pattern object for that or would that be too kludgy? My transform() would need access to it in order to return the aggregated event as desired (eg time, count). That would be my naive way of going about it.

megastef commented 8 years ago

Well, using #22 is something that would work in "user land" - setting pattern.aggBuffer or pattern.aggTimer. The pattern object only gets destroyed when patterns.yml changes (hot-reload).

I think we need filter plugins in any case, and that would be the best place for aggregation, before data is passed to output plugins. A simple default could just load a JS function (inline in the config) or from a JS file/module.
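
For example, resolving the user-supplied function could be as simple as this sketch (the filterFunction/filterModule config keys are made up for illustration, not an existing API):

function resolveFilter (config) {
  if (typeof config.filterFunction === 'function') {
    return config.filterFunction            // inline !!js/function from the YAML config
  }
  if (typeof config.filterModule === 'string') {
    return require(config.filterModule)     // external JS file or npm module
  }
  return function (event) { return event }  // default: pass-through
}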

megastef commented 8 years ago

@erik-stephens I think this is what you are looking for: https://www.npmjs.com/package/logagent-filter-output-sql. This outputFilter plugin provides SQL-based queries to filter or aggregate log events (already parsed by the patterns applied before ...).

erik-stephens commented 8 years ago

Very cool! I quickly scanned the source of that plugin. The buffer is flushed based on a configured interval? So if I set the interval to 300, it will buffer all events for the past 5 minutes and then flush? That should work nicely for our needs.

If you haven't thought about it yet, you might want to consider what the expected behavior is if the user feeds it a bad query. If it's possible for the query to raise an exception, then the buffer might blow up. Also, it looks like the buffer will continue to grow if the query returns no results. I would expect the buffer to be flushed in that case as well.

megastef commented 8 years ago

Yes, the interval is configurable (value in seconds). Please note that the buffer keeps all records for this time in memory (e.g. 5 minutes in your example). All SQL operations are currently in-memory, and the memory required by logagent grows with the number of buffered events until the SQL queries are executed.
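
In pseudo-code, the buffering behaviour is roughly this (a sketch, not the plugin's actual source; onParsedEvent, runQueries and emitAggregates are stand-ins):

var config = { interval: 300 }            // seconds, as in the example above
var buffer = []

function onParsedEvent (event) {          // called for every parsed log event
  buffer.push(event)
}

function runQueries (events) {            // stand-in for the SQL/alasql step
  return []                               // e.g. alasql('SELECT ... FROM ?', [events])
}

function emitAggregates (rows) {          // stand-in for emitting the results downstream
  rows.forEach(function (row) { console.log(row) })
}

setInterval(function () {
  var events = buffer
  buffer = []                             // always release the buffered events
  if (events.length === 0) return
  try {
    emitAggregates(runQueries(events))    // could also emit a "zero event" when empty
  } catch (err) {
    console.error('aggregation query failed:', err)
  }
}, config.interval * 1000)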

Fixed error handling https://github.com/megastef/logagent-filter-output-sql/commit/ce9b45f8ee0d20848547c9083a610d60c47b943b

Thanks for the great feedback.

erik-stephens commented 7 years ago

Apologies if I'm mis-reading the code, but I think it makes sense to still emit data.parsed with an empty list when the query returns no results - looking at the 'if (result && result.length === 0) return' bit. As a user of this functionality, I may want to emit that aggregation as a zero event. Plus, it looks like the buffer could still grow unbounded if my query never matches any events. Overall I think this is great, so feel free to close this issue if you're happy with it - I don't want you to feel like you have to wait for my "blessing".

erik-stephens commented 7 years ago

Tested the sql output plugin. One bit I'm missing is how to process the results of the plugin - I still need some user-defined javascript to fill the gaps that alasql leaves for our use cases. I'd like it emitted back up the stream somewhere besides where emit('data.parsed') takes it. I don't think emit('data.raw') makes sense all the time either, which has me wondering if there are plans for letting users configure the processing pipeline. I'm wondering if a distinction needs to be made between input filters, processors, and output filters as mentioned in #43, and instead let processors subscribe to msgs and publish them on specific channels. I'm using the term "channel", but I think it's as simple as the event name to subscribe (on()) and publish (emit()) to. Spit-balling my ideal setup:

If it's as simple as I think it is, I could come up with a PR. Wanted to gauge where y'all are on this before diving in. Maybe have other plans or see problems with this approach?

megastef commented 7 years ago

@erik-stephens This is a very good suggestion. Please tell us more about:

One bit I'm missing is how to process the results of the plugin - I still need some user-defined javascript to fill the gaps that alasql leaves for our use cases.

We have not documented the latest Elasticsearch output features, like routing logs to different servers/indices.

What you describe above is exactly the initial idea we had for an "event based architecture". Down the road it was not so easy or is not completed:

We are open to good suggestions or a PR, and we would like to improve usability and reduce the JS programming parts for end users.

Just to complete the current status, we can have workflows like this:

Input File
  - watch files by GLOB patterns
  - emit 'data.raw' new lines from each changed file with a context having source name
  -> Apply N Input Filter (e.g. Grep filter)
    - match source name from 'context'
    - drop | emit depending on 'data' or source
      -> Parser 
         - match patterns by sourceName from context
         - match regex / parse structure
         -> Apply N  Output Filter (e.g SQL filter)
            - transform, aggregate, 
            - drop/emit 'data.parsed'
            -> N Output Plugins (e.g. Elasticsearch)
               - listen to 'data.parsed'
               - route to Elasticsearch URL and map source name to URL/indices
                 - match source name by regex 
                 - map to index name (config)

An example config might look like this (sorry, it might not make much sense, but it outlines the options):

input:
  files:
      - '/var/log/**/*.log'
      - '/opt/mylogs/*.log'

inputFilter:
  - module: grep
    config:
      matchSource: !!js/regexp /nginx/
      include: !!js/regexp /.*/i
      exclude: !!js/regexp /OPTIONS/i

outputFilter:
  - module: sql
    config:
      source: !!js/regexp /access.log|httpd|nginx/
      interval: 1
      queries:
        - # calculate average page size for different HTTP methods
          SELECT 'apache_stats' AS _type, 
                  AVG(size) AS size_avg, 
                  COUNT(method) AS method_count, 
                  method as http_method
          FROM ? 
          WHERE method is not null
          GROUP BY method

output:  
  es-secure-local:
    module: elasticsearch 
    url: http://localhost:9200
    httpOptions:
      key: ./client.key
      cert: ./server.crt
      rejectUnauthorized: false
    indices: 
      my_secure_logs_index: 
        - system\.log
        - access\.log
        - auth\.log
        - wifi.log
  logsene-saas:
    module: elasticsearch
    url: https://logsene-receiver.sematext.com
    indices: 
      269d7b2f-0e7d-4e92-ae7f-xxxxxxxxxx: 
        - access.log
        - nginx.log

erik-stephens commented 7 years ago

One bit I'm missing is how to process the results of the plugin - I still need some user-defined javascript to fill the gaps that alasql leaves for our use cases.

I'm trying to consume the systemd journal. I would like to index all msgs for specific scheduled jobs as one document in elasticsearch. That's where the sql output filter helps quite a bit but there are some things that are awkward or impossible to do in sql alone:

SELECT
  'foo' as `process`,
  ARRAY(MESSAGE) as `message`,
  ARRAY(DISTINCT _HOSTNAME) as `hostname`,
  MIN(_SOURCE_REALTIME_TIMESTAMP) as `start`,
  MAX(_SOURCE_REALTIME_TIMESTAMP) as `end`,
  (MAX(_SOURCE_REALTIME_TIMESTAMP) - MIN(_SOURCE_REALTIME_TIMESTAMP)) / 1000000 as `duration`
FROM ? WHERE
  _SYSTEMD_UNIT = "foo.service"

On a related note, I would also like to index the other journal msgs that I'm not aggregating. Looks like I have to run logagent multiple times: once to feed it the msgs that I want aggregated and once to feed it the other msgs. What I'm hoping for is for logagent to provide primitives for working with the msg bus/pipeline and let me tap into it in various ways.

Using the "event channels" will create more events than we need - 100k logs might generate 600k or more events at least this was one of my considerations for resource foot print

I'm seeing that as user/configuration dependent. If I want a more complicated processing pipeline, then I would expect a performance tax.

Input/Output Filters are a bit different from input/output plugins. Filters (including transforms) must be "serialized", and we decided on generator functions called in a "serial" way. Connecting multiple filters would need configuration for N listen and emit event names - at this point my opinion was that it would be too complicated for regular users to understand.

I was actually having a difficult time understanding the pipeline as it is now. Your current workflow description helps, though. I would think having the workflow more user-defined would make it easier for users to understand, but just my opinion.

I'll take a crack at a PR. If it goes nowhere, then no problem - it will be a good learning exercise.

megastef commented 7 years ago

Documentation is one of the tasks we are working on - including an explanation of the pipeline. Input/Output plugins are event driven, while input filters are hooked in before parsing, and output filters after parsing. Then the output plugins listen to the "data.parsed" events. I made a picture, but it's too complicated to share :)

On a related note, I would also like to index the other journal msgs that I'm not aggregating.

You could use multiple SQL statements in the "queries" section, and use multiple "outputFilter: module: sql" entries.

BTW, for journalctl/json input you could use JSON processing in patterns.yml (before the output filter) - maybe a helpful example:

json:
  removeFields:
    - __CURSOR
    - __MONOTONIC_TIMESTAMP
    - _TRANSPORT
    - JOURNAL_NAME
    - JOURNAL_PATH
    - CURRENT_USE
    - CURRENT_USE_PRETTY
    - MAX_USE
    - MAX_USE_PRETTY
    - DISK_KEEP_FREE
    - DISK_KEEP_FREE_PRETTY
    - DISK_AVAILABLE_PRETTY
    - DISK_AVAILABLE
    - LIMIT
    - LIMIT_PRETTY
    - AVAILABLE
    - AVAILABLE_PRETTY
    - _CAP_EFFECTIVE
    - _SYSTEMD_SLICE
  unitFilter: !!js/regexp /sematext.*/i
  transform: !!js/function >
    function (sourceName, parsedObject, config) {
      // filter for unit names
      if (!config.unitFilter.test(parsedObject['_SYSTEMD_UNIT'])) {
        parsedObject.logagentDropMessage = true
        return
      }
      parsedObject.logSource = parsedObject['_SYSTEMD_UNIT'].replace('.service','')
      parsedObject['@timestamp'] = new Date(Number(parsedObject['_SOURCE_REALTIME_TIMESTAMP']))
      for (var i=0; i<config.removeFields.length; i++) {
        // console.log('delete ' + config.removeFields[i])
        delete parsedObject[config.removeFields[i]]
      }
     }

And we have an open task for better journald support, while we have that in Sematext Docker Agent: https://github.com/sematext/sematext-agent-docker/blob/master/lib/tcpLogsene.js

Anyhow, you are welcome to make a PR to get your workflow done.

erik-stephens commented 7 years ago

Apologies for length of this thread and this post in particular. It's a lot to digest. I hacked away and have a version that I'm pretty happy with. It might be too radical a departure from your vision, so would like to gauge that before spending time getting it into a more PR-friendly form.

Everything is a stage in the pipeline - there is no distinction between inputs, filters, processors, and outputs other than how the user wires them up in the pipeline. Things I like about this approach:

Things I don't like or am not sure about:

To give you a sense of the implementation, logagent.js simply provides a name/id, config, and an EventEmitter instance (called pipeline below) to the plugins. It doesn't do much else but listen for termination events. Most of the implementation gets pushed out to the plugins; even the stats and error handling could be defined as pipeline stages (a sketch of what the loader side could look like follows the plugin code below):

$ cat lib/plugins/js.js
'use strict'

function Plugin (name, config, pipeline) {
  var self = this
  self.name = name
  self.pipeline = pipeline
  self.config = config
  // Helpers exposed to the user-defined function via 'this'.
  self.sandbox = {
    util: require('util'),
    moment: require('moment')
  }
  // Subscribe to the configured input channel, run the user function on a copy of
  // each event, and forward the copy to the output channel unless it was dropped.
  self.pipeline.on(self.config.inChannel, function (context, data) {
    try {
      context.start = new Date()
      context.stage = self.name
      context.dropped = true
      var copy = Object.assign({}, data)
      if (self.config.function.call(self.sandbox, copy)) {
        self.pipeline.emit(self.config.outChannel, {stage: self.name}, copy)
        context.dropped = false
      }
      context.end = new Date()
      self.pipeline.emit(self.config.statsChannel, context)
    } catch (err) {
      self.pipeline.emit(self.config.errChannel, err, context)
    }
  })
}
module.exports = Plugin
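
For context, the loader side could be as small as this - a simplified sketch rather than the exact code in my working copy (the plugin path and config handling here are assumptions):

var EventEmitter = require('events').EventEmitter

function loadPipeline (stages) {            // stages = the parsed "pipeline:" section of the config
  var pipeline = new EventEmitter()
  pipeline.setMaxListeners(0)               // many stages may subscribe to the same channel
  Object.keys(stages).forEach(function (name) {
    var config = stages[name]
    config.outChannel = config.outChannel || name            // default to the stage name
    var Plugin = require('./lib/plugins/' + config.module)   // module resolution is an assumption
    return new Plugin(name, config, pipeline)                // name/id, config, shared EventEmitter
  })
  return pipeline
}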

Example config defining a pipeline that I'm using to process journalctl -o json | logagent.js:

pipeline:

  stdin:
    module: stdin

  lines:
    module: lines
    inChannel: stdin

  json-in:
    module: json-in
    inChannel: lines

  # Drop all fields except for the ones I'm interested in.
  clean-journal:
    module: keep
    inChannel: json-in
    # outChannel: clean-journal # not needed since defaults to stage name
    fields:
      hostname: ['_HOSTNAME', '_MACHINE_ID']
      severity: PRIORITY
      process: ['UNIT', '_SYSTEMD_UNIT', 'SYSLOG_IDENTIFIER']
      message: MESSAGE
      container: ['CONTAINER_NAME', 'CONTAINER_ID']
      ts: ['_SOURCE_REALTIME_TIMESTAMP', '__REALTIME_TIMESTAMP']

  # Split the "clean-journal" stream into two: aggregated by specific processes and not.
  unaggregated:
    module: js
    inChannel: clean-journal
    outChannel: cast
    function: !!js/function >-
      function(data) {
        return !data.process.match(/(foo|bar)/)
      }

  # I want output of these scheduled jobs to be indexed as one msg.
  aggregated:
    module: sql
    inChannel: clean-journal
    interval: 60 # seconds
    query: >-
      SELECT
        `process`,
        ARRAY(`message`) AS `message`,
        ARRAY(DISTINCT `hostname`) AS `hostname`,
        MIN(`ts`) AS `ts`,
        MAX(`ts`) AS `end`,
        (MAX(`ts`) - MIN(`ts`)) / 1000000 AS `duration`
      FROM ?
      WHERE
        `process` IN ("foo.service", "bar.service")
      GROUP BY `process`

  # Handle some transformations I couldn't figure out how to do in SQL.
  post-aggregation:
    module: js
    inChannel: aggregated
    outChannel: cast
    function: !!js/function >-
      function(data) {
        data.message = data.message.join('\n')
        data.hostname = data.hostname[0]
        return true
      }

  # A simple plugin to handle common task of casting to desired types.
  cast:
    module: cast
    inChannel: cast
    outChannel: output
    fields:
      severity: int
      ts: ts-usec
      end: ts-usec

  # Split the msg to multiple outputs: stdout & elasticsearch.
  json-out:
    module: json-out
    inChannel: output
    outChannel: stdout
  stdout:
    module: stdout
    inChannel: stdout

  elasticify:
    module: js
    inChannel: output
    outChannel: es
    function: !!js/function >-
      function(data) {
        if (data._type === undefined) {
          data._type = 'syslog'
          data.type = data._type
        }
        if (data._id === undefined) {
          // Specify a deterministic _id so can re-process logs without duplicates.
          data._id = this.util.format('%s::%s', data.hostname, data.ts.toISOString())
        }
        return true
      }
  elasticsearch:
    module: elasticsearch
    inChannel: es
    # Log to a daily index.
    index: !!js/function >-
      function (data) {
        return this.util.format('logs-%s', data.ts.format('YYYY.MM.DD'))
      }
    api: '2.3'

  # Examples showing how stats & errors could be handled in user-defined way. By
  # default, a stage will emit('stats', {...}) and emit('errors', err, {...})
  # but those channels could be overridden on a per-stage basis.  Feeding
  # to an aggregator stage would be a nice marriage.
  stats:
    module: js
    inChannel: stats
    function: !!js/function >-
      function(context) {
        console.log('stats for stage %s:', context.stage, context)
      }

  errors:
    module: js
    inChannel: errors
    function: !!js/function >-
      function(err, context) {
        console.error('error in stage %s:', context.stage, err)
      }

otisg commented 7 years ago

Seems juicy, will digest (btw should we move this to #43?).

Performance: Have you compared before/after performance with ~equivalent configs to see if these modifications change CPU or memory footprint, and roughly how much? 15K EPS seems low, but maybe that's just because one of your stages is very expensive.

Configuration: With your approach can one:

Related to this - it seems like plugins (aka stages) are wired up with inChannel/outChannel. Have you considered an approach where stages include just directives for what the stage should do, with something external to the stage definition specifying how the stages are glued together into a specific flow? Some benefits of that:

Here's an idea/example. Not sure if this matches what you have in code and in mind:

This could be in file config/stages-DEFAULT.conf - the default stages config specifies built-in pipelines one can reuse/inherit:

lines:
  module: lines
  source: /var/log/**/*.log

stdin:
  module: stdin

json-in:
  module: json-in
  source: stdin

stats:
  ...
errors:
  ...

aggregate:
  module: sql
  interval: 30
  query: SELECT ....

elasticsearch:
  module: elasticsearch
  index: ...
  ...

This could be in file config/pipeline-FOO.conf - define our own, additional or different stages for this pipeline and wire them up:

lines:
  module: lines
  source: /foo/bar.log # read from this log file, not those from default config
  ...

aggregate: # same stage name as in default config means this overrides? OR do we treat this as a distinct 'FOO.aggregate' stage? Maybe the former?
  module: sql
  interval: 30
  query: SELECT ....

# wire it up
pipeline: FOO
  lines | aggregate | elasticsearch   # inheriting elasticsearch module

This could be in file config/pipeline-WHATEVER.conf - another pipeline, separate from the one defined in pipeline-FOO.conf:

json-in:
  module: json-in
  ...

aggregate-baz: # we can explicitly use a different name for basically the same sort of stage defined in defaults
  module: sql
  interval: 30
  query: SELECT ....

patterns:
  module: patterns
  patternFile: /tmp/my-patterns.conf

elasticsearch:  # our own definition with my-custom-index
  module: elasticsearch
  index: my-custom-index
  ...

pipeline: BAR
  json-in | patterns | aggregate-baz | elasticsearch

This could be in file config/pipeline-JUSTPIPELINEWIRING.conf:

pipeline: NO_STAGES_JUST_WIRING_OF_DEFAULT_AVAILABLE_STAGES
  json-in | elasticsearch

This could be in file config/pipeline-PROBLEMATIC.conf - ideally we'd have something that detects invalid pipeline flows:

pipeline: LOOPY
  json-in | json-in

What would Logagent do?
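
One option, sketched against this hypothetical wiring syntax (validateWiring and its error messages are made up for illustration): parse each pipeline line into an ordered list of stages and reject unknown or repeated stages before starting anything.

function validateWiring (wiring, knownStages) {
  var stages = wiring.split('|').map(function (s) { return s.trim() })
  var seen = {}
  stages.forEach(function (stage, i) {
    if (knownStages.indexOf(stage) === -1) {
      throw new Error('unknown stage: ' + stage)
    }
    if (seen[stage] !== undefined) {
      throw new Error('stage wired to itself / loop detected: ' + stage)
    }
    seen[stage] = i
  })
  return stages      // ordered list, ready to be turned into in/out channel pairs
}

// validateWiring('json-in | json-in', ['json-in', 'elasticsearch'])        // -> throws
// validateWiring('json-in | elasticsearch', ['json-in', 'elasticsearch'])  // -> ['json-in', 'elasticsearch']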

megastef commented 7 years ago

@erik-stephens could you please share your changes? This could be on GitHub or a mail with JS files, so I could play around with your version and see the impact on existing modules (re-use/change/re-write) and on the command line interface, plus @otisg's ideas.

otisg commented 7 years ago

@erik-stephens @megastef:

erik-stephens commented 7 years ago

I won't be able to respond fully until early next week, but wanted to respond quickly to your comments:

otisg commented 7 years ago

Running multiple Logagent instances for multiple pipelines doesn't sound great to me from the operational perspective:

One more question:

otisg commented 7 years ago

Gentle nudge, @erik-stephens. We are hoping to nail down the direction ASAP and get a proper release out before Santa comes to town.

erik-stephens commented 7 years ago

Apologies for the delay. You can find a working copy here. I developed against a journalctl dump and am just now testing against a live system with journalctl being fed to it. I stomped on a lot of code (more to simplify for me) - I'm considering my version as something to cherry-pick from and nothing close to a PR.

I spent some time trying to get it to work with the stages de-coupled from the pipeline but it was turning out to be just as messy. Might still be worth considering, but for the sake of time, I abandoned that and instead added support for multiple input & output channels (see my ./config/example.yml).

As it is now, in-memory data will be lost on shutdown. I'm sure there are a handful of operational details like that which would need more attention before general use.

I haven't gotten to an apples-to-apples comparison of memory consumption. I don't even have file support in my version yet. I was seeing some concerning memory usage when feeding it my test file: it was a 1GB journal dump and it seemed to exhibit leaky behavior. I went down a bit of a node.js memory mgmt rabbit hole and ended up adding a gc stage to allow for scheduled garbage collections, but I'm not sure how much it helps - I guess it could be used as a blunt instrument / band-aid. I've had it reading a journal in either 10k-record or 1-minute chunks (whichever comes first) and memory is holding steady at around 60MB with the normal gc scheduling.
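
A minimal sketch of what such a gc stage can look like (not necessarily the exact code in my working copy; it requires node to be started with --expose-gc so that global.gc() exists):

function GcStage (name, config, pipeline) {
  if (typeof global.gc !== 'function') {
    console.error('gc stage: start node with --expose-gc to enable scheduled collections')
    return
  }
  var intervalMs = (config.interval || 60) * 1000
  setInterval(function () {
    global.gc()                                    // blunt instrument, as noted above
    pipeline.emit(config.statsChannel || 'stats', {
      stage: name,
      heapUsed: process.memoryUsage().heapUsed     // track whether it actually helps
    })
  }, intervalMs).unref()                           // don't keep the process alive just for gc
}
module.exports = GcStage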

Next steps for me are:

If you guys end up trying this out and get stuck, let me know. I think it only supports running with the -c CONFIG option. The relatively simple pipeline definition I'm working on had me stumped a few times. Funny how I rail against DSLs and kinda created one.

megastef commented 7 years ago

Thx @erik-stephens, I will have a look - I might need a few days to understand, review and test a few things. I see massive changes; a few quick thoughts:

megastef commented 7 years ago

Funny how I rail against DSLs and kinda created one.

I fully agree. I thought that by having "JS plugins" in every place, JS could be used for tricky tasks, or to make the config syntax easy for each plugin...

And the "original" aggregation use case with start/stop events could be solved in the current version with a filter-plugin, which drops the start events, buffers the start time in "config" object and emits a new event with start/end and duration once the stop event arrives.