PostHog / plugin-server

Service to process and save PostHog events, supporting JS/TS plugins at that
https://posthog.com

createBuffer is bad - Why BQ isn't the problem #487

Closed · yakkomajuri closed this issue 3 years ago

yakkomajuri commented 3 years ago

As the Extensibility team disbanded to conquer the world, it was my turn to be assigned https://github.com/PostHog/bigquery-plugin/issues/10.

Having spent a good amount of time pulling my beard hairs out, I believe I now have the answer. Here's what's happening and a mandatory story of how I got there.

Disclaimer: While I'm reasonably sure of quite a bit of this, some of it is still "hunches", so it may not be 100% technically correct

Context

@neilkakkar suggested I pick this up during the last planning session, noting that this was an error that only happens on local/self-hosted but not Cloud.

This was super weird, because it was reported on local Postgres instances, but also Netdata's ClickHouse instance. I kept thinking about all the things that could be different between these environments and testing them out, and went through a lot of the same steps others here have taken. Could it be logs? Could it be job queues?

Then I started looking into the buffer.

BigQuery ain't the problem

When I first read the issue, I immediately thought this wasn't a BigQuery thing. Why? I had experienced the same issues developing the RudderStack plugin, and the odd retries even featured on a video I made for them :D

It seemed that retry errors were only thrown on the first run, but everything would always complete as expected the second time.

What was different here? In both plugins, the first flush would come from the buffer, whereas all subsequent ones would come from jobs. Bingo, buffer.

The Evil Buffer

To find out if the buffer was the problem, I set up a buffer flush and a job to call the same async function that made a request to an endpoint that'd always wait 5s before responding.
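
Roughly, the comparison looked like this (a hedged sketch: the endpoint URL and names are illustrative, and fetch is assumed to be the fetch provided to plugins):

import { createBuffer } from '@posthog/plugin-contrib'

async function slowRequest(): Promise<void> {
    const start = Date.now()
    await fetch('https://example.com/always-takes-5s') // assumed endpoint that always takes ~5s
    console.log('request took', (Date.now() - start) / 1000, 's')
}

export const jobs = {
    // the job path: piscina runs and awaits this as its own task
    slowRequestJob: async () => await slowRequest(),
}

export async function setupPlugin({ global }: any) {
    // the buffer path: onFlush ends up voided in the background
    global.buffer = createBuffer({
        limit: 5 * 1024 * 1024,
        timeoutSeconds: 1,
        onFlush: async () => await slowRequest(),
    })
}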

The job always executed in around the same amount of time, but the buffer would vary a lot:

job vs buffer

I went through thousands of runs, and the job was always consistent. The buffer would vary from 5s to 29s and then came the timeouts.

So the buffer was the problem. But finding out why was where my beard hairs started to come out.

I started to notice connection reset, socket hang up, and other sorts of errors. I knew the request would take only 5s, so there were 25s to spare. This made no sense, so my hunch was that this was a problem lower down.

Here came a period of hours and hours looking through node and piscina docs, changing config options, inspecting all the possible lines of code one by one, etc. We can skip ahead from here.

Side note: I think we can improve performance by setting keepAlive to true for plugin fetch requests by default. This had a significant performance benefit, and most plugins will make requests to the same hosts over and over anyway.
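
For illustration, a sketch of the keepAlive idea (keepAliveFetch is a made-up wrapper name, not the actual plugin-server change), assuming node-fetch as the underlying fetch implementation:

import fetch from 'node-fetch'
import http from 'http'
import https from 'https'

// reuse sockets across requests to the same host
const httpAgent = new http.Agent({ keepAlive: true })
const httpsAgent = new https.Agent({ keepAlive: true })

export const keepAliveFetch = (url: string, options: any = {}) =>
    fetch(url, {
        agent: url.startsWith('https') ? httpsAgent : httpAgent,
        ...options,
    })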

0x our savior

Finally, I decided to run the plugin server with the flamegraph tool 0x (https://github.com/davidmarkclements/0x).

Here's what that looked like:

Screenshot 2021-06-25 at 14 26 46

Hmmm, what the hell is atomicsWaitLoop?

I went to the piscina repo and there was the answer:

function atomicsWaitLoop (port : MessagePort, sharedBuffer : Int32Array) {
  if (!useAtomics) return;

  // This function is entered either after receiving the startup message, or
  // when we are done with a task. In those situations, the *only* thing we
  // expect to happen next is a 'message' on `port`.
  // That call would come with the overhead of a C++ → JS boundary crossing,
  // including async tracking. So, instead, if there is no task currently
  // running, we wait for a signal from the parent thread using Atomics.wait(),
  // and read the message from the port instead of generating an event,
  // in order to avoid that overhead.
  // The one catch is that this stops asynchronous operations that are still
  // running from proceeding. Generally, tasks should not spawn asynchronous
  // operations without waiting for them to finish, though.

  ...
}

"Generally, tasks should not spawn asynchronous operations without waiting for them to finish, though."

Hey! That's exactly what we do.

This by itself didn't fully answer the Cloud vs local/self-hosted question though, but with a bit more digging and testing, the answer came through:

This does happen on Cloud (10k+ events on Sentry) but it happens waaay less because there are always tasks to process.

What happens is that when a piscina worker is looking for a task and doesn't find one, it blocks that thread's event loop until a task comes through. If one arrives soon enough, the worker may still have time to run through the other phases of the event loop and handle the voided steps of the buffer flush before the timeout (e.g. the fetch request may have completed, but its callback hasn't been processed yet). But if no tasks come, it will wait, and wait, and wait, and when it finally comes back, asyncGuard does its thing aaaand we're sending a request to be retried (even though it may have finished, as was pointed out with BQ).

Testing this locally confirmed this:

Screenshot 2021-06-25 at 14 26 16

The fewer events you send, the longer atomicsWaitLoop runs for. This isn't much of a problem in Cloud because there are always events (and all the other tasks) to process.
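
To make the mechanism concrete, here's a minimal, standalone sketch (not plugin-server code; the 5s and 15s values are just illustrative) of a worker thread that voids a promise and then blocks on Atomics.wait() the way atomicsWaitLoop does:

import { Worker, isMainThread, workerData } from 'worker_threads'

if (isMainThread) {
    const shared = new SharedArrayBuffer(4)
    new Worker(__filename, { workerData: shared })
    // simulate a quiet task queue: only "send a task" (notify) after 15s
    setTimeout(() => Atomics.notify(new Int32Array(shared), 0), 15_000)
} else {
    const int32 = new Int32Array(workerData as SharedArrayBuffer)
    const started = Date.now()
    // a voided promise, like the buffer's onFlush
    void (async () => {
        await new Promise((resolve) => setTimeout(resolve, 5_000)) // stands in for the 5s fetch
        console.log('background work finished after', (Date.now() - started) / 1000, 's') // prints ~15, not 5
    })()
    Atomics.wait(int32, 0, 0) // blocks this thread's event loop until the notify arrives
}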

Solution

The solution is to listen to the piscina people and not void promises in tasks.

While this adds another task each time, we're better off flushing everything via a job.
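
As a rough sketch of what that could look like at the plugin level (flushBatch and sendBatchToDestination are placeholder names, not the actual implementation), onFlush only enqueues a job, so the slow work runs as its own awaited piscina task:

import { createBuffer } from '@posthog/plugin-contrib'

async function sendBatchToDestination(batch: any[]): Promise<void> {
    // placeholder for the real work, e.g. the BigQuery insert
}

export const jobs = {
    // runs as its own task, which piscina awaits properly
    flushBatch: async (payload: { batch: any[] }, meta: any) => {
        await sendBatchToDestination(payload.batch)
    },
}

export async function setupPlugin(meta: any) {
    meta.global.buffer = createBuffer({
        limit: 5 * 1024 * 1024,
        timeoutSeconds: 1,
        onFlush: async (batch: any[]) => {
            // hand the work off instead of doing slow I/O inside the voided flush
            await meta.jobs.flushBatch({ batch }).runNow()
        },
    })
}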

yakkomajuri commented 3 years ago

jumping on a call now, potentially more context to come

yakkomajuri commented 3 years ago

For more context, here's a walkthrough of what happens using the snippet I tested with:

global.buffer = createBuffer({
    limit: 5 * 1024 * 1024,
    timeoutSeconds: 1,
    // onFlush will get voided so it happens in the background
    onFlush: async (batch) => {

        // this runs immediately
        const timer1 = new Date().getTime() 

        // here, the request is initiated and the worker moves on
        // to go through the other phases of the event loop
        // but this whole block isn't being awaited by the task
        // so the worker goes into `atomicsWaitLoop` mode
        // no tasks come for e.g. 15s so the request may have completed
        // but the callback (everything after the fetch line) hasn't been called 
        // because the event loop is blocked
        await fetch('https://webhook.site/ba1c2e5c-e471-4873-be4c-b19dfd1dda86') 

        // after 15s this callback is finally called
        // and the log says something like 15s
        // but if 15 was 35, an error is thrown because of `asyncGuard`
        // and this never runs
        const timer2 = new Date().getTime()
        console.log('onFlush took', (timer2-timer1)/1000, 'seconds')
    },
})
mariusandra commented 3 years ago

Oh wow, that's some amazing digging you did there! 💯

The findings make a lot of sense as well. Not sure re: the best solution, as we don't really know and can't know if any library we use is using internal buffers, discarded awaits, etc.

We're anyway using a patched piscina (with a broadcastTask that they didn't accept), so perhaps it makes sense to patch that somehow? Or we can just broadcastTask every 100ms to unblock things? (Probably not).

I don't have good answers (yet?), but again, great work digging this out!

yakkomajuri commented 3 years ago

There's a chance we might be able to solve this by circumventing atomicsWaitLoop with the piscina useAtomics config option set to false. Initial simple testing seems promising in this specific regard, although they warn of a performance hit.

Screenshot 2021-06-25 at 16 31 03
yakkomajuri commented 3 years ago

No timeout errors at all from the above.

For the buffer at least, we can probably move it away from plugin-contrib and implement it in the plugin server using a jobs-based flushing mechanism. This certainly doesn't solve the root cause, though, which leaves us exposed to this problem from third-party libs we use, as you mentioned.

yakkomajuri commented 3 years ago

Here's a before and after from useAtomics = false (later logs being the "after"):

https://user-images.githubusercontent.com/38760734/123476895-ae0f8300-d5d3-11eb-8438-d5a0857cf0cd.mov

Twixes commented 3 years ago

This is super interesting. :clap: I definitely can see other places where we'd like to void promises in tasks safely (looking at dependencies, but also at our own async functions that we'd like to immediately move on from after calling), so I'd love to see a more general solution too.

mariusandra commented 3 years ago

What about creating an internal promise-tracker? Something like:

const trackedPromises = new Set()
const trackPromise = (promise) => {
  if (trackedPromises.has(promise)) {
    return
  }
  trackedPromises.add(promise)
  promise.finally(() => {
    trackedPromises.delete(promise)
  })
}

function createBuffer() {
  return {
    flush: async () => {
      const actuallyFlush = async () => {
        // do whatever
        // 
      }
      const promise = actuallyFlush()
      trackPromise(promise)
      return promise
    }
  }
}

This won't help with random unhandled promises in plugins (unless we implement some babel magic ;)), like someone forgetting to await a cache.set() call, but already adding this to createBuffer would solve most current problems?

mariusandra commented 3 years ago

To clarify, this trackPromise should probably be internal to the plugin server, not inside the contrib package.

Then in teardown, we could also await Promise.all([...trackedPromises]) for an extra clean shutdown.
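
A minimal sketch of that teardown step, reusing trackedPromises from the snippet above (teardownPlugins is just an illustrative name):

export async function teardownPlugins(): Promise<void> {
    // give any still-pending tracked background work a chance to finish
    await Promise.all([...trackedPromises])
}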

neilkakkar commented 3 years ago

Super interesting, great work getting to the bottom of this! :D

Another option might be to move things around such that we don't end up in this situation: instead of every thread getting its own jobQueueManager and connections to Postgres/Redis/S3/wherever, we have a common one in the main thread, jobs get enqueued there, and they then get dispatched as tasks to the worker threads. I think this makes the task the job itself, instead of waiting on a new task to finish this existing job.

This is related to opening a channel of communication between threads & main server, and also: https://github.com/PostHog/posthog/issues/4709

yakkomajuri commented 3 years ago

The promise tracker wouldn't be generalizable enough for all the use cases, I feel.

Let's see the results of configuring useAtomics first because that looks "promising" (:D)

If that doesn't work (it solves this problem, but the question is what sort of performance dip will we see) then we go back to the drawing board.

There are probably ways to patch the piscina worker to do some processing in between looking for tasks, which would be ideal, as it would work at the foundation level rather than through some higher-level monkey patching. Could well be that this turns out not to be a good idea though. 🤷 Just rubber ducking.

mariusandra commented 3 years ago

We can write a babel plugin that captures all discarded promises (at least inside plugins, can't say for included libs) to make the feature more universal. Plus this makes some sense anyway if we want a clean shutdown procedure... and/or just to track leaks inside plugins. Just trying to solve the "you really shouldn't discard promises" root cause here :).

In any case, feel free to try whatever makes sense!

yakkomajuri commented 3 years ago

Makes sense 👍

yakkomajuri commented 3 years ago

Related Sentry issues:

neilkakkar commented 3 years ago

Q: How did you figure out that these are related to the piscina workers "hanging" for the next task?

yakkomajuri commented 3 years ago

The Sentry issues you mean? Well, they aren't necessarily because of this, so maybe I should write "potentially related issues". They are very much in line with things I've seen locally though. But good point, it's not certain this is the only thing at play here.

@neilkakkar

yakkomajuri commented 3 years ago

Update: The original commit that added the Atomics operations claims:

Use Atomics.wait()/Atomics.notify() for communicating between the main thread and worker threads. This saves time spent on asynchronous callbacks, and gives about +33 % more ops/sec in the benchmark.

yakkomajuri commented 3 years ago

Potentially worth running our own benchmarks

yakkomajuri commented 3 years ago

Just dumping this here as a note to self for now.

Overall time

  ✓ piscina worker benchmark (322938 ms)
  ✓ piscina worker benchmark (useAtomics = false) (308432 ms)

Benchmarks (useAtomics = true)

    ┌─────────┬────────────────┬───────────┬────────┬───────────┬──────────┬───────────┐
    │ (index) │    testName    │ coreCount │ events │ batchSize │ 1 thread │ 2 threads │
    ├─────────┼────────────────┼───────────┼────────┼───────────┼──────────┼───────────┤
    │    0    │    'simple'    │     2     │  5000  │     1     │  10435   │   10488   │
    │    1    │    'simple'    │     2     │  5000  │    10     │   9154   │   11162   │
    │    2    │    'simple'    │     2     │  5000  │    100    │   9014   │   10401   │
    │    3    │    'for2k'     │     2     │  5000  │     1     │   1555   │   1972    │
    │    4    │    'for2k'     │     2     │  5000  │    10     │   1587   │   2606    │
    │    5    │    'for2k'     │     2     │  5000  │    100    │   1536   │   1966    │
    │    6    │ 'timeout100ms' │     2     │  5000  │     1     │   1020   │   1841    │
    │    7    │ 'timeout100ms' │     2     │  5000  │    10     │   1130   │   1880    │
    │    8    │ 'timeout100ms' │     2     │  5000  │    100    │   1119   │   1777    │
    └─────────┴────────────────┴───────────┴────────┴───────────┴──────────┴───────────┘

Benchmarks (useAtomics = false)


    ┌─────────┬────────────────┬───────────┬────────┬───────────┬──────────┬───────────┐
    │ (index) │    testName    │ coreCount │ events │ batchSize │ 1 thread │ 2 threads │
    ├─────────┼────────────────┼───────────┼────────┼───────────┼──────────┼───────────┤
    │    0    │    'simple'    │     2     │  5000  │     1     │   7961   │   7985    │
    │    1    │    'simple'    │     2     │  5000  │    10     │   7883   │   9038    │
    │    2    │    'simple'    │     2     │  5000  │    100    │   5681   │   9172    │
    │    3    │    'for2k'     │     2     │  5000  │     1     │   1476   │   2122    │
    │    4    │    'for2k'     │     2     │  5000  │    10     │   1540   │   2131    │
    │    5    │    'for2k'     │     2     │  5000  │    100    │   1409   │   2265    │
    │    6    │ 'timeout100ms' │     2     │  5000  │     1     │   1128   │   1523    │
    │    7    │ 'timeout100ms' │     2     │  5000  │    10     │   1135   │   1515    │
    │    8    │ 'timeout100ms' │     2     │  5000  │    100    │   1122   │   1550    │
    └─────────┴────────────────┴───────────┴────────┴───────────┴──────────┴───────────┘
yakkomajuri commented 3 years ago

Another one.

Overall time

  ✓ piscina worker benchmark (useAtomics = false) (290003 ms)
  ✓ piscina worker benchmark (285901 ms)

Benchmarks (useAtomics = false)

    ┌─────────┬────────────────┬───────────┬────────┬───────────┬──────────┬───────────┐
    │ (index) │    testName    │ coreCount │ events │ batchSize │ 1 thread │ 2 threads │
    ├─────────┼────────────────┼───────────┼────────┼───────────┼──────────┼───────────┤
    │    0    │    'simple'    │     2     │  5000  │     1     │   9783   │   10121   │
    │    1    │    'simple'    │     2     │  5000  │    10     │   8898   │   9523    │
    │    2    │    'simple'    │     2     │  5000  │    100    │   9337   │   9888    │
    │    3    │    'for2k'     │     2     │  5000  │     1     │   1553   │   2231    │
    │    4    │    'for2k'     │     2     │  5000  │    10     │   1611   │   2231    │
    │    5    │    'for2k'     │     2     │  5000  │    100    │   1608   │   2236    │
    │    6    │ 'timeout100ms' │     2     │  5000  │     1     │   1063   │   1941    │
    │    7    │ 'timeout100ms' │     2     │  5000  │    10     │   1149   │   1695    │
    │    8    │ 'timeout100ms' │     2     │  5000  │    100    │   1142   │   1674    │
    └─────────┴────────────────┴───────────┴────────┴───────────┴──────────┴───────────┘

Benchmarks (useAtomics = true)

    ┌─────────┬────────────────┬───────────┬────────┬───────────┬──────────┬───────────┐
    │ (index) │    testName    │ coreCount │ events │ batchSize │ 1 thread │ 2 threads │
    ├─────────┼────────────────┼───────────┼────────┼───────────┼──────────┼───────────┤
    │    0    │    'simple'    │     2     │  5000  │     1     │  11380   │   13170   │
    │    1    │    'simple'    │     2     │  5000  │    10     │  12195   │   12656   │
    │    2    │    'simple'    │     2     │  5000  │    100    │  12520   │   12499   │
    │    3    │    'for2k'     │     2     │  5000  │     1     │   1443   │   2797    │
    │    4    │    'for2k'     │     2     │  5000  │    10     │   1755   │   2867    │
    │    5    │    'for2k'     │     2     │  5000  │    100    │   1591   │   2799    │
    │    6    │ 'timeout100ms' │     2     │  5000  │     1     │   1139   │   1512    │
    │    7    │ 'timeout100ms' │     2     │  5000  │    10     │   1142   │   1948    │
    │    8    │ 'timeout100ms' │     2     │  5000  │    100    │   1140   │   1927    │
    └─────────┴────────────────┴───────────┴────────┴───────────┴──────────┴───────────┘
yakkomajuri commented 3 years ago

Ok so some findings on useAtomics = false:

mariusandra commented 3 years ago

That 27% throughput drop for avoidable software reasons doesn't seem that nice :(. What's next?

yakkomajuri commented 3 years ago

Yup, definitely not the approach to take.

What's next is exploring how to make sure we keep track of those promises, preferably done at a lower level than requiring instrumentation on every unhandled promise we create.

Have a few ideas to test.

yakkomajuri commented 3 years ago

However, I might put through a jobs-based buffer implementation, just so we at least get rid of the "BigQuery errors" (that happen in multiple plugins)

yakkomajuri commented 3 years ago

So this is a reasonably hard problem (as anticipated).

There is a way to check for async stuff running in the background using some undocumented Node process methods (process._getActiveHandles() and process._getActiveRequests()). With enough duct tape, this could be leveraged to wait for background stuff to finish before completing the task, so it would be guaranteed that once we're in atomicsWaitLoop, there's no background stuff left. The big problem here is that then there's no point in the background processing, as the task would have to wait for it anyway.
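
For illustration, a duct-tape sketch of that idea (the function name and polling interval are made up; the underscore-prefixed methods are internal Node APIs and may change between versions):

async function waitForBackgroundWork(baseline: number, pollMs = 50): Promise<void> {
    const active = () =>
        (process as any)._getActiveHandles().length + (process as any)._getActiveRequests().length
    // poll until the number of active handles/requests drops back to the baseline
    while (active() > baseline) {
        await new Promise((resolve) => setTimeout(resolve, pollMs))
    }
}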

Another initial thought was to have atomicsWaitLoop stop once in a while to allow other event loop phases to do some processing. This would take away from the whole performance benefit of using atomicsWaitLoop though, as it is a way to be notified more quickly that there's a task to be picked up.

So with enough searching, I found out that there's a new Atomics method that could potentially be the solution to all our problems: Atomics.waitAsync. The problem is that this beauty only made it into V8 8.7, which was in turn added in Node 16.0.0.

So if we can't go up to Node 16, it's back to the drawing board.
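
For reference, a minimal sketch of the Atomics.waitAsync shape, assuming a runtime that ships it (Node 16+): the wait becomes a promise, so the thread's event loop keeps turning while waiting for a new-task notification.

const int32 = new Int32Array(new SharedArrayBuffer(4))

// cast because older TypeScript lib definitions don't include waitAsync
const result = (Atomics as any).waitAsync(int32, 0, 0)
if (result.async) {
    result.value.then(() => {
        // woken by Atomics.notify: go pick up the next task here
    })
}
// meanwhile, timers and I/O callbacks (e.g. a voided buffer flush) still get processed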

mariusandra commented 3 years ago

I'd still suggest this: https://github.com/PostHog/plugin-server/issues/487#issuecomment-869457587

Make sure everything we export (redis.set, etc) is wrapped with something that noopawaits the promise and we're done.

This will add another thing to think of when we finally launch any plugins on Cloud, but that's worth it now. Node 16 will be mainstream then 😆

yakkomajuri commented 3 years ago

What about something like this?

https://github.com/PostHog/piscina/pull/3/files

RE the promise tracker, I'm confused as to how it would ensure promises are fulfilled when the event loop is blocked? What ensures the event loop can e.g. close sockets in time?

In the buffer, flush can already be awaited, so that's all good, it being voided from add is the issue. Or what am I missing here?

mariusandra commented 3 years ago

I basically mean that, here, we replace currentTasks === 0 with currentTasks === 0 && pendingPromises === 0.

The pendingPromises count could be tracked with an API like this:

import { trackPromise } from '@posthog/piscina'
redis.set = trackPromise(redis.set)
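
A hedged sketch of what that wrapper could look like inside the patched piscina (pendingPromises, the wrapper shape, and the atomicsWaitLoop check are assumptions, not the actual patch):

let pendingPromises = 0

export function trackPromise<Args extends any[], R>(
    fn: (...args: Args) => Promise<R>
): (...args: Args) => Promise<R> {
    return async (...args: Args) => {
        pendingPromises++
        try {
            return await fn(...args)
        } finally {
            pendingPromises--
        }
    }
}

// in atomicsWaitLoop, only block when nothing is in flight:
// if (currentTasks === 0 && pendingPromises === 0) { /* Atomics.wait(...) */ }
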
yakkomajuri commented 3 years ago

Hmm but that's the same problem as this:

There is a way to check for async stuff running in the background using some undocumented Node process methods (process._getActiveHandles() and process._getActiveRequests()). With enough duct tape, this could be leveraged to wait for background stuff to finish before completing the task, so it would be guaranteed that once we're in atomicsWaitLoop, there's no background stuff left. The big problem here is that then there's no point in the background processing, as the task would have to wait for it anyway.

mariusandra commented 3 years ago

I don't think these methods will give good results, because of the random number of other promises running from within piscina and other parts of the stack. I'd bet you can't reliably determine what "zero" is with them.

Replied in the PR, but AFAIK the main point of atomics is to immediately react when a new message comes in. That makes sense when we have 100 messages per second, but it's a pretty "meh whatever" feature when we get just 2 messages per second. So just decreasing the default atomics wait time (and then running benchmarks), so that the entire wait loop, including the onMessage part, re-runs periodically, could solve the problem.

As in, if we decrease it to ~100ms from the current Infinity, once we pass 10 events/sec, we get back into 🚀 territory.
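
A self-contained illustration of that bounded-wait idea (the 100ms value is the one suggested above; piscina's real loop is more involved, and int32 is assumed to be backed by the shared task-count buffer):

function boundedWaitLoop(int32: Int32Array): void {
    const result = Atomics.wait(int32, 0, 0, 100) // wake up after at most 100ms
    if (result === 'timed-out') {
        // re-schedule instead of looping synchronously, so timers and I/O
        // callbacks (e.g. a voided fetch's handler) get a turn on the event loop
        setImmediate(() => boundedWaitLoop(int32))
    } else {
        // 'ok' or 'not-equal': a notify (new task) arrived; read it from the port here
    }
}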

Twixes commented 3 years ago

Should be solved with #490.