PostHog / posthog-js-lite

Reimplementation of posthog-js to be as light and modular as possible.
https://posthog.com/docs/libraries
MIT License
69 stars 36 forks source link

Events lost when submitting multiple events and using `flushAt: 1, flushInterval: 0` #232

Closed freeatnet closed 5 months ago

freeatnet commented 6 months ago

Bug description

posthog-node running in a Next.js app router app configured with flushAt: 1, flushInterval: 0 as recommended seems to lose events that are issued in batches.

We have a Next.js 14.2.3 App Router app that includes a batch job (invoked via an API request) that pulls a list of PostHog events and possibly distributes rewards points to users based on those events. The general form of the batch job is attached in additional context below.

When run with slightly non-trivial amounts of data (e.g., 50-100 input events), posthog-node seems only to submit some of them, as indicated by the data visible on posthog.com and message list passed to posthog.on('flush', (...args) => { console.log('PostHog flush', ...args); }).

The issue can be worked around by creating a custom instance of posthog-node with no flushAt: 1, flushInterval: 0 parameters.

How to reproduce

This is probably the bare minimum information for a reproduction — please let me know if you need a sample repo.

  1. Create a Next.js 14.2 app router app, configure a posthog instance with recommended parameters, and set up a batch job that captures a number of events while processing input in a loop.
  2. Set up instrumentation to catch capture, flush, and error events emitted by the posthog instance.
  3. Run the batch job.
  4. Verify that no errors occurred and there are no additional console output (e.g., no "queue is full" messages).

Expected: All events that have triggered a capture event also appear in the flush event, and all events are visible in PostHog. Observed: Some captured events are not submitted to PostHog and don't appear in flush event logs.

Related sub-libraries

Additional context

Sample batch job flow

export async function distributeWelcomeBonuses() {
  const { results } = await queryPostHog(
    HOGQL_QUERY,
    { now: new Date().toISOString() },
    RESULTS_SCHEMA,
    {
      host: env.NEXT_PUBLIC_POSTHOG_HOST,
      projectId: env.POSTHOG_PROJECT_ID,
      apiKey: env.POSTHOG_READ_API_KEY,
    },
  );

  for (const row of results) {
    const { userId, triggerEventKey } = row;

    const shouldReward = Math.random() < 0.8; // some logic

    if (shouldReward) {
      // insert a reward transaction into the database
      await createEarnPointsTransaction(dbClient, {
        userId: userId,
        amount: REWARD_AMOUNT,
        reason: POINTS_AWARDED_REASON,
        internalNote: `${triggerEventKey.event} - ${env.NEXT_PUBLIC_POSTHOG_HOST}/project/${env.POSTHOG_PROJECT_ID}/events/${triggerEventKey.uuid}/${triggerEventKey.timestamp}`,
        eventKey: triggerEventKey,
      });

      posthog.capture({
        distinctId: userId.toString(),
        event: POINTS_AWARDED_EVENT,
        properties: {
          reward_reason: POINTS_AWARDED_REASON,
          reward_amount: REWARD_AMOUNT,
          reward_trigger_event: triggerEventKey,
        },
      });
    }
  }

  await posthog.flush();
}

Thank you for your bug report – we love squashing them!

marandaneto commented 6 months ago

@freeatnet can you let us know which version of the PH Node SDK you are using and the code snippet of the posthog instance creation?

freeatnet commented 6 months ago

@marandaneto 4.0.1. Updated the post, sorry about missing that.

freeatnet commented 6 months ago

To add, posthog instantiation looks like this:

export const posthog = new PostHog(env.NEXT_PUBLIC_POSTHOG_KEY, {
  host: env.NEXT_PUBLIC_POSTHOG_HOST, // set to "https://app.posthog.com"
  flushAt: 1,
  flushInterval: 0,
});

Logging setup follows instantiation and is run before any of the events are captured:

posthog.on("capture", (event) => {
  // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
  console.log("Event UUID: capture", event.uuid);
});

posthog.on("error", (...args) => {
  console.error("PostHog error:", args);
});

posthog.on("flush", (messages) => {
  // eslint-disable-next-line @typescript-eslint/no-unsafe-call
  messages.forEach((message: { uuid: string }) => {
    console.log("Event UUID: flush", message.uuid);
  });
  console.info("PostHog flush:", messages);
});
marandaneto commented 6 months ago

@benjackwhite could the v4 refactoring affect this? works locally for me (not using nextjs tho).

@freeatnet if you do posthog.debug(true) after creating the PostHog instance.

Which logs do you see?

Since you are always using the "seems to lose events", and "seems only to submit some of them" wording, can you double-check the logs and be sure events are lost?

Thanks.

freeatnet commented 6 months ago

@marandaneto I used that wording to mean I might be misconfiguring something or missing some implementation detail :)

Here's a log of a local run of the task. None of the events logged as captured past line 1202 are included in the subsequent flush. I also spot-checked a few of these events using the /project/:projectId/events/:uuid/:timestamp URLs on PostHog, none of the ones I checked show up, so it's not just a case of the flush event not being logged.

TriPSs commented 6 months ago

Having the same issue here except we are running in a cloud function, we're not setting the flushAt: 1, flushInterval: 0 but are calling await postHogClient.shutdown() at the end.

It seems the issue started when we upgrade to v4.

marandaneto commented 6 months ago

@freeatnet or @TriPSs can you provide a minimal reproducible example? just something I can run right away. I tested this locally and all events got flushed correctly (tested 100 a few times) and all events came thru. I wonder if its an environment thing or something like that.

shutdown will call flush internally so it should not be an issue @TriPSs

The test I did is as simple as:

for (let i = 100; i < 200; i++) {
    posthog.capture({
      distinctId: 'dis ' + i,
      event: 'test-event ' + i,
    })
  }

await posthog.flush()
await posthog.shutdown()
TriPSs commented 6 months ago

Here is our complete code (at-least everything that is relevant):

import { ORGANIZATION_GROUP_TYPE } from '@<our org>>/constants/analytics'
import { getLazyPrismaClient } from '@<our org>>/database'
import { LoggerV2 } from '@<our org>>/node/gcp/logging'
import { updateUsageInStripe } from '@<our org>>/node/stripe'
import { OrganizationSubscriptionState } from '@prisma/client'
import axios from 'axios'
import { PostHog } from 'posthog-node'

import type { Request, Response } from '@google-cloud/functions-framework'

/**
 * Update's all active organizations it's usages to Stripe
 */
export async function cronSyncUsagesFunction(req: Request, res: Response): Promise<void> {
  try {
    const organizations = await getLazyPrismaClient().organization.findMany({
      where: {
        subscriptionState: OrganizationSubscriptionState.ACTIVE,

        jobs: {
          some: { billed: false }
        }
      }
    })

    const postHogClient = new PostHog(
      'API KEY',
      {
        host: 'https://eu.posthog.com',
        // disabled: process.env.NODE_ENV !== 'production'
      }
    )

    for (const organization of organizations) {
      const jobCount = await updateUsageInStripe(organization)

      if (jobCount > 0) {
        if (!organization.posthogIdentified) {
          // Make sure the group exists with the latest info
          postHogClient.groupIdentify({
            groupType: ORGANIZATION_GROUP_TYPE,
            groupKey: organization.id,
            properties: {
              name: organization.name,
              subscription: organization.subscription,
              date_joined: organization.createdAt
            }
          })

          await getLazyPrismaClient().organization.update({
            where: { id: organization.id },
            data: { posthogIdentified: true }
          })
        }

        postHogClient.capture({
          // https://posthog.com/docs/product-analytics/group-analytics#advanced-server-side-only-capturing-group-events-without-a-user
          distinctId: 'job_usages_synced',
          event: 'job_usages_synced',
          properties: {
            job_count: jobCount,
            subscription: organization.subscription
          },
          groups: {
            [ORGANIZATION_GROUP_TYPE]: organization.id
          }
        })
      }
    }

    try {
      await postHogClient.shutdown()

    } catch (err) {
      LoggerV2.new()
        .withError(err)
        .error('Error inserting into PostHog')
    }

  } catch (err) {
    LoggerV2.new()
      .withError(err)
      .error('Error syncing usage!')

    res.status(500)
      .send('ok')
  }

  res.status(200)
    .send('ok')
}

Some assumptions:

  1. I have the feeling that the times we lose events are the times it only has 1 event to send.
  2. Maybe the shutdown does not await all the events? That when the environment is killed (what happens in serverless) the requests are cancelled.
marandaneto commented 6 months ago

@TriPSs the posthog should be a global env. and be reused among cronSyncUsagesFunction calls, assuming this can be called multiple times within the same request or something, otherwise, it's not an issue. shutdown should await until all the events are flushed, up to 30s or you can pass a shutdownTimeoutMs

TriPSs commented 6 months ago

@marandaneto this function is called once every 30 minutes. We for now downgraded back to v3 and ~the events seem to be coming in correctly again.~ now the same event comes in multiple times 😅

marandaneto commented 6 months ago

@TriPSs I just tested this locally:

const functions = require('@google-cloud/functions-framework');
const { PostHog } = require('posthog-node');

const posthog = new PostHog('...', {
    // works as well if you uncomment the following lines
    // flushAt: 1,
    // flushInterval: 0
  })
posthog.debug(true)

async function sendEvent(id) {
    // works as well if you uncomment the following line, and comment the global posthog declaration
    // const posthog = new PostHog('...')
    // posthog.debug(true)

    posthog.capture({
        distinctId: 'test',
        event: 'test' + id,
    })

    // works as well if you uncomment the following line, and comment the line after
    // await posthog.flush()
    await posthog.shutdown()
}

functions.http('helloWorld', async (req, res) => {
   console.info("PostHog before hello");

  res.send('Hello, World');

  console.info("PostHog before send event");

  await sendEvent(req.executionId);

  console.info("PostHog end");
});

everything works as expected, hit the endpoint many times very fast, and all events came thru correctly. so I can only help you if you can provide a minimal reproducible example, or can you try to run this example in your environment? full example https://github.com/PostHog/posthog-js-lite/pull/233

TriPSs commented 6 months ago

@marandaneto could it maybe be a issue with the PostHog dashboard? As since I downgraded I now actually see the same event (same uuid) multiple times: image

The sample code was I think only ran locally?

marandaneto commented 6 months ago

@TriPSs this is a different issue, for that, please reach out to customer support or open a new issue on the main repo, I am debugging only the SDK side of things, I don't know the dedupe part well which is server side.

TriPSs commented 6 months ago

@marandaneto okay, thought maybe it was related, marked it off-topic.

marandaneto commented 5 months ago

Tried to reproduce this, and even created a small repro https://github.com/PostHog/posthog-js-lite/blob/main/examples/example-gcp-functions/ but nothing. @freeatnet please check this comment if still relevant.

marandaneto commented 5 months ago

Closing the issue as a part of large repository cleanup, due to it being inactive and/or outdated. Please do not hesitate to ping me if it is still relevant, and I will happily reopen and work on it. Cheers!