PostHog / posthog

Multi-bucket event ingestion pipeline for PostHog Cloud #11423

Open macobo opened 2 years ago

macobo commented 2 years ago

Is your feature request related to a problem?

Below I outline a solution to a problem we don't yet have, so that it's documented if it ever becomes necessary.

Plugin-server is not trivially scalable on PostHog cloud. This isn't because of ingestion, but due to plugins, as:

As such, if we're faced with a traffic spike or errant plugin(s) slowing down ingestion, it's hard for us to just scale up the number of plugin-servers without running into parallelism limits.

Current topology

The current (rough) ingestion pipeline looks like this on cloud:

graph TD
    Events[Events service] --> KI[events_plugin_ingestion topic]
    KI --> P(plugins-ingestion)
    P --> KE[clickhouse_events_json]
    KE --> Async[plugins-async]
    KE --> Clickhouse

    class KI,KE kafka;
    class Clickhouse ch;

    classDef kafka fill:#ddd,stroke:#fff,stroke-width:4px,color:#000;
    classDef ch fill:#ee0,stroke:#fff,stroke-width:4px,color:#000;

Kafka topics are shown in gray and ClickHouse (our primary data store) in yellow.

If you want to know more, the following resources in the handbook might help:

Describe the solution you'd like

Divide traffic into "buckets" and make ingestion multi-topic, with manual and automated tooling around this.

New topology

graph TD
    Events[Events service] --> KI1[events_plugin_ingestion_1]
    Events[Events service] --> KI2[events_plugin_ingestion_2]
    Events[Events service] --> KI3[events_plugin_ingestion_3]
    KI1 --> P1("plugins-ingestion (1)")
    KI2 --> P2("plugins-ingestion (2)")
    KI3 --> P3("plugins-ingestion (3)")
    P1 --> KE1[clickhouse_events_json_1]
    P2 --> KE2[clickhouse_events_json_2]
    P3 --> KE3[clickhouse_events_json_3]
    KE1 --> Async1["plugins-async (1)"]
    KE2 --> Async2["plugins-async (2)"]
    KE3 --> Async3["plugins-async (3)"]
    KE1 --> Clickhouse
    KE2 --> Clickhouse
    KE3 --> Clickhouse

    class KI1,KI2,KI3,KE1,KE2,KE3 kafka;
    class Clickhouse ch;

    classDef kafka fill:#ddd,stroke:#fff,stroke-width:4px,color:#000;
    classDef ch fill:#ee0,stroke:#fff,stroke-width:4px,color:#000;

In this scheme, we would have new Kafka topics to write messages to and extra instances of plugin-server (e.g. 3 for both types). On the ClickHouse side, there would be appropriate Kafka/MV tables set up for each of these topics.

Topics for each "bucket" would be created by us manually via MSK.

Splitting events by team

Since plugins are spun up by teams, the way to split events is by their team_id.

This could be a simple mod of team_id, or if we wanted to split load roughly equally, we could have a:
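The simple modulo option could look roughly like the sketch below. The bucket count, the 1-based numbering, and the helper names are assumptions for illustration; the topic names follow the pattern used in the new topology diagram.

```python
# Hypothetical sketch: pick a bucket (and its topics) from team_id by modulo.
NUM_BUCKETS = 3  # assumption: three buckets, as in the diagram


def bucket_for_team(team_id: int, num_buckets: int = NUM_BUCKETS) -> int:
    """Return a 1-based bucket number for the given team."""
    return team_id % num_buckets + 1


def ingestion_topic(team_id: int) -> str:
    """Topic the events service would write this team's events to."""
    return f"events_plugin_ingestion_{bucket_for_team(team_id)}"


def clickhouse_topic(team_id: int) -> str:
    """Topic plugins-ingestion would write this team's processed events to."""
    return f"clickhouse_events_json_{bucket_for_team(team_id)}"
```

A plain modulo keeps each team pinned to one bucket, but it ignores how much traffic each team sends, so buckets can end up unevenly loaded. Changing the bucket count would also move most teams to a different bucket.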

Notes on plugin-server

Some haphazard notes on implementation, might change:

Chart-level notes

Implementing this on the chart might be tricky. One option: in values.yaml under plugins/pluginsAsync, allow passing in an array of "deployments". Each would contain overrides for that particular deployment (e.g. env vars containing what bucket/topic to subscribe to). We would create a deployment-per-override (or just the single one if left blank).
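To make the deployment-per-override idea concrete, here is a small sketch of the expansion logic. It is written in Python purely for illustration (in the chart this would live in Helm templates), and the `name`/`env` fields and env var names are hypothetical.

```python
from typing import Any


def expand_deployments(
    base: dict[str, Any], overrides: list[dict[str, Any]]
) -> list[dict[str, Any]]:
    """Return one deployment per override, or just the base one if none are given."""
    if not overrides:
        return [dict(base)]
    deployments = []
    for override in overrides:
        merged = {**base, **override}
        # Merge env vars key by key so an override does not drop the base env.
        merged["env"] = {**base.get("env", {}), **override.get("env", {})}
        deployments.append(merged)
    return deployments


# Hypothetical values: two bucketed deployments derived from one base.
base = {"name": "plugins", "env": {"LOG_LEVEL": "info"}}
overrides = [
    {"name": "plugins-1", "env": {"INGESTION_TOPIC": "events_plugin_ingestion_1"}},
    {"name": "plugins-2", "env": {"INGESTION_TOPIC": "events_plugin_ingestion_2"}},
]
bucketed = expand_deployments(base, overrides)
```

Leaving the overrides empty yields the single existing deployment, so charts that don't opt in would behave as before.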

Note that we'll also need to start relying more heavily on autoscaling once this is live, so work needs to be done to make that effective for plugin-server. It should likely not rely only on CPU.

Metrics

We should update ingestion lag metrics to reflect the new topology.
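One way to do that, assuming lag is measured as consumer offset lag (log end offset minus committed offset), is to aggregate it per topic so each bucket gets its own number. The function name and input shapes below are hypothetical.

```python
def lag_per_topic(
    end_offsets: dict[tuple[str, int], int],
    committed_offsets: dict[tuple[str, int], int],
) -> dict[str, int]:
    """Sum (end offset - committed offset) over partitions, grouped by topic.

    Keys are (topic, partition) pairs. A partition with no committed offset
    is treated as fully unconsumed (committed = 0), which is an assumption.
    """
    lag: dict[str, int] = {}
    for (topic, partition), end in end_offsets.items():
        committed = committed_offsets.get((topic, partition), 0)
        lag[topic] = lag.get(topic, 0) + max(end - committed, 0)
    return lag
```

Reporting lag per bucket rather than as one total makes it visible when a single bucket is falling behind while the others are healthy.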

Additional context

cc @yakkomajuri @tiina303 @hazzadous for ingestion context. cc @ellie - we chatted yesterday briefly about this, and this was my cue to write this down. cc @timgl - we chatted about some of this a while back.

macobo commented 2 years ago

@mariusandra brought up an interesting point on Slack: this same architecture could be used as a step towards providing enterprise users with the capability to "write their own plugins" without us vetting them.

hazzadous commented 2 years ago

> each plugin-server needs to load and keep in memory all plugins for all teams

Does the memory usage scale with number of teams as well as number of plugins? From what I've seen of plugins they are mostly quite small, do you know what is eating all the memory?

> a single slow plugin in ingestion pipeline might slow down other customers?

If a single customer is causing us to lag, perhaps we can have a customer quota (either daily or burstable) and, once it has been reached, push that customer's events to a separate topic for processing. That way we can aim for zero lag on the main topic, and provide different guarantees for the other.
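As a rough illustration of the quota idea, assuming a simple per-day event count per team (burst handling is left out), routing could look like the sketch below. The overflow topic name, the class, and the in-memory counter are hypothetical; several events-service instances would need a shared counter instead.

```python
from collections import defaultdict
from typing import Optional

MAIN_TOPIC = "events_plugin_ingestion"  # topic name from the current topology
OVERFLOW_TOPIC = "events_plugin_ingestion_overflow"  # hypothetical name


class DailyQuotaRouter:
    """Send a team's events to an overflow topic once its daily quota is used up."""

    def __init__(self, daily_quota: int) -> None:
        self.daily_quota = daily_quota
        self._day: Optional[str] = None
        self._counts: defaultdict[int, int] = defaultdict(int)

    def topic_for(self, team_id: int, day: str) -> str:
        # Reset every team's counter when the day changes.
        if day != self._day:
            self._day = day
            self._counts.clear()
        self._counts[team_id] += 1
        if self._counts[team_id] > self.daily_quota:
            return OVERFLOW_TOPIC
        return MAIN_TOPIC
```

Once a team crosses its quota, its events are split across two topics for that day, so ordering between the two would not be guaranteed.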

> Both events service and plugin-server would "fetch" the current mapping of teams->topics on boot/1x per day and use that mapping to decide what topic to write to.

What happens at the boundary between updates to the mapping? We will presumably end up with events for teams switching between topics. Or would that be fixed, and it would only be new teams that would have an updated mapping? I guess that wouldn't be an issue as we'd be loading plugins on demand.

Seems we already have a method of distributing data via Kafka partitioning. We can likely just adjust the partitioning and key, and use a custom partition assignment algorithm, but I'd also be interested to figure out if we can't improve the memory scaling characteristics of the plugin server a bit first before adding orchestration complexity.

macobo commented 2 years ago

> Does the memory usage scale with number of teams as well as number of plugins? From what I've seen of plugins they are mostly quite small, do you know what is eating all the memory?

This discussion is unrelated to the restarts/etc we're having so not sure. I don't think memory is relevant for this issue specifically - we're focussing on being able to ingest more data with additional constraints (e.g. slow plugins).

The goal of the line you quoted was to say that we need to be loading (setupPlugin, etc) and then keeping track of/calling all plugins everywhere all at once.

> If a single customer is causing us to lag, perhaps we can have a customer quota (either daily or burstable) and, once it has been reached, push that customer's events to a separate topic for processing. That way we can aim for zero lag on the main topic, and provide different guarantees for the other.

This sounds like a separate and interesting proposal but with not enough detail to evaluate - mind speccing this out under here or under a separate issue?

> What happens at the boundary between updates to the mapping? We will presumably end up with events for teams switching between topics. Or would that be fixed, and it would only be new teams that would have an updated mapping? I guess that wouldn't be an issue as we'd be loading plugins on demand.

We should not be constantly moving teams between queues as it might introduce ordering-related issues. However for plugins, yes this was why on-demand loading was outlined above.

> Seems we already have a method of distributing data via Kafka partitioning. We can likely just adjust the partitioning and key, and use a custom partition assignment algorithm, but I'd also be interested to figure out if we can't improve the memory scaling characteristics of the plugin server a bit first before adding orchestration complexity.

The partitioning logic is not based on teams; rather, the idea is to send the same person's data to the same consumers. Trying to do multi-bucketing on a partition level is not a good idea, as we'd lose tooling around multi-partition consuming from kafkajs, and Kafka doesn't handle differently sized partitions super well.

Multi-topic side-steps all those problems and is easier to set up and monitor.

Also see the start of the issue - the goal here isn't urgent and it's solving a scaling problem we don't yet have. I'm not suggesting we build this now but wanted to document to share with the team.

yakkomajuri commented 2 years ago

Something like this makes sense and has been floated around in the past. The post about Segment's Centrifuge is probably a valuable resource here.

On a side note, an implementation of this that might make sense (temporarily or long-term) is dividing these queues up based on importance. @mariusandra used to talk about e.g. fastest, fast, and slow queues. This would allow us to prioritize like enterprise > paid > free customers and also set our priorities straight away for when stuff goes bad. We could for instance turn off the free queue if that would help free up the enterprise queue in some way.
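A minimal sketch of that priority split, seen from the consumer side, might look like this. The tier names come from the example above; the topic naming and the idea of "pausing" a tier by simply not consuming its topic are assumptions.

```python
# Tiers in priority order, highest first (enterprise > paid > free).
PRIORITY_ORDER = ["enterprise", "paid", "free"]


def queue_name(tier: str) -> str:
    """Hypothetical topic name for a customer tier."""
    return f"events_plugin_ingestion_{tier}"


def queues_to_consume(paused_tiers: frozenset[str] = frozenset()) -> list[str]:
    """Queues to keep consuming, highest priority first.

    Paused tiers are skipped; their events would stay in Kafka and build up
    lag until consumption resumes (subject to topic retention).
    """
    return [queue_name(t) for t in PRIORITY_ORDER if t not in paused_tiers]
```

For example, `queues_to_consume(frozenset({"free"}))` would leave only the enterprise and paid queues being processed while things are on fire.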

Either way, however we split this up, the multi-queue architecture makes sense.