xCodeMuse opened 9 months ago
Ouch, this is pretty bad. Can you give us a way to reproduce?
Not at the moment. These are production logs and the errors occur randomly. Could they be caused by a large log entry being flushed? We checked thread-stream/index.js:465 and saw it has something to do with an (unexpectedly) negative offset within the buffer at the time of the flushSync() call.
Our process was getting killed by the uncaught exception this raised. As a workaround, for now we changed the default pino functions like error, info, etc. by creating wrapper functions that add a try/catch block around the call to the original pino function. We did this for both pino.{method} and pino.child.{method}.
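A minimal sketch of that wrapper (our own reconstruction, not the original code; a real version would also patch the methods on each child() logger):
import Pino from 'pino';

const base: Pino.Logger = Pino();

// Hypothetical helper: wraps one pino method in a try/catch so an
// exception thrown during the write (e.g. "Error: overwritten")
// cannot become an uncaught exception and kill the process.
function wrapMethod (logger: Pino.Logger, method: 'trace' | 'debug' | 'info' | 'warn' | 'error' | 'fatal'): void {
  const original = logger[method].bind(logger) as (...args: any[]) => void;
  logger[method] = (...args: any[]) => {
    try {
      original(...args);
    } catch {
      // Swallow the error: this log line is lost, but the service stays up.
    }
  };
}

for (const method of ['trace', 'debug', 'info', 'warn', 'error', 'fatal'] as const) {
  wrapMethod(base, method);
}

base.info('hello world'); // never throws, even if the transport misbehaves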
Could they be caused by a large log entry being flushed?
It's unlikely on its own. We have battle-tested this over the last few years, and this was never reported despite massive logs.
What's your complete setup?
Setup:
import Pino from 'pino';
import pinoDebug from 'pino-debug';
import config from 'config';
const configRedact = (config as any).redact || [];
const redactAttrs: Array<string> = [];
for (const redactAttr of configRedact) {
  if (redactAttr.match(/^\*\*/)) {
    // pino's redact paths have no deep (**) wildcard, so expand
    // **.field into explicit paths up to nine levels of nesting.
    const fieldName = redactAttr.replace(/^\*\*\./, '');
    for (let depth = 0; depth <= 9; depth++) {
      redactAttrs.push(`${'*.'.repeat(depth)}${fieldName}`);
    }
  } else {
    redactAttrs.push(redactAttr);
  }
}
const logger: Pino.Logger = Pino({
  level: (config as any).log_level || 'debug',
  transport: {
    // Custom transport, loaded relative to this file.
    target: '../pino/pino-opentelemetry-transport',
    options: {
      destination: 1, // fd 1 = stdout
      Resource: {
        'service.name': process.env.OTEL_SERVICE_NAME || 'unknown_service:node',
        env: process.env.NODE_ENV
      }
    }
  },
  redact: {
    paths: redactAttrs,
    censor: '*****'
  }
});
pinoDebug(logger, {
  auto: true, // default
  map: {
    'express:router': 'debug',
    '*': 'trace' // everything else - trace
  }
});
export { logger };
This is how we export the logger and use it throughout our service, e.g.:
logger.info('hello world')
What’s pino-opentelemetry-transport?
Here is the pino-opentelemetry-transport code you asked for. The process is running with NODE_ENV=prod. cc @xCodeMuse
'use strict';
const build = require('pino-abstract-transport');
const { SonicBoom } = require('sonic-boom');
const { once } = require('events');
const DEFAULT_MESSAGE_KEY = 'msg';
const ZEROS_FROM_MILLI_TO_NANO = '0'.repeat(6);
/**
 * @typedef {Object} CommonBindings
 * @property {string=} msg
 * @property {number=} level
 * @property {number=} time
 * @property {string=} hostname
 * @property {number=} pid
 *
 * @typedef {Record<string, string | number | Object> & CommonBindings} Bindings
 */
/**
 * Pino OpenTelemetry transport
 *
 * Maps Pino log entries to the OpenTelemetry data model.
 *
 * @typedef {Object} Options
 * @property {string | number} destination
 * @property {string} [messageKey="msg"]
 *
 * @param {Options} opts
 */
module.exports = async function (opts) {
  const destination = new SonicBoom({ dest: opts.destination, sync: false });
  const mapperOptions = {
    messageKey: opts.messageKey || DEFAULT_MESSAGE_KEY
  };
  const resourceOptions = {
    ...opts.Resource
  };
  return build(async function (/** @type { AsyncIterable<Bindings> } */ source) {
    for await (const obj of source) {
      const line = toOpenTelemetry(obj, mapperOptions, resourceOptions);
      let updatedLine;
      if (process.env.NODE_ENV !== 'dev') {
        updatedLine = JSON.stringify(line) + '\n';
      } else {
        // Drop the six appended zeros to get back to epoch milliseconds.
        const timestamp = Number(line.Timestamp.slice(0, -ZEROS_FROM_MILLI_TO_NANO.length));
        const date = new Date(timestamp);
        const dateString = new Intl.DateTimeFormat('en-IN', { dateStyle: 'short', timeStyle: 'medium', timeZone: 'Asia/Kolkata' }).format(date);
        updatedLine = `${dateString} [${line.SeverityText}] ${line.TraceId ?? ''} ${line.SpanId ?? ''} ${JSON.stringify(line.Attributes)} ${line.Body}\n`;
      }
      // Handle backpressure: wait for 'drain' when the write buffer is full.
      if (!destination.write(updatedLine)) {
        await once(destination, 'drain');
      }
    }
  }, {
    async close () {
      destination.end();
      await once(destination, 'close');
    }
  });
};
const FATAL_SEVERITY_NUMBER = 21;
/**
 * If the source format has only a single severity that matches the meaning of the range
 * then it is recommended to assign that severity the smallest value of the range.
 * https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/logs/data-model.md#mapping-of-severitynumber
 */
// Maps pino's numeric levels (10=trace .. 60=fatal) to OpenTelemetry severity numbers.
const SEVERITY_NUMBER_MAP = {
  10: 1,
  20: 5,
  30: 9,
  40: 13,
  50: 17,
  60: FATAL_SEVERITY_NUMBER
};
// https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/logs/data-model.md#displaying-severity
const SEVERITY_NAME_MAP = {
  1: 'TRACE',
  2: 'TRACE2',
  3: 'TRACE3',
  4: 'TRACE4',
  5: 'DEBUG',
  6: 'DEBUG2',
  7: 'DEBUG3',
  8: 'DEBUG4',
  9: 'INFO',
  10: 'INFO2',
  11: 'INFO3',
  12: 'INFO4',
  13: 'WARN',
  14: 'WARN2',
  15: 'WARN3',
  16: 'WARN4',
  17: 'ERROR',
  18: 'ERROR2',
  19: 'ERROR3',
  20: 'ERROR4',
  21: 'FATAL',
  22: 'FATAL2',
  23: 'FATAL3',
  24: 'FATAL4'
};
/**
 * Converts a pino log object to an OpenTelemetry log object.
 *
 * @typedef {Object} OpenTelemetryLogData
 * @property {string=} SeverityText
 * @property {number=} SeverityNumber
 * @property {string} Timestamp
 * @property {string} Body
 * @property {{ 'host.hostname': string, 'process.pid': number }} Resource
 * @property {Record<string, any>} Attributes
 *
 * @typedef {Object} MapperOptions
 * @property {string} messageKey
 *
 * @param {Bindings} sourceObject
 * @param {MapperOptions} mapperOptions
 * @param {Record<string, any>} resourceOptions
 * @returns {OpenTelemetryLogData}
 */
function toOpenTelemetry (sourceObject, { messageKey }, resourceOptions) {
  const { time, level, hostname, pid, trace_id, span_id, trace_flags, [messageKey]: msg, ...attributes } = sourceObject;
  const severityNumber = SEVERITY_NUMBER_MAP[level];
  const severityText = SEVERITY_NAME_MAP[severityNumber];
  return {
    Body: msg,
    // Pino's time is epoch milliseconds; append six zeros for nanoseconds.
    Timestamp: time + ZEROS_FROM_MILLI_TO_NANO,
    SeverityNumber: severityNumber,
    SeverityText: severityText,
    TraceId: trace_id,
    SpanId: span_id,
    TraceFlags: trace_flags,
    Resource: {
      ...resourceOptions,
      'host.hostname': hostname,
      'process.pid': pid
    },
    Attributes: attributes
  };
}
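For illustration (our own invented values, not from the thread), a typical pino line maps like this:
// Hypothetical input for toOpenTelemetry; extra bindings land in Attributes.
const input = {
  level: 30,            // pino "info"
  time: 1700000000123,  // epoch milliseconds
  hostname: 'web-1',
  pid: 42,
  msg: 'hello world',
  requestId: 'abc'
};

// toOpenTelemetry(input, { messageKey: 'msg' }, { 'service.name': 'demo' })
// returns roughly:
// {
//   Body: 'hello world',
//   Timestamp: '1700000000123000000',  // ms + six zeros = nanoseconds
//   SeverityNumber: 9,                 // 30 -> 9 via SEVERITY_NUMBER_MAP
//   SeverityText: 'INFO',
//   TraceId: undefined,
//   SpanId: undefined,
//   TraceFlags: undefined,
//   Resource: { 'service.name': 'demo', 'host.hostname': 'web-1', 'process.pid': 42 },
//   Attributes: { requestId: 'abc' }
// }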
Let us know if this happens again, and if you can reproduce it reliably.
Unfortunately, I faced the same issue (Error: overwritten). I could not reproduce it reliably, but I noticed that it mostly happens under high load (I'm using fastify + pino + a custom pino transport).
I was also able to reproduce it locally a couple of times while load testing with autocannon, but I had no luck localizing the problem either.
My solution was to stop using a transport and to use Node's Writable for streaming to the other destination in the same thread; see the sketch below.
Just sharing my experience, and hopefully it will be helpful to people facing or debugging this.
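A rough sketch of that same-thread approach, assuming a plain file as the destination (the path and the pass-through stream are our own illustration, not the commenter's actual code):
import Pino from 'pino';
import { Writable } from 'stream';
import { createWriteStream } from 'fs';

// Hypothetical destination; any Writable target works (file, socket, ...).
const out = createWriteStream('/var/log/app.log');

// A pass-through Writable that runs in the main thread: no worker is
// involved, so thread-stream (and its "overwritten" error) never runs.
const destination = new Writable({
  write (chunk, _encoding, callback) {
    out.write(chunk, callback);
  }
});

// pino accepts a destination stream as its second argument.
const logger = Pino({ level: 'info' }, destination);
logger.info('hello world');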
Randomly facing this issue too.
Log:
Error: overwritten
    at writeSync (/workspace/development/app/node_modules/thread-stream/index.js:465:13)
    at ThreadStream.write (/workspace/development/app/node_modules/thread-stream/index.js:240:9)
    at Pino.write (/workspace/development/app/node_modules/pino/lib/proto.js:215:10)
    at Pino.LOG [as info] (/workspace/development/app/node_modules/pino/lib/tools.js:62:21)
Not really sure why Pino is throwing this error. Has anyone faced a similar issue?