openobserve / pino-openobserve

Load pino logs into Openobserve
Apache License 2.0
4 stars 2 forks source link

In pino transport configuration, target should be a string, not an object like in the example #2

Open PlkMarudny opened 7 months ago

PlkMarudny commented 7 months ago

From your docs, to use first you import:

import OpenobserveTransport from "@openobserve/pino-openobserve";

and later:

const logger = pino({
  level: 'info',
  transport: {
    target: OpenobserveTransport,
    options: {
      url: 'https://your-openobserve-server.com',
      organization: 'your-organization',
      streamName: 'your-stream',
      auth: {
        username: 'your-username',
        password: 'your-password',
      },
    },
  },
});

But this gives an error:

TypeError [ERR_INVALID_ARG_TYPE]: The "path" argument must be of type string. Received function OpenobserveTransport
thetutlage commented 3 months ago

The package doesn't seem to be compatible with Pino v7 transports, since the target must always be specified as a string. https://getpino.io/#/docs/transports

kvenn commented 1 week ago

Has anyone found a solution for this?

kvenn commented 1 week ago

In case anyone runs into this, this is working for me now. I hard-coded some stuff in there you're welcome to change (like the failure disabling) which I needed for my use case. I don't have the time at the moment to clean it up / PR it, but it's pretty much there.

const pinoLog: pino.Logger = pino({
        mixin(_context, level) {
            // For seeing the readable name of the log level (as opposed to
            // just the number). Can't use formatter with multiple transports
            return { 'level-label': pinoLog.levels.labels[level] }
        },
        transport: {
            targets: [
                // Pretty print to standard out
                {
                    target: 'pino-pretty',
                    options: {
                        colorize: true,
                        ignore: 'pid,hostname,level-label',
                    },
                    // Lowest level to send to the transport
                    level: 'trace',
                },
                // Send to openobserve on localhost
                // TODO: only do this if it's running (ping it) + start service if not
                // TODO: provide the `infra-source` (web/worker/etc)
                {
                    target: '../logger/pino-openobserve.ts',
                    options: {
                        url: 'http://127.0.0.1:5080',
                        organization: 'default',
                        streamName: 'default',
                        auth: {
                            username: 'admin',
                            password: 'admin',
                        },
                    },
                    // Lowest level to send to the transport
                    level: 'trace',
                },
            ],
        },
    })
import { Writable } from 'node:stream'
import build from 'pino-abstract-transport'
import { Transform } from 'stream'
import urlModule from 'url'

interface Options {
    url: string
    organization: string
    streamName: string
    auth: {
        username: string
        password: string
    }
    batchSize?: number
    timeThresholdMs?: number
}

interface DefaultOptions {
    batchSize: number
    timeThresholdMs: number
}

const debugLog = (...args: any[]) => {
    // eslint-disable-next-line @typescript-eslint/no-unsafe-argument
    if (process.env.LOG_OPENOBSERVE_DEBUG) console.log(...args)
}

// noinspection JSUnusedGlobalSymbols
/**
 * MUST export default async for pino to recognize the transport
 * This body was copied from:
 * https://github.com/openobserve/pino-openobserve/blob/main/index.js
 */
export default function (options: Options) {
    // Initialize the transport with asynchronous capabilities
    return build(async function (source) {
        debugLog('OpenObserve Pino: Transport initializing')

        // Default option values
        const defaultOptions: DefaultOptions = {
            batchSize: 100,
            // 5 second default, like datadog
            timeThresholdMs: 5 * 1000,
        }

        // Initialize log storage and utility variables
        const logs: any[] = []
        let timer: NodeJS.Timeout | null = null
        let apiCallInProgress = false
        let failures = 0
        let disableLogging = false

        // Merge provided options with default options
        const opts: Options & DefaultOptions = {
            ...defaultOptions,
            ...options,
        }

        // Validate necessary options are provided
        if (!opts.url || !opts.organization || !opts.streamName) {
            debugLog('OpenObserve Pino: Missing required')
            throw new Error(
                'OpenObserve Pino: Missing required options: url, organization, or streamName'
            )
        }

        // Generate the API URL for logging
        const apiUrl = createApiUrl(opts)

        // Create a writable stream to handle the log data
        const destination = new Writable({
            objectMode: true,
            write(log, _, callback) {
                if (disableLogging) {
                    callback()
                    return
                }
                debugLog('OpenObserve Pino: Log received:', log)
                logs.push(log)
                scheduleSendLogs(callback)
            },
        })

        // Use event-driven programming to handle source to destination piping
        await pipelineAsync(source, destination)
        debugLog('OpenObserve Pino: Pipeline completed')

        // Create a promise-based function to handle pipeline completion
        function pipelineAsync(
            source: Transform & build.OnUnknown,
            destination: Writable
        ): Promise<void> {
            debugLog('OpenObserve Pino: Piping source to destination')
            return new Promise((resolve, reject) => {
                source.pipe(destination).on('finish', resolve).on('error', reject)
            })
        }

        // Handle beforeExit to ensure all logs are sent
        process.on('beforeExit', () => {
            debugLog('OpenObserve Pino: Process beforeExit')
            if (logs.length > 0 && !apiCallInProgress) {
                sendLogs()
            }
        })

        // Function to construct API URL
        function createApiUrl({ url, organization, streamName }: Options): string {
            const parsedUrl = urlModule.parse(url)
            const path = parsedUrl.pathname?.endsWith('/')
                ? parsedUrl.pathname.slice(0, -1)
                : parsedUrl.pathname
            return `${parsedUrl.protocol}//${parsedUrl.host}${path}/api/${organization}/${streamName}/_multi`
        }

        // Schedule log sending based on batch size and time threshold
        function scheduleSendLogs(callback: () => void) {
            debugLog('OpenObserve Pino: Scheduling logs')
            if (timer) {
                clearTimeout(timer)
            }

            if (logs.length >= opts.batchSize && !apiCallInProgress) {
                sendLogs(callback)
            } else {
                timer = setTimeout(() => {
                    ;(async () => {
                        await sendLogs(callback)
                    })()
                }, opts.timeThresholdMs)
            }
        }

        // Send logs to API
        async function sendLogs(callback?: () => void) {
            debugLog('OpenObserve Pino: Sending logs:', logs.length)
            if (logs.length === 0 || apiCallInProgress) {
                return
            }

            apiCallInProgress = true
            const { auth } = opts
            const bulkLogs = logs
                .splice(0, opts.batchSize)
                .map((log) => JSON.stringify(log))
                .join(',')

            try {
                const response = await fetch(apiUrl, {
                    method: 'POST',
                    headers: {
                        Authorization: `Basic ${Buffer.from(
                            `${auth.username}:${auth.password}`
                        ).toString('base64')}`,
                        'Content-Type': 'application/json',
                    },
                    body: `${bulkLogs}`,
                })

                if (!response.ok) {
                    console.error(
                        'Failed to send logs:',
                        response.status,
                        response.statusText
                    )
                } else if (process.env.LOG_OPENOBSERVE_DEBUG) {
                    debugLog('Logs sent successfully:', await response.json())
                }
            } catch (error: any) {
                if (error.cause.code === 'ECONNREFUSED') {
                    failures++
                    if (failures > 2) {
                        disableLogging = true
                        console.warn(
                            'OpenObserve process not responding. Disabling logging.',
                            error
                        )
                    }
                } else {
                    console.error('Failed to send logs:', error)
                }
            } finally {
                apiCallInProgress = false
                // Call the callback to continue the stream
                callback?.()
            }
        }
    })
}