pinojs / pino

🌲 super fast, all natural json logger
http://getpino.io
MIT License

Transport worker thread is closed before stream ends #1338

Closed ikovac closed 2 years ago

ikovac commented 2 years ago

Hello 👋, I am trying to write a custom CloudWatch transport, and this is my code:

cloudwatch-transport.cjs

const { Writable } = require('stream');
const build = require('pino-abstract-transport');
const AWS = require('aws-sdk');

AWS.config.update({
  accessKeyId: 'key',
  secretAccessKey: 'secret',
  region: 'us-east-1'
});

class CloudWatchStream extends Writable {
  #logGroupName;
  #logStreamName;
  #sequenceToken;
  #cloudWatchLogs;

  constructor(options) {
    super();
    this.#logGroupName = options.logGroupName;
    this.#logStreamName = options.logStreamName;
    this.#sequenceToken = null;
    this.#cloudWatchLogs = new AWS.CloudWatchLogs();
  }

  _write(chunk, encoding, next) {
    console.log('Msg: ', chunk.toString());
    console.log('HERE 1!!!');
    this.#cloudWatchLogs.createLogGroup({ logGroupName: this.#logGroupName }, (err, data) => {
      console.log('HERE 2!!!');
      if (err) console.log('err: ', err);
      // Signal that this chunk has been handled only after the AWS call returns.
      next();
    });
  }
}

const createTransport = () => {
  return build(source => {
    const stream = new CloudWatchStream({ logGroupName: 'test', logStreamName: 'stream-1' });
    source.pipe(stream);
    return stream;
  }, { parse: 'lines' });
};

module.exports = createTransport;
module.exports.default = createTransport;

index.js

const pino = require('pino');

function createLogger() {
  const transport = pino.transport({
    targets: [{
      target: './cloudwatch-transport.cjs'
    }]
  })
  const logger = pino(transport);
  return logger.child({
    ver: process.env.npm_package_version,
    env: process.env.NODE_ENV
  });
}

const logger = createLogger();
logger.info('Hello');

But the worker thread is closed before the next callback in the _write method gets called. HERE 1!!! gets printed, but HERE 2!!! doesn't.

If I put a log statement inside setInterval it works because the worker thread is not closed:

setInterval(() => {
  logger.info('Hello');
}, 3000);

Am I doing something wrong? Thanks in advance! 🙌

mcollina commented 2 years ago

You need to close/wait for your stream to close by passing a close() function to https://github.com/pinojs/pino-abstract-transport#buildfn-opts--stream.
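
For illustration, here is a minimal sketch of what such a close() function could look like. It uses only Node's stream module so it can be run on its own. SlowStream and makeClose are hypothetical names that do not appear in the original code: SlowStream stands in for the CloudWatch stream, and makeClose returns a close(err, cb) function that ends the destination stream and calls cb only after every pending write has completed.

```javascript
const { Writable, finished } = require('stream');

// Hypothetical stand-in for the CloudWatch stream: each write completes
// asynchronously, the way a network call would.
class SlowStream extends Writable {
  constructor() {
    super();
    this.delivered = [];
  }

  _write(chunk, encoding, next) {
    setTimeout(() => {
      this.delivered.push(chunk.toString());
      next();
    }, 20);
  }
}

// Returns a close(err, cb) function suitable for the build() options.
// cb is called only once the destination stream has flushed everything.
function makeClose(stream) {
  return function close(err, cb) {
    if (stream.writableFinished) {
      // Nothing is pending, so report back on the next tick.
      process.nextTick(cb, err);
      return;
    }
    // finished() fires after all writes are flushed, or on a stream error.
    finished(stream, (streamErr) => cb(err || streamErr));
    if (!stream.writableEnded) stream.end();
  };
}
```

In the transport above, this would presumably mean keeping a reference to the CloudWatchStream outside the build callback and passing close: makeClose(stream) next to parse: 'lines'. That wiring is an assumption and has not been tested against pino itself.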

ikovac commented 2 years ago

Thank you for your answer! If I understood right, I need to emit an event before calling the next callback like this:

this.#cloudWatchLogs.createLogGroup({ logGroupName: this.#logGroupName }, (err, data) => {
  console.log('HERE 2!!!');
  if (err) console.log('err: ', err);
  this.emit('close');
  next();
});

and then pass a close function in the build options like this:

close(err, cb) {
  stream.on('close', () => {
    console.log('Stream is closed');
    cb();
  });
}

Right?

ikovac commented 2 years ago

I did this, and it seems to work now, but it doesn't print the Stream is closed message. It looks like the stream.on('close') handler never gets called.

mcollina commented 2 years ago

That's expected. console.log() in worker threads is asynchronous and goes through the main thread's event loop. However, in your example the main event loop has already shut down, so you can't log anything. You can use process._rawDebug() to actually print something.
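
As a rough sketch of that suggestion: debugSync below is a hypothetical helper, not part of pino or Node. It calls process._rawDebug(), which is an undocumented Node.js internal and may change between versions. The fs.writeSync fallback is an assumption added here, in case that function is unavailable.

```javascript
const fs = require('fs');

// Hypothetical helper: prints a debug line synchronously, without going
// through the main thread's event loop.
function debugSync(message) {
  if (typeof process._rawDebug === 'function') {
    // Undocumented Node.js internal; it writes directly to stderr.
    process._rawDebug(message);
  } else {
    // Assumed fallback: synchronous write to file descriptor 2 (stderr).
    fs.writeSync(2, `${message}\n`);
  }
}

debugSync('Stream is closed');
```

Replacing the console.log('Stream is closed') call inside the close handler with a call like this should make the message visible even when the main thread is already shutting down.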

ikovac commented 2 years ago

Thank you soo much! 🙇

mcollina commented 2 years ago

Are you going to publish this in a module?

ikovac commented 2 years ago

Yeah, I can do that as soon as I finish it. I will post a link here in a comment 😉

mcollina commented 2 years ago

Send a PR, we list known transports in the docs.
