pinojs / pino-elasticsearch

🌲 load pino logs into Elasticsearch
MIT License

Elasticsearch bulk helper closing splitter stream #140

Open LuckyWraptor opened 2 years ago

LuckyWraptor commented 2 years ago

We're currently seeing logging stop completely after our elastic ingest node reboots. After 2 days of debugging and testing, I've concluded that the splitter transform stream is being destroyed by the async iterator in the bulk helper of the elasticsearch-js library.

The cause: pino-elasticsearch uses the bulk helper from elasticsearch-js. Once maxRetries is exceeded and all nodes are declared DEAD, the stream is destroyed because of the bulk helper's for await loop over the splitter.

I've been trying to create a patch PR, but my knowledge of JS streams is next to none. The splitter's Transform stream is used as the main stream for pino, so once it is destroyed, logging stops for good, even if the nodes come back up.
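The stream behavior described above can be reproduced with plain Node.js streams. The following is a minimal sketch, not the elasticsearch-js helper itself: `consumeAndFail` and `demo` are hypothetical names, and the thrown error only simulates a consumer giving up partway through. It shows that leaving a `for await` loop early causes the stream's async iterator to destroy the stream.

```javascript
'use strict'

const { PassThrough } = require('stream')

// Consume a stream with `for await` and fail partway through, the way a
// consumer might after giving up on retries (simulated here with a throw).
async function consumeAndFail (stream) {
  try {
    for await (const chunk of stream) {
      throw new Error(`simulated failure while handling ${chunk}`)
    }
  } catch (err) {
    return err
  }
}

async function demo () {
  // Stand-in for the splitter; any Readable behaves the same way.
  const stream = new PassThrough({ objectMode: true })
  stream.write('log line 1')

  const before = stream.destroyed
  const err = await consumeAndFail(stream)

  // Leaving the loop early makes the async iterator destroy the stream.
  return { before, after: stream.destroyed, message: err.message }
}
```

Calling `demo()` resolves with `before` and `after` flags that can be compared to see the stream's state on either side of the failed loop. Once a stream is destroyed it cannot be written to again, which would match the symptom of logging never resuming.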

mcollina commented 2 years ago

@delvedor could you take a look?

mcollina commented 2 years ago

What should be the default behavior in this case? Are you ok in dropping logs? Or should this be kept in memory?

LuckyWraptor commented 2 years ago

The bulk request should just drop the logs in my opinion, as that request has exceeded its max retry count.

The issue is that once all nodes in the connection pool are declared DEAD the stream will be disposed/destroyed and pino-elasticsearch will not continue, ever.

I've found a hacky solution to this problem: Override the splitter.destroy and force resurrection on the client connection pool, afterwards reinitialize the bulk request and keep doing this every time the stream wants to be disposed. This way no changes to the original elasticsearch js library have to be made.

I'll fetch my laptop and try to make a PR so you can see what I mean

delvedor commented 2 years ago

Hello! Even if all the nodes are declared dead, the client will still try to send requests against the originally configured list of nodes.

> the stream will be disposed because of the bulk helper's for await on the splitter.

I'm not sure what you mean here. Is the helper throwing any error?

> The bulk request should just drop the logs in my opinion, as that request has exceeded its max retry count.

If maxRetries is reached, the onDrop callback is called with the dropped log. It's up to the user to decide what to do here. In this case, the library emits an event named insertError via the stream.

The bulk helper continues to work as long as the input stream does not error (meaning the error event) or if one of the internals badly fails.
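As a rough illustration of handling dropped logs on the user side, the sketch below listens for the insertError event. It uses a plain `PassThrough` as a hypothetical stand-in for the stream returned by pino-elasticsearch, and `simulateDrop` is an illustrative helper that mirrors what the library's onDrop callback does (attaching the document to an error and emitting it). What to do with the dropped documents is left to the application.

```javascript
'use strict'

const { PassThrough } = require('stream')

// Hypothetical stand-in for the stream returned by pino-elasticsearch.
const stream = new PassThrough({ objectMode: true })

// Dropped documents are collected here; a real application might write
// them to a file or another fallback instead.
const dropped = []

stream.on('insertError', (error) => {
  dropped.push(error.document)
})

// Simulate the onDrop callback: wrap the document in an error and emit
// it on the stream as an insertError event.
function simulateDrop (doc) {
  const error = new Error('Dropped document')
  error.document = doc
  stream.emit('insertError', error)
}

simulateDrop({ msg: 'hello', time: '2021-01-01T00:00:00.000Z' })
```

Because insertError is a custom event rather than the stream's error event, emitting it does not by itself put the stream into an errored state.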

LuckyWraptor commented 2 years ago

Thank you @delvedor, but that's not really what's happening in my case.

The splitter stream is destroyed after a document is dropped, once the max retries are exhausted and all nodes are down. When an endpoint becomes available again, the splitter has already been destroyed, so no further logs are streamed.

LuckyWraptor commented 2 years ago

I've rewritten the lib.js file to suit our needs, and this currently works: when splitter.destroy is called, I resurrect the pool and reinitialize the client's bulk request. Setting autoDestroy to false did not prevent the splitter from being destroyed.

'use strict'

/* eslint no-prototype-builtins: 0 */

const split = require('split2')
const { Client, Connection } = require('@elastic/elasticsearch')

function initializeBulkHandler(opts, client, splitter) {
  const esVersion = Number(opts['es-version']) || 7
  const index = opts.index || 'pino'
  const buildIndexName = typeof index === 'function' ? index : null
  const type = esVersion >= 7 ? undefined : (opts.type || 'log')
  const opType = esVersion >= 7 ? opts.op_type : undefined

  const b = client.helpers.bulk({
    datasource: splitter,
    flushBytes: opts['flush-bytes'] || 1000,
    flushInterval: opts['flush-interval'] || 30000,
    refreshOnCompletion: getIndexName(),
    onDocument (doc) {
      const date = doc.time || doc['@timestamp']
      if (opType === 'create') {
        doc['@timestamp'] = date
      }

      return {
        index: {
          _index: getIndexName(date),
          _type: type,
          op_type: opType
        }
      }
    },
    onDrop (doc) {
      const error = new Error('Dropped document')
      error.document = doc
      splitter.emit('insertError', error)
    }
  })

  b.then(
    (stats) => splitter.emit('insert', stats),
    (err) => splitter.emit('error', err)
  )

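  // Override destroy: instead of tearing the splitter down, resurrect the
  // connection pool and start a new bulk helper on the same stream.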
  splitter.destroy = function (err) {
    client.connectionPool.resurrect({ name: 'elasticsearch-js', requestId: '696969' })
    initializeBulkHandler(opts, client, splitter)
  }

  function getIndexName (time = new Date().toISOString()) {
    if (buildIndexName) {
      return buildIndexName(time)
    }
    return index.replace('%{DATE}', time.substring(0, 10))
  }
}

function pinoElasticSearch (opts) {
  if (opts['bulk-size']) {
    process.emitWarning('The "bulk-size" option has been deprecated, use "flush-bytes" instead')
    delete opts['bulk-size']
  }

  const splitter = split(function (line) {
    let value

    try {
      value = JSON.parse(line)
    } catch (error) {
      this.emit('unknown', line, error)
      return
    }

    if (typeof value === 'boolean') {
      this.emit('unknown', line, 'Boolean value ignored')
      return
    }
    if (value === null) {
      this.emit('unknown', line, 'Null value ignored')
      return
    }
    if (typeof value !== 'object') {
      value = {
        data: value,
        time: setDateTimeString(value)
      }
    } else {
      if (value['@timestamp'] === undefined) {
        value.time = setDateTimeString(value)
      }
    }

    function setDateTimeString (value) {
      if (typeof value === 'object' && value.hasOwnProperty('time')) {
        if (
          (typeof value.time === 'string' && value.time.length) ||
          (typeof value.time === 'number' && value.time >= 0)
        ) {
          return new Date(value.time).toISOString()
        }
      }
      return new Date().toISOString()
    }
    return value
  }, { autoDestroy: true })

  const client = new Client({
    node: opts.node,
    auth: opts.auth,
    cloud: opts.cloud,
    ssl: { rejectUnauthorized: opts.rejectUnauthorized },
    Connection: opts.Connection || Connection
  })

  initializeBulkHandler(opts, client, splitter)

  return splitter
}

module.exports = pinoElasticSearch

LuckyWraptor commented 2 years ago

I should add that I am using only a single node in this setup, so this may only occur when the pool has a single endpoint.