getsentry / sentry

Developer-first error tracking and performance monitoring
https://sentry.io

Crons: Guarantee lock-step clock across topic partitions #55821

Closed evanpurkhiser closed 11 months ago

evanpurkhiser commented 1 year ago

Problem Overview

Right now the Crons monitor check-in ingestion system produces monitor check-in messages into a Kafka topic with 128 partitions. We may have multiple consumers that read from those partitions. https://github.com/getsentry/sentry/issues/53661 introduces a clock that is driven by these messages. It's problematic if this clock ticks before all messages have been consumed for that timestamp.

See the following illustration for a demonstration of why.

image

This illustration demonstrates the scenario where check-ins are read out of order due to partitioning. Here's what's happening

[!NOTE] This is how our Kafka consumers behave: there are NO order guarantees across partitions. Assuming otherwise was a fundamentally bad assumption we made while implementing #53661.

  1. We have 4 check-ins across the 3 partitions. The most recently read check-in was at timestamp 01:00 as indicated by the current clock.

  2. We read a check-in from partition 1 and the clock moves forward to 01:01. This is okay: there are no messages still in the queue at timestamp 01:00, so any check-ins that were expected at 01:00 can now be correctly marked as missed, since they were never sent.

  3. We read another check-in, this time from partition 3. This message has a timestamp of 01:02, so our clock progresses forward to 01:02. We will now check for check-ins that did not come in at 01:01.

    [!WARNING] The system is now in a problematic state! Even though our clock has moved forward we have not read all of the check-ins currently in the Kafka queue. There is still a 01:01 check-in in the second partition. In this situation that check-in will be incorrectly marked as missed.

This problem can be exacerbated even more if we have multiple consumers, since it is more likely that some partitions will be read farther ahead than others, thus progressing the clock before all check-ins are read.

During a backlog this can become especially bad if one partition is read ahead of another as well, since the clock may move very far forward before all of the check-ins have been processed.
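To make the failure mode concrete, here is a minimal sketch that replays reads against a single shared clock. The read order and timestamps are illustrative values chosen to match the steps above (the fourth check-in is an assumption), and `late_check_ins` is a hypothetical helper, not code from Sentry.

```python
from datetime import datetime


def ts(hhmm: str) -> datetime:
    """Parse an HH:MM string into a datetime (the date part is irrelevant here)."""
    return datetime.strptime(hhmm, "%H:%M")


def late_check_ins(reads, start):
    """Replay (partition, timestamp) reads against one shared clock.

    Returns the check-ins that were read only after the clock had already
    moved past their timestamp, i.e. the ones at risk of being treated as
    missed even though they were sitting in the queue.
    """
    clock = start
    late = []
    for partition, timestamp in reads:
        if timestamp < clock:
            late.append((partition, timestamp))
        # The naive clock simply follows the newest timestamp seen so far.
        clock = max(clock, timestamp)
    return late


# Illustrative read order: partition 1, then 3, then 2, then 1 again.
reads = [
    (1, ts("01:01")),
    (3, ts("01:02")),
    (2, ts("01:01")),
    (1, ts("01:02")),
]
late = late_check_ins(reads, start=ts("01:00"))
print(late)
```

With this read order only the 01:01 check-in from partition 2 is reported as late, which is the check-in called out in the warning above.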

## Proposed Solution 1 (task partition grouping) (⚠️ vetoed)

The primary issue here is that our partitions are not synchronized, but this is not a strict requirement. Instead we can have **one clock per partition** and guarantee that check-ins per monitor are not spread across multiple partitions.

When each clock ticks forward, we can dispatch the check-monitor tasks (the tasks that find missed check-ins and timed out monitors) and restrict the task run to be **only for the monitors within that partition**.

What this might look like using our original example could be as follows

![image](https://github.com/getsentry/sentry/assets/1421724/0d2e353b-a91a-413f-81d5-561f0f222965)

In this case, we have partitioned the check-ins by Monitor ID and maintain a clock for each partition. Importantly **this guarantees processing order** for all monitors. We will NEVER step a clock forward if we haven't already processed all messages for that monitor partition

### Tradeoffs

Previously we had partitioned our check-ins across our 128 topic partitions **randomly**. This gave a nice even distribution of messages.

  1. If we choose this partitioning schema by partitioning by `monitor_id` or `project_id` we have now gimped those distribution guarantees. If a single monitor decides to bombard us with check-ins it could become problematic for that individual partition. That said, in the future when we have moved rate limiting to relay (https://github.com/getsentry/relay/issues/2490), we should have a better guarantee that a partition is not saturated.

  2. Repartitioning the topic becomes difficult. If we add a new partition current in-progress tasks may process the incorrect set of monitors depending on a race between the tasks being created and the number of partitions changing.
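A rough sketch of what per-partition clocks could look like. Everything here is hypothetical: `partition_for` uses a CRC32 hash purely for illustration (it is not how Relay or Kafka actually assigns partitions), and `PartitionClocks` keeps its state in memory only.

```python
import zlib
from datetime import datetime

NUM_PARTITIONS = 128


def partition_for(monitor_id: str, num_partitions: int = NUM_PARTITIONS) -> int:
    """Map a monitor to a partition with a stable hash (illustrative only).

    crc32 is deterministic across processes, unlike Python's built-in hash().
    """
    return zlib.crc32(monitor_id.encode("utf-8")) % num_partitions


class PartitionClocks:
    """One minute-resolution clock per partition."""

    def __init__(self):
        self.clocks = {}

    def observe(self, partition: int, timestamp: datetime) -> bool:
        """Record a message; return True if that partition's clock ticked."""
        minute = timestamp.replace(second=0, microsecond=0)
        current = self.clocks.get(partition)
        if current is not None and minute <= current:
            return False
        self.clocks[partition] = minute
        # A real system would dispatch the check-monitor task here,
        # restricted to the monitors that map to this partition.
        return True


clocks = PartitionClocks()
p = partition_for("my-monitor")
first = clocks.observe(p, datetime(2023, 9, 1, 1, 0, 30))
same_minute = clocks.observe(p, datetime(2023, 9, 1, 1, 0, 45))
next_minute = clocks.observe(p, datetime(2023, 9, 1, 1, 1, 5))
```

Note that changing `num_partitions` changes the result of `partition_for` for most monitors, which is the repartitioning tradeoff described above.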

Proposed Solution 2 (partition clock synchronization)

The primary issue here is that our partitions are not synchronized. We can fix this by introducing logic that requires all partitions to have been read up to a timestamp before ticking the clock forward.

In this world each partition has its own clock, and there is also a global clock. The global clock only moves forward once the minimum value across all the partition clocks is larger than the global clock.

image

  1. We have 4 check-ins across the 3 partitions. The global clock last ticked to 01:00.

  2. The consumer takes a message out of partition 3. The clock for partition 3 has now moved forward, but the global clock has not, since the minimum clock across all partitions is still 01:00.

  3. The consumer takes another message from partition 3. The clock for partition 3 moves forward, but again the global clock does not move forward.

  4. We take from partition 2, and its clock moves forward. The global clock does not move forward.

  5. Finally we take from partition 1, and its clock moves forward to 01:02. The minimum clock across all partitions is now 01:01. The global clock now moves forward since the minimum is larger than the global clock.

This solution keeps the global clock synchronized across all partitions, and we can guarantee that we've read all messages up to that time before the clock ticks.
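The steps above can be sketched roughly as follows. `SynchronizedClock` is a hypothetical in-memory stand-in (no shared storage), and the timestamp of the first message taken from partition 3 is assumed to be 01:01 for the sake of the example.

```python
from datetime import datetime


def ts(hhmm: str) -> datetime:
    """Parse an HH:MM string into a datetime (the date part is irrelevant here)."""
    return datetime.strptime(hhmm, "%H:%M")


class SynchronizedClock:
    """Per-partition clocks plus a global clock that follows the slowest one."""

    def __init__(self, partitions, start):
        self.partition_clocks = {p: start for p in partitions}
        self.global_clock = start

    def observe(self, partition, timestamp) -> bool:
        """Record a message; return True if the global clock moved forward."""
        # A partition clock never moves backwards.
        self.partition_clocks[partition] = max(
            self.partition_clocks[partition], timestamp
        )
        slowest = min(self.partition_clocks.values())
        if slowest > self.global_clock:
            self.global_clock = slowest
            return True
        return False


clock = SynchronizedClock(partitions=[1, 2, 3], start=ts("01:00"))
ticked = [
    clock.observe(3, ts("01:01")),  # step 2: only partition 3 advances
    clock.observe(3, ts("01:02")),  # step 3: partition 3 advances again
    clock.observe(2, ts("01:01")),  # step 4: partition 1 is still at 01:00
    clock.observe(1, ts("01:02")),  # step 5: the minimum is now 01:01
]
```

Only the last call moves the global clock, and it lands on 01:01 rather than 01:02 because partition 2 has not been read past 01:01 yet.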

Dealing with partition clock jumps

Imagine in the scenario above that in step 4, the message was at 01:02 instead of 01:01. When we processed that message the minimum clock would jump from 01:00 to 01:02.

We will need to be careful that when the global clock synchronization happens, we produce ticks for any number of minutes that may have been skipped.
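One way to handle this, sketched with a hypothetical helper `ticks_between` and assuming both clock values are already truncated to the minute:

```python
from datetime import datetime, timedelta


def ticks_between(previous: datetime, current: datetime) -> list:
    """Return every minute in (previous, current], oldest first.

    Each returned value is one tick that should be produced, so a jump
    from 01:00 to 01:02 yields ticks for both 01:01 and 01:02.
    """
    ticks = []
    tick = previous + timedelta(minutes=1)
    while tick <= current:
        ticks.append(tick)
        tick += timedelta(minutes=1)
    return ticks


jump = ticks_between(datetime(2023, 9, 1, 1, 0), datetime(2023, 9, 1, 1, 2))
no_change = ticks_between(datetime(2023, 9, 1, 1, 0), datetime(2023, 9, 1, 1, 0))
```

Producing the ticks oldest first keeps the downstream missed and timeout checks in chronological order.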

Dealing with partition drought

This works well when we have a large volume and can guarantee that no partition is going to lag behind because it has no messages to consume. Remember the global clock is synchronized to the time of the last processed message across all partitions.

This is similar to the issue we originally solved in https://github.com/getsentry/sentry/issues/53661 where we needed to ensure that even when no check-in messages were being produced, we still moved the clock forward.

Again, it's important to remember that the clock drives tasks that mark monitors that haven't produced check-ins as missed. If we rely completely on the presence of messages to move the clock forward, the clock simply won't tick when, for example, there is only one monitor and it is not sending messages.

There are probably multiple solutions here

Proposed solutions to partition drought

  1. Update the existing clock_tick celery beat task, which produces "fake" messages into our Kafka queue, so that it produces those messages into every partition.
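A minimal sketch of that idea. The `produce` callable, the payload shape, and the name `produce_clock_pulses` are all assumptions made for illustration; they do not describe Sentry's actual task or any specific Kafka client API. How the partition count is discovered is left out.

```python
import json
from datetime import datetime, timezone
from typing import Callable, Optional


def produce_clock_pulses(
    partition_count: int,
    produce: Callable[[int, bytes], None],
    now: Optional[datetime] = None,
) -> int:
    """Send one "fake" clock message to every partition of the topic.

    `produce(partition, payload)` is a hypothetical callable wrapping
    whatever Kafka producer is in use. Returns the number of messages sent.
    """
    now = now or datetime.now(timezone.utc)
    # Hypothetical payload: just enough for a consumer to advance its clock.
    payload = json.dumps({"type": "clock_pulse", "ts": now.timestamp()}).encode("utf-8")
    for partition in range(partition_count):
        produce(partition, payload)
    return partition_count


# Usage with an in-memory stand-in for the producer.
sent = []
count = produce_clock_pulses(
    4,
    lambda partition, payload: sent.append((partition, payload)),
    now=datetime(2023, 10, 1, 1, 0, tzinfo=timezone.utc),
)
```

Because every partition receives a pulse, every partition clock can advance even when no real check-ins land on it.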

Tradeoffs

If any partition becomes backlogged, the entire world stops for marking missed check-ins and timeouts. This is indeed what we want, since we want to make sure all check-ins have been processed before the clock ticks. It does mean things will be slower until all partitions catch up, whereas if each partition had its own clock we could move faster for each partition.

Implementation

### Deploy 1
- [ ] https://github.com/getsentry/sentry/pull/57860
- [ ] https://github.com/getsentry/sentry/pull/57943
- [ ] https://github.com/getsentry/relay/pull/2496
### Deploy 2
- [ ] https://github.com/getsentry/sentry/pull/58003
### Tasks
- [ ] https://github.com/getsentry/sentry/pull/58026

davidhollin commented 1 year ago

I would have preferred 5 checkins across 6 partitions, but w/e

evanpurkhiser commented 1 year ago

Get out of here @davidhollin

getsantry[bot] commented 1 year ago

Routing to @getsentry/product-owners-crons for triage ⏲️

evanpurkhiser commented 1 year ago

@lynnagara @untitaker Could y'all take a look at this issue description and let me know if the solutions make sense?

It sounds like we want to go with solution #2, but I want to make sure dealing with the partition drought problem makes sense.

fpacifici commented 12 months ago

> In this case, we have partitioned the check-ins by Monitor ID and maintain a clock for each partition. Importantly this guarantees processing order for all monitors. We will NEVER step a clock forward if we haven't already processed all messages for that monitor partition

It is not strictly true that your clock will always go forward. You can have "relativistic effects" during rebalancing and observe time going backwards. This is what could happen:

This can be mitigated in your application logic by ensuring it is idempotent: if you design your job so that executing it twice does not cause any issue, you will be fine. Designing for idempotency when dealing with Kafka is generally a good idea, as it is easy to achieve "at least once" delivery. It is harder to achieve "at most once", as you would have to commit all messages (bad), and it is fundamentally impossible to achieve "exactly once" without keeping state on the consumer side that drops messages already processed.
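As a rough illustration of the idempotency point, a tick handler can remember the newest tick it has acted on and treat anything at or before it as a no-op. `TickDispatcher` is a hypothetical name, and the state here lives in process memory; with several consumers it would have to live in shared storage instead.

```python
from datetime import datetime


class TickDispatcher:
    """Runs the checks at most once per tick, even if a tick is re-delivered."""

    def __init__(self, run_checks):
        self._run_checks = run_checks
        self._last_tick = None

    def handle_tick(self, tick: datetime) -> bool:
        """Return True if the checks ran, False if the tick was a repeat."""
        if self._last_tick is not None and tick <= self._last_tick:
            # Time appeared to stand still or go backwards (for example
            # after a rebalance): drop the tick instead of re-running.
            return False
        self._last_tick = tick
        self._run_checks(tick)
        return True


ran = []
dispatcher = TickDispatcher(ran.append)
results = [
    dispatcher.handle_tick(datetime(2023, 9, 1, 1, 1)),
    dispatcher.handle_tick(datetime(2023, 9, 1, 1, 1)),  # duplicate delivery
    dispatcher.handle_tick(datetime(2023, 9, 1, 1, 0)),  # time went backwards
    dispatcher.handle_tick(datetime(2023, 9, 1, 1, 2)),
]
```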

fpacifici commented 12 months ago

Question about option 2: where are you going to store the global clock? That is shared state across consumers.

Is it supposed to be held in memory by the consumer? This would require always having a single consumer, which we should really avoid, as it puts a hard cap on the scalability of your system (which will be pretty low).

Do you plan to store it externally in a strongly consistent data store? You will have to watch out for the number of writes you will issue in this case. It may or may not be a problem depending on where you store the state.

Others?

It seems to me that, no matter the implementation, option 2 would introduce an additional external component, with additional failure modes to be kept in mind. Option 1 is simpler and, unless you expect to have a tiny number of monitors with a huge volume of check-ins (more than 10 per second), load should end up being distributed well enough. This scenario of semantic partitioning will be much less damaging than semantic partitioning of events, as you do not really have to process all the check-ins for a monitor; you just need to do something when you see the timestamp change. Moreover, processing the other messages (when the timestamp does not change) does not even require parsing the event, as the broker timestamp is not part of the payload of the message.

There are other options to implement solution 2 but they are all quite complicated and they all introduce more moving parts.

evanpurkhiser commented 11 months ago

> Is it supposed to be held in memory by the consumer

It would be held in redis, just as the current clock is held.

> Option 1 is simpler

Does the complexity of adding / removing partitions in this scenario worry you at all? Because of that I was biasing towards option 2.

But I do agree, not having to keep the clock in redis could be valuable now.

fpacifici commented 11 months ago

> Does the complexity of adding / removing partitions in this scenario worry you at all?

No, it is exceptionally rare. We never run more than 128 partitions, we cannot decrease the number and we always set them as powers of two. You can only increase partitions a given number of times in total.

volokluev commented 11 months ago

Nice explanation, I like the diagrams as well.

Partitioning by monitor id makes sense to me to keep checkins from a specific monitor from going out of sync. Few questions:

  1. How often do you actually rate limit a monitor? What is the customer impact when a monitor is rate limited?
  2. Do you have any numerical estimates for how many monitors and checkins you expect?
  3. If you do option 1, would you still need to keep the clock in redis? I don't think you would

evanpurkhiser commented 11 months ago

Okay. I've had a longer discussion with @fpacifici and @wedamija about this.

We've decided to go with approach 2, while also partitioning monitors using the monitor slug as the partition key (ensuring better ordering).

Here's our reasoning why:

  1. Simplicity. With approach 1 we would, almost no matter what, need to have logic that maps a monitor to some partition or to some logical partition. We would need to keep that logic duplicated and consistent in both Relay (the producer) and Sentry (the consumer). We would need to translate a logical group back to monitors (not difficult, but extra complexity). In some recovery cases we may need to map a physical Kafka partition back to a logical partition, which becomes difficult because the partitioning in Relay is completely opaque.

    Implementation also becomes simpler, since we will still have just one task and the logic is clear that we are just synchronizing clocks.

  2. We are satisfied with having 128 partitions. We will NEVER increase the number of partitions here so we do not need to be concerned with re-balancing or partitions changing. This is because 128 is already a very high number of partitions. We will never have this many consumers.

  3. We are okay with allowing our Kafka client to have the knowledge needed to produce messages into specific partition indexes.

    From @fpacifici

    > I am ok if celery used the admin client to find the number of partitions and sent the heartbeat to every one by partition id. The key is that the partition number is not in config and not shared between components

  4. The producer has no knowledge of the partition number. This is good. The consumer only cares about the partition number for the purpose of tracking a clock for each partition.

evanpurkhiser commented 11 months ago

This is deployed and operating as expected.

In a backlog situation we should now see no issues with missed check-ins being produced when the backlog