numaproj / numaflow

Kubernetes-native platform to run massively parallel data/streaming jobs
https://numaflow.numaproj.io/
Apache License 2.0

Buffer Usage Calculation and ISB Writing Race Condition: Potential Data Loss in High-Throughput Pipelines #1554

Open yhl25 opened 8 months ago

yhl25 commented 8 months ago

https://github.com/numaproj/numaflow/issues/1551 https://github.com/numaproj/numaflow/issues/1551#issuecomment-1991223266

vigith commented 8 months ago

Move to 1.3 because we have a workaround.

QuentinFAIDIDE commented 7 months ago

I was thinking of something. Can't we modify the discard policy in the buffer creation right here? https://github.com/numaproj/numaflow/blob/f1e5ba0eb222edf3e5a5593769efc3626b092c1b/pkg/isbsvc/jetstream_service.go#L117

We hard-code the DiscardOld policy, which, according to the JetStream docs, is why we encounter data loss instead of a write error.

> The Discard Policy sets how messages are discarded when limits set by LimitsPolicy are reached. The DiscardOld option removes old messages making space for new, while DiscardNew refuses any new messages.
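
For reference, a minimal sketch of what that stream creation amounts to and what the proposed change would look like, assuming the nats.go client; the stream name and subject below are illustrative, not Numaflow's actual values:

```go
package main

import (
	"log"
	"time"

	"github.com/nats-io/nats.go"
)

func main() {
	// Connect to a local NATS server with JetStream enabled.
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()

	js, err := nc.JetStream()
	if err != nil {
		log.Fatal(err)
	}

	// The buffer stream is currently created with Limits retention and
	// DiscardOld, so hitting MaxMsgs or MaxAge silently deletes the
	// oldest messages. The suggestion is to flip Discard to
	// nats.DiscardNew so that a full buffer rejects writes instead.
	_, err = js.AddStream(&nats.StreamConfig{
		Name:      "example-buffer", // illustrative name
		Subjects:  []string{"example-buffer.*"},
		Retention: nats.LimitsPolicy,
		Discard:   nats.DiscardOld, // proposed: nats.DiscardNew
		MaxMsgs:   100_000,
		MaxAge:    72 * time.Hour,
	})
	if err != nil {
		log.Fatal(err)
	}
}
```
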
yhl25 commented 7 months ago

DiscardNew only works with the WorkQueue policy; we cannot use the Limits policy with DiscardNew in our use case. But when I was testing DiscardNew with the WorkQueue policy, I found some messages stuck in the stream that were never delivered to the consumer. I regret forgetting to update the issue with those findings. Since we had a workaround, I haven't spent too much time on this, but I will look into it when I find some time. A sketch of the combination I tested is below.
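
Continuing from the setup in the earlier snippet, and again assuming nats.go with an illustrative stream name, the tested combination would look roughly like this:

```go
// WorkQueue retention deletes a message as soon as a consumer acks it,
// so the stream drains as the pipeline makes progress, and DiscardNew
// only rejects writes while consumers are genuinely behind. Note that
// a WorkQueue stream requires each subject to be consumed by at most
// one consumer.
_, err = js.AddStream(&nats.StreamConfig{
	Name:      "example-workqueue", // illustrative name
	Subjects:  []string{"example-workqueue.*"},
	Retention: nats.WorkQueuePolicy,
	Discard:   nats.DiscardNew,
	MaxMsgs:   100_000,
})
```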

QuentinFAIDIDE commented 7 months ago

Could you take a minute to elaborate on why it can't work? I got all the tests passing in a draft PR and have started torturing Numaflow with this change, and so far no issues. Sorry for being persistent, I'm a curious person; I will leave you in peace once I understand :laughing: It feels like it would make your lives easier not to live with a buffer that silently drops data when it's full, but to just error out :)

yhl25 commented 7 months ago

When the stream is configured with the Limits policy and DiscardOld, JetStream automatically deletes old messages when either the maxMsgs limit of 100k or the maxAge limit of 72h is reached. However, when operating with DiscardNew and Limits, reaching the maxMsgs of 100k will result in the stream returning an error when attempting to write new messages, and old messages will not be deleted from the stream, so the pipeline will get stuck. On the other hand, using DiscardNew with WorkQueue works fine because JetStream deletes messages once they are consumed and acked.
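
A sketch of the stuck-pipeline behavior described above, assuming nats.go and a stream like the earlier `example-buffer` reconfigured with `Retention: nats.LimitsPolicy` plus `Discard: nats.DiscardNew` (names and values are illustrative):

```go
// Fill the stream past MaxMsgs. With LimitsPolicy + DiscardNew, the
// server rejects the publish once the limit is hit instead of
// dropping old data.
for i := 0; i < 100_001; i++ {
	if _, err := js.Publish("example-buffer.data", []byte("payload")); err != nil {
		// Under LimitsPolicy, consumer acks do not delete messages,
		// so the stream stays full until MaxAge expires entries;
		// every subsequent write keeps failing and the pipeline
		// stalls rather than losing data.
		log.Printf("publish %d rejected: %v", i, err)
		break
	}
}
```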