getsentry / sentry-javascript

Official Sentry SDKs for JavaScript
https://sentry.io
MIT License

Insights: Auto-instrument BullMQ for Queue Insights #12956

Open bcoe opened 1 month ago

bcoe commented 1 month ago

Problem Statement

Similar to Celery for Python, BullMQ seems like a good candidate to auto-instrument for queue insights; see https://github.com/getsentry/sentry-javascript/discussions/10148, along with conversations on Twitter:

(Three screenshots of Twitter conversations, 2024-07-17)

Solution Brainstorm

We should model an implementation after the one for Celery.

AbhiPrasad commented 1 month ago

There is OTEL instrumentation we can leverage here: https://github.com/appsignal/opentelemetry-instrumentation-bullmq

AbhiPrasad commented 1 month ago

In the meantime, we can probably write some docs showing how to use the OTEL instrumentation manually with BullMQ. Users would have to install opentelemetry-instrumentation-bullmq and call span.setAttribute themselves.

@bcoe what do you think of that as an intermediate solution?
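
For illustration, that manual setup might look roughly like this (an untested sketch, not official guidance; it assumes the community package registers cleanly against the global tracer provider that @sentry/node sets up, and the DSN is a placeholder):

import * as Sentry from '@sentry/node'
import { registerInstrumentations } from '@opentelemetry/instrumentation'
import { BullMQInstrumentation } from '@appsignal/opentelemetry-instrumentation-bullmq'

// Sentry.init() must run first so its OpenTelemetry setup is in place.
Sentry.init({
  dsn: '__YOUR_DSN__',
  tracesSampleRate: 1.0,
})

// Register the community instrumentation against the global tracer provider;
// its BullMQ spans then flow through Sentry like any other OTEL spans.
registerInstrumentations({
  instrumentations: [new BullMQInstrumentation({})],
})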

iacobus commented 6 days ago

Hi! Our team is also interested in instrumenting our BullMQ workloads running inside of NestJS. I know close to nothing about OTEL or about how Sentry works, but I've been playing with the OTEL instrumentation package that you referenced, and got something, but not much.

What I've done is basically try to integrate the OTEL instrumentation with @sentry/node by looking at how other integrations work. The following snippet defines a BullMQ integration based on the PG tracing code:

import { BullMQInstrumentation } from '@appsignal/opentelemetry-instrumentation-bullmq'
import { defineIntegration } from '@sentry/core'
import { generateInstrumentOnce } from '@sentry/node'
import type { IntegrationFn } from '@sentry/types'

const INTEGRATION_NAME = 'BullMQ'

export const instrumentBullMQ = generateInstrumentOnce(
  INTEGRATION_NAME,
  () =>
    new BullMQInstrumentation({}),
)

const _bullmqIntegration = (() => {
  return {
    name: INTEGRATION_NAME,
    setupOnce() {
      instrumentBullMQ()
    },
  }
}) satisfies IntegrationFn

/**
 * BullMQ integration
 *
 * Capture tracing data for BullMQ.
 */
export const bullmqIntegration = defineIntegration(_bullmqIntegration)

With this snippet I can add bullmqIntegration to my integrations array in Sentry.init, which also contains nodeProfilingIntegration() (see the setup sketch after the attribute list). When I trigger a Bull job, I see a transaction of type message under the Performance section of Sentry, with these relevant attributes:

messaging.operation: process,
messaging.system: bullmq,
otel.kind: CONSUMER,
sentry.op: message,
sentry.origin: manual,
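
For reference, the wiring looks roughly like this (a sketch; nodeProfilingIntegration comes from @sentry/profiling-node, the import path for the integration snippet above is hypothetical, and the DSN is a placeholder):

import * as Sentry from '@sentry/node'
import { nodeProfilingIntegration } from '@sentry/profiling-node'
import { bullmqIntegration } from './bullmq-integration' // the snippet above

Sentry.init({
  dsn: '__YOUR_DSN__',
  integrations: [nodeProfilingIntegration(), bullmqIntegration()],
  tracesSampleRate: 1.0,
})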

However, this transaction is empty. Its duration is just a few milliseconds (shorter than the job duration), and it contains no spans at all, even though there are PG queries inside (PG queries are correctly reported in our regular application tracing). We also see no reference to the producing side of the job, nor do we see anything under the Queues section of Sentry.

Not sure if you were able to get started on those docs you mentioned, but if you have some tips that could unblock us, we'd appreciate it 🙏

andreiborza commented 5 days ago

Hello, thanks for writing in. We're in our company-wide hackweek and thus on limited support this week. We'll take a look at this next week.

iacobus commented 5 days ago

Oh cool, have fun and thanks!

iacobus commented 5 days ago

Actually, upon further debugging, I came across a very subtle bug in our instrumented code where we were not using async functions correctly. Once I fixed that, I do see database spans inside the message span created by the OTEL instrumentation, and I also see spans for the enqueuing phase of the job lifecycle, so I think the integration code I shared above is sufficient to get tracing working.
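
For anyone hitting the same symptom, the class of bug looks roughly like this (an illustrative sketch; doDatabaseWork is a hypothetical helper, and the Worker options are omitted):

import { Worker } from 'bullmq'

// BUG: the promise isn't awaited, so the processor resolves immediately,
// the consumer span closes after a few ms, and the DB spans land outside it.
const buggy = new Worker('my-queue', async job => {
  doDatabaseWork(job)
})

// FIX: awaiting keeps the span open for the job's full duration, so child
// spans (e.g. PG queries) nest inside it.
const fixed = new Worker('my-queue', async job => {
  await doDatabaseWork(job)
})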

We still don't see anything under the Queues feature, so we would still appreciate a note on that, but my guess is that the OTEL instrumentation doesn't emit exactly what Sentry expects in order to show data under Queues?

lforst commented 2 days ago

@iacobus Thanks for the patience! Very cool that you got the integration working yourself. The likely reason you're not getting any data in the queues module yet is that the performance data needs to have a certain schema to show up under that module. Usually, it is the span op and/or span attributes that need to follow a certain convention.

For queues, this convention can be found here: https://develop.sentry.dev/sdk/telemetry/traces/modules/queues/ (these are our developer docs)

Until we provide support for bullmq out of the box you may be able to hack this in. Keep us posted!
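
To make the convention concrete, a producer/consumer pair that the Queues module should recognize looks roughly like this (a sketch; attribute names are taken from the develop docs linked above, and messageId/latencyMs are illustrative values you supply):

import * as Sentry from '@sentry/node'

// Producer side: op 'queue.publish' plus messaging.* attributes.
Sentry.startSpan({
  name: 'publish my-queue',
  op: 'queue.publish',
  attributes: {
    'messaging.message.id': messageId, // your job's id
    'messaging.destination.name': 'my-queue',
    'messaging.system': 'bullmq',
  },
}, async () => {
  // enqueue the job here
})

// Consumer side: op 'queue.process' plus the receive latency.
Sentry.startSpan({
  name: 'process my-queue',
  op: 'queue.process',
  attributes: {
    'messaging.message.id': messageId,
    'messaging.destination.name': 'my-queue',
    'messaging.message.receive.latency': latencyMs, // ms between publish and pickup
    'messaging.system': 'bullmq',
  },
}, async () => {
  // run the job here
})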

iacobus commented 2 days ago

Thanks @lforst. As relevant context for your team, it looks like the instrumentation library under discussion has opted to follow the OpenTelemetry Semantic Conventions for Messaging Spans, which seem to differ slightly from what Sentry Queues defines in its spec. The OTel standard makes intuitive sense to me; curious to hear your thoughts.

Getting Queues to work is not as high a priority for us right now, and this library doesn't seem easily customizable, so gonna punt on this for some time. I'll stay tuned to this issue. Thanks for the support.

lforst commented 2 days ago

As far as I can tell, we also adhere to the OTel semantic conventions, except for the span op, which is not an OTel concept.
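
If the only gap really is the span op, one possibly viable bridge (an untested sketch, along the lines of the span.setAttribute suggestion earlier in this thread) is to tag the instrumentation's span from inside your own processor:

import * as Sentry from '@sentry/node'
import { Worker } from 'bullmq'

const worker = new Worker('my-queue', async job => {
  // If the OTEL instrumentation's consumer span is active here, 'sentry.op'
  // is the attribute Sentry reads to derive the span op.
  Sentry.getActiveSpan()?.setAttribute('sentry.op', 'queue.process')
  // ... actual job logic
})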

antoinedc commented 1 day ago

Hey @iacobus & @lforst,

I was just looking into this as well this week. I tried forking and working on the appsignal version, but it resulted in events being mixed up in the Sentry UI (some queue events would show up under another queue).

I came up with this snippet of code. It's very hacky, but it works, somehow. It handles add(), addBulk() & processJob(). I'm not using flows, so I probably won't implement those. I thought it might be helpful to put it here.

I'm sure there are things to fix in here, especially around the duplication of span attributes. Also, I need to look into it more to be sure, but I'm pretty sure I end up with a lot more recorded processed events than published ones. If anyone has an idea why that could be, I'd love to know.

// Assumes `redis` is an already-connected client exposing lpush/lpop.
const crypto = require('crypto');
const Sentry = require('@sentry/node');
const { nodeProfilingIntegration } = require('@sentry/profiling-node');
const { Queue, Worker } = require('bullmq');

const bullMQIntegration = {
    name: 'bullmq',
    setupOnce() {
        const originalAddBulk = Queue.prototype.addBulk;
        Queue.prototype.addBulk = async function (...args) {
            const jobs = args[0];
            const messageId = crypto.randomBytes(8).toString('hex');

            // Return the span callback's result so callers still receive the
            // created jobs. Note: `op` already sets the sentry.op attribute,
            // so it isn't duplicated in `attributes`.
            return Sentry.startSpan({
                name: 'queue_producer_transaction',
                op: 'queue.publish',
                attributes: {
                    'messaging.message.id': messageId,
                    'messaging.destination.name': this.name,
                    'messaging.system': 'bullmq'
                }
            },
            async () => {
                // One producer span per job; the trace/baggage headers are
                // stashed in Redis so the consumer can continue the trace.
                await Promise.all(jobs.map(job =>
                    Sentry.startSpan(
                        {
                            name: 'queue_producer',
                            op: 'queue.create',
                            attributes: {
                                'messaging.message.id': messageId,
                                'messaging.destination.name': this.name,
                                'messaging.system': 'bullmq'
                            }
                        },
                        async span => {
                            const traceHeader = Sentry.spanToTraceHeader(span);
                            const baggageHeader = Sentry.spanToBaggageHeader(span);
                            const instrumentationData = { traceHeader, baggageHeader, timestamp: Date.now(), messageId };
                            await redis.lpush(`span:${job.name}`, JSON.stringify(instrumentationData));
                        }
                    )
                ));
                return originalAddBulk.apply(this, args);
            });
        };

        const originalAdd = Queue.prototype.add;
        Queue.prototype.add = async function (...args) {
            const messageId = crypto.randomBytes(8).toString('hex');
            return Sentry.startSpan({
                name: 'queue_producer_transaction',
                op: 'queue.publish',
                attributes: {
                    'messaging.message.id': messageId,
                    'messaging.destination.name': this.name,
                    'messaging.system': 'bullmq'
                }
            },
            () =>
                Sentry.startSpan(
                    {
                        name: 'queue_producer',
                        op: 'queue.publish',
                        attributes: {
                            'messaging.message.id': messageId,
                            'messaging.destination.name': this.name,
                            'messaging.system': 'bullmq'
                        }
                    },
                    async span => {
                        const traceHeader = Sentry.spanToTraceHeader(span);
                        const baggageHeader = Sentry.spanToBaggageHeader(span);
                        const instrumentationData = { traceHeader, baggageHeader, timestamp: Date.now(), messageId };
                        // args[0] is the job name, matching the lpop key below.
                        await redis.lpush(`span:${args[0]}`, JSON.stringify(instrumentationData));
                        return originalAdd.apply(this, args);
                    }
                ));
        };

        const originalProcessJob = Worker.prototype.processJob;
        Worker.prototype.processJob = async function (...args) {
            const message = JSON.parse(await redis.lpop(`span:${args[0].name}`));
            if (!message)
                return originalProcessJob.apply(this, args);

            const latency = Date.now() - message.timestamp;
            // continueTrace links the consumer spans to the producer's trace.
            return Sentry.continueTrace(
                { sentryTrace: message.traceHeader, baggage: message.baggageHeader },
                () =>
                    Sentry.startSpan({
                        name: 'queue_consumer_transaction',
                        op: 'queue.process',
                        attributes: {
                            'messaging.message.id': message.messageId,
                            'messaging.destination.name': args[0].queue.name,
                            'messaging.message.receive.latency': latency,
                            'messaging.system': 'bullmq'
                        }
                    },
                    parent =>
                        Sentry.startSpan({
                            name: 'queue_consumer',
                            op: 'queue.process',
                            attributes: {
                                'messaging.message.id': message.messageId,
                                'messaging.destination.name': args[0].queue.name,
                                'messaging.message.receive.latency': latency,
                                'messaging.system': 'bullmq'
                            }
                        }, async () => {
                            // Await the job so the span covers its full duration
                            // and the result is returned to the worker.
                            const result = await originalProcessJob.apply(this, args);
                            parent.setStatus({ code: 1, message: 'ok' });
                            return result;
                        }))
            );
        };
    },
};

Sentry.init({
    dsn: getSentryDsn(),
    environment: getNodeEnv() || 'development',
    release: `ethernal@${getVersion()}`,
    integrations: [
        nodeProfilingIntegration(),
        Sentry.postgresIntegration(),
        bullMQIntegration,
    ],
    tracesSampleRate: 1.0,
    profilesSampleRate: 1.0,
    debug: true
});

And here is what it looks like in Sentry:

(Screenshot of the Queues view in Sentry)

Each destination contains two rows: queue_producer_transaction (type "Producer", with the published-event count, error rate, and time spent set) and queue_consumer_transaction (type "Consumer", with the average time in queue, processing time, processed count, error rate, and time spent set).