numaproj / numaflow

Kubernetes-native platform to run massively parallel data/streaming jobs
https://numaflow.numaproj.io/
Apache License 2.0

Messages dropped in Sinks when buffer is full and sink is failing #1551

Open QuentinFAIDIDE opened 7 months ago

QuentinFAIDIDE commented 7 months ago

Describe the bug

I have noticed a few times that messages could disappear in very specific conditions. I accidentally discovered how to reproduce the issue in my pipelines. I apologize for the lengthy description; I have not yet managed to reproduce it in a simple pipeline. Here is a detailed description of the issue:

The pipeline is made of the following vertices: (image)

If I start the pipeline with block-writer configured on an invalid keyspace that doesn't have the block table, the sink sets its response to failure with the error message from the gocql driver for Cassandra: undefined table block (UDSink code below). As mentioned earlier, I set block-writer-witness to a valid keyspace, btc_raw_dev_b, that has the block table.

I let it run for a while, until the buffer for the failing block-writer gets full, then I edit the vertex YAML in the Numaflow web UI to set it to a valid keyspace named btc_raw_dev. I wait for the vertex to restart, and the now healthy block-writer sink consumes the messages in the buffer until there is nothing left in it.

At the end of the process, the btc_raw_dev block table is missing the first blocks, while the btc_raw_dev_b one has all of these missing blocks and shows no sign of missing entries: (image)

I have reproduced this error consistently; it never fails to happen in the described context. I have found missing entries in my output tables in the past, even without the misconfigurations, so I imagine that errors in the Sink are not properly recovered from on db failures. Here is the code of the Sink (I use the Go SDK):

// Sink is the sink function that relies on the cqlRawClient's WriteBlock utility
func (l *BlockSink) Sink(ctx context.Context, datumStreamCh <-chan sinksdk.Datum) sinksdk.Responses {
    result := sinksdk.ResponsesBuilder()
    for d := range datumStreamCh {

        blockJSON := d.Value()
        var block bitcoinclients.Block
        err := json.Unmarshal(blockJSON, &block)
        if err != nil {
            result = result.Append(sinksdk.ResponseFailure(d.ID(), err.Error()))
            continue
        }

        dbBlock := blockchaindb.Block{
            Height:        block.Height,
            BlockHash:     block.Hash,
            NoTransaction: block.NoTxs,
            Timestamp:     block.Timestamp,
        }

        err = l.CqlRawClient.WriteBlock(dbBlock)
        if err != nil {
            result = result.Append(sinksdk.ResponseFailure(d.ID(), err.Error()))
            continue
        }

        result = result.Append(sinksdk.ResponseOK(d.ID()))
    }
    return result
}

// [... in another file]

// WriteBlock writes the block in the cassandra database.
func (rbc *RawBlockchainClient) WriteBlock(block Block) error {
    return rbc.cqlSession.Query(
        "INSERT INTO block (height, block_hash, no_transactions, "+
            "timestamp) VALUES (?, ?, ?, ?)",
        block.Height,
        block.BlockHash,
        block.NoTransaction,
        block.Timestamp,
    ).Exec()
}

And here is the pipeline manifest that I use to reproduce the bug (it starts with the wrong keyspace name in the envars for tx and block writer, but not for witness):

kind: Pipeline
apiVersion: numaflow.numaproj.io/v1alpha1
metadata:
  name: ingest-error-test-3
  namespace: poseidon-dev
spec:
  vertices:
    - name: block-metadata
      source:
        udsource:
          container:
            image: registry.gitlab.com/mycompany/pipeline/bitcoin-block-metadata-source:v1.61.0
            env:
              - name: REDIS_HOST
                value: redis-block-source-master.poseidon-dev.svc.cluster.com
            resources: {}
      imagePullSecrets:
        - name: registry-credentials
      scale:
        min: 1
        max: 1

    - name: block-tx-fetcher
      udf:
        container:
          image: registry.gitlab.com/mycompany/pipeline/bitcoin-tx-fetch-udf:v1.61.0
          resources: {}
        builtin: null
        groupBy: null
      imagePullSecrets:
        - name: registry-credentials
      limits:
        readBatchSize: 1
        bufferMaxLength: 10000
      scale:
        min: 4
        max: 15

    - name: tx-coinjoin
      udf:
        container:
          image: registry.gitlab.com/mycompany/pipeline/bitcoin-coinjoin-udf:v1.61.0
          resources: {}
        builtin: null
        groupBy: null
      imagePullSecrets:
        - name: registry-credentials
      limits:
        bufferMaxLength: 100000
      scale:
        min: 5
        max: 5
      partitions: 5

    - name: block-writer
      sink:
        udsink:
          container:
            image: registry.gitlab.com/mycompany/pipeline/bitcoin-block-sink:v1.61.0
            env:
              - name: CASSANDRA_HOSTS
                value: scylla-svc.scylla-helios-dev
              - name: RAW_KEYSPACE
                value: invalid_keyspace_missing_block_table
            resources: {}
      imagePullSecrets:
        - name: registry-credentials
      limits:
        readBatchSize: 50
        bufferMaxLength: 100000
      scale:
        min: 1
        max: 1

    - name: block-writer-witness
      sink:
        udsink:
          container:
            image: registry.gitlab.com/mycompany/pipeline/bitcoin-block-sink:v1.61.0
            env:
              - name: CASSANDRA_HOSTS
                value: scylla-svc.scylla-helios-dev
              - name: RAW_KEYSPACE
                value: btc_raw_dev_b
      imagePullSecrets:
        - name: registry-credentials
      limits:
        readBatchSize: 50
        bufferMaxLength: 100000
      scale:
        min: 1
        max: 1

    - name: tx-writer
      sink:
        udsink:
          container:
            image: registry.gitlab.com/mycompany/pipeline/bitcoin-tx-sink:v1.61.0
            env:
              - name: CASSANDRA_HOSTS
                value: scylla-svc.scylla-helios-dev
              - name: RAW_KEYSPACE
                value: invalid_keyspace_missing_block_table
            resources: {}
      imagePullSecrets:
        - name: registry-credentials
      scale:
        min: 5
        max: 5
      partitions: 5

  edges:
    - from: block-metadata
      to: block-tx-fetcher
      conditions: null

    - from: block-tx-fetcher
      to: tx-coinjoin
      conditions:
        tags:
          operator: or
          values:
            - transaction

    - from: block-tx-fetcher
      to: block-writer
      conditions:
        tags:
          operator: or
          values:
            - block

    - from: block-tx-fetcher
      to: block-writer-witness
      conditions:
        tags:
          operator: or
          values:
            - block

    - from: tx-coinjoin
      to: tx-writer
      conditions: null

To Reproduce

I don't have a working example that doesn't involve my client's proprietary code, but I believe we can make one in the coming days with some trial and error. I tried to narrow the error down to this much simpler pipeline, where I send a sequence of numbers through HTTP and use a sink with an envar that makes it fail just like the misconfigured sinks in my previous pipeline, but I didn't succeed:

apiVersion: numaflow.numaproj.io/v1alpha1
kind: Pipeline
metadata:
  name: super-odd
  namespace: poseidon-dev
spec:
  vertices:
    - name: in
      source:
        http: {}
      scale:
        min: 1
        max: 1
    - name: shroedinger-cat
      partitions: 4
      sink:
        udsink:
          container:
            image: docker.io/faidideq/numaflow-shroedinger-cat-sink:v0.0.1
            env:
              - name: CAT_STATE
                value: dead  # <- I run it, let it fail for a bit, then change this to "alive", which makes the sink print the output
      scale:
        min: 1
        max: 1
  edges:
    - from: in
      to: shroedinger-cat

This has led me to believe that it has to do with how full the buffers are, how many partitions there are, or both, but I didn't have time to confirm it yet.

Expected behavior

block-writer's output table has no missing blocks, just like block-writer-witness.

Environment (please complete the following information):



QuentinFAIDIDE commented 7 months ago

No more luck reproducing it with partitions, full buffers, and an additional intermediate flatmap:

apiVersion: numaflow.numaproj.io/v1alpha1
kind: Pipeline
metadata:
  name: super-odd-3
  namespace: poseidon-dev
spec:
  vertices:
    - name: in
      source:
        http: {}
      scale:
        min: 1
        max: 1

    - name: msg-variator
      udf:
        container:
          image: faidideq/numaflow-msg-variator-udf:v0.0.1
      limits:
        bufferMaxLength: 100000
      scale:
        min: 1
        max: 1
      partitions: 1

    - name: shroedinger-cat
      partitions: 2
      sink:
        udsink:
          container:
            image: docker.io/faidideq/numaflow-shroedinger-cat-sink:v0.0.1
            env:
              - name: CAT_STATE
                value: dead
      limits:
        readBatchSize: 2
        bufferMaxLength: 5
      scale:
        min: 1
        max: 1

  edges:
    - from: in
      to: msg-variator

    - from: msg-variator  
      to: shroedinger-cat

:thinking: There must be something about my pipeline (high throughput?) that triggers this bug and that this test pipeline is missing.

QuentinFAIDIDE commented 7 months ago

Brainstorming ideas: a race condition based on retries (and numa container restarts?). If so, this test pipeline, with 2 partitions and 28 retries, is unlikely to trigger the race, but my affected pipeline, with at least 30k, surely would. Let me know if you need me to collect logs, traces, or anything else you need to troubleshoot this issue. I can also share the code of my UDF in private, but as you can see, the only relevant Sink code was shared above.

QuentinFAIDIDE commented 7 months ago

I think I reproduced it. I send a sequence of numbers to the HTTP input, then msg-variator-udf repeats each one 3000 times with an integer suffix going from 0 to 2999. So if the input is 1, the output goes from 1-0 to 1-2999.

  1. Create a Redis instance
  2. Create this pipeline, with the Redis host set in the cat vertex and the CAT_STATE envar set to anything other than "alive":

    apiVersion: numaflow.numaproj.io/v1alpha1
    kind: Pipeline
    metadata:
      name: super-odd-7
      namespace: poseidon-dev
    spec:
      vertices:
        - name: in
          source:
            http: {}
          scale:
            min: 1
            max: 1

        - name: msg-variator
          udf:
            container:
              image: faidideq/numaflow-msg-variator-udf:v0.1.0
          limits:
            readBatchSize: 1
            bufferMaxLength: 10000
          scale:
            min: 4
            max: 4

        - name: shroedinger-cat
          partitions: 1
          sink:
            udsink:
              container:
                image: docker.io/faidideq/numaflow-shroedinger-cat-sink:v0.2.0
                env:
                  - name: CAT_STATE
                    value: dead
                  - name: REDIS_HOST
                    value: "odd-pipe-redis"
          limits:
            readBatchSize: 50
            bufferMaxLength: 100000
          scale:
            min: 1
            max: 1

      edges:
        - from: in
          to: msg-variator

        - from: msg-variator
          to: shroedinger-cat

  3. Port forward the input and feed it a sequence of increasing numbers:

    #!/bin/bash
    set -e

    for i in {1..10000}; do
      curl -kq -X POST -d "${i}" https://localhost:8443/vertices/in
    done

  4. Wait for a while and let it fail to write to the sink
  5. Edit the cat vertex and set CAT_STATE to "alive"
  6. After waiting for the healthy sink to write the buffered data, you should see that the first values are missing from the redis set if you open a shell to the redis pod and run redis-cli to check the set:

![image](https://github.com/numaproj/numaflow/assets/3892217/dbff0e98-4434-44a5-b3ba-8b422cb1afd5)
(if you need to find the set name, you can run `KEYS *`; there's only one per pipeline)
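
To check step 6 without eyeballing the set, the expected values can be compared against what actually landed in Redis. The sketch below is only an illustration: `missingValues` is a hypothetical helper, and the `observed` map is assumed to have been filled from the set members (for example from the output of `SMEMBERS`). It relies on the value format described above, `<input>-<i>`.

```go
package main

import (
	"fmt"
	"strconv"
)

// missingValues lists every expected "<input>-<i>" value that is absent from
// observed. Inputs range over firstInput..lastInput (inclusive) and each input
// is fanned out into `fanout` values, as the msg-variator UDF does.
func missingValues(firstInput, lastInput, fanout int, observed map[string]bool) []string {
	var missing []string
	for in := firstInput; in <= lastInput; in++ {
		for i := 0; i < fanout; i++ {
			v := strconv.Itoa(in) + "-" + strconv.Itoa(i)
			if !observed[v] {
				missing = append(missing, v)
			}
		}
	}
	return missing
}

func main() {
	// Tiny example: inputs 1..2, fan-out of 2, and "1-0" never reached the set.
	observed := map[string]bool{"1-1": true, "2-0": true, "2-1": true}
	fmt.Println(missingValues(1, 2, 2, observed)) // prints [1-0]
}
```

For the pipeline in this comment the call would use a fan-out of 3000 and the input range fed by the curl loop.
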

QuentinFAIDIDE commented 7 months ago

Note that I waited quite a long time (>5min maybe?). Here is the UDF code:

package main

import (
    "context"
    "log"
    "strconv"

    "github.com/numaproj/numaflow-go/pkg/mapper"
)

// Map is the main processing function that will map input of the udf to the outputs. It's actually a flatmap.
func Map(ctx context.Context, keys []string, d mapper.Datum) mapper.Messages {

    // all the items we'll forward for each block
    results := mapper.MessagesBuilder()

    in := string(d.Value())

    for i := 0; i < 3000; i++ {
        results = results.Append(mapper.NewMessage([]byte(in + "-" + strconv.Itoa(i))))
    }

    return results
}

func main() {
    err := mapper.NewServer(mapper.MapperFunc(Map)).Start(context.Background())
    if err != nil {
        log.Panic("Failed to start map function server: ", err)
    }
}

And the user defined cat sink:

package main

import (
    "context"
    "log"
    "os"

    "github.com/go-redis/redis"
    sinksdk "github.com/numaproj/numaflow-go/pkg/sinker"
)

type ShroedingerCatSink struct {
    CatState string
    rdb      *redis.Client
}

func NewShroedingerCatSink() ShroedingerCatSink {
    sc := ShroedingerCatSink{
        CatState: os.Getenv("CAT_STATE"),
    }
    if os.Getenv("REDIS_HOST") != "" {
        sc.rdb = redis.NewClient(&redis.Options{
            Addr: os.Getenv("REDIS_HOST") + ":6379",
        })
    }
    return sc
}

func (scs *ShroedingerCatSink) Sink(ctx context.Context, datumStreamCh <-chan sinksdk.Datum) sinksdk.Responses {
    result := sinksdk.ResponsesBuilder()
    for d := range datumStreamCh {
        if scs.CatState != "alive" {
            result = result.Append(sinksdk.ResponseFailure(d.ID(), "cat state is not set to \"alive\""))
            continue
        }

        if scs.rdb == nil {
            log.Print(string(d.Value()))
            result = result.Append(sinksdk.ResponseOK(d.ID()))
        } else {
            _, err := scs.rdb.SAdd("cat-"+os.Getenv("NUMAFLOW_PIPELINE_NAME"), string(d.Value())).Result()
            if err != nil {
                result = result.Append(sinksdk.ResponseFailure(d.ID(), err.Error()))
            } else {
                result = result.Append(sinksdk.ResponseOK(d.ID()))
            }
        }

    }
    return result
}

func main() {
    catSink := NewShroedingerCatSink()
    err := sinksdk.NewServer(&catSink).Start(context.Background())
    if err != nil {
        log.Panic("Failed to start sink function server: ", err)
    }
}

yhl25 commented 7 months ago

@QuentinFAIDIDE Thanks for sharing detailed info and steps to reproduce this issue. I've been able to do the same and understand what's causing the data loss.

Here's what's happening: we've set a limit of 100k messages at a time for each JetStream stream. If we exceed that limit, messages are discarded because we use the limits policy. To prevent this, we check the buffer and stop writing messages to JetStream whenever the buffer limit is exceeded. https://github.com/numaproj/numaflow/blob/1844575f4fbabd2205e1260740ff9b1c2fd9bb3d/pkg/isb/stores/jetstream/writer.go#L160

To determine whether buffer usage is over the limit, we compare ackPending + pending messages against the total buffer size. This check runs every second in a goroutine. https://github.com/numaproj/numaflow/blob/1844575f4fbabd2205e1260740ff9b1c2fd9bb3d/pkg/isb/stores/jetstream/writer.go#L81
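
The check described above can be pictured roughly as follows. This is a minimal sketch and not the code behind the links: `bufferState`, `isFull`, and the 0.8 usage fraction are assumptions chosen for illustration (the fraction is inferred from the "up to 80k" figure mentioned in this thread).

```go
package main

import "fmt"

// bufferState is a hypothetical snapshot of one stream, refreshed periodically.
type bufferState struct {
	Pending    int64 // messages written but not yet delivered to the reader
	AckPending int64 // messages delivered but not yet acknowledged
}

// isFull reports whether pending + ackPending has reached the allowed fraction
// of the buffer. bufferMaxLength and usageLimit are assumed parameter names.
func isFull(s bufferState, bufferMaxLength int64, usageLimit float64) bool {
	used := float64(s.Pending + s.AckPending)
	return used >= usageLimit*float64(bufferMaxLength)
}

func main() {
	s := bufferState{Pending: 70000, AckPending: 5000}
	fmt.Println(isFull(s, 100000, 0.8)) // 75k used, below the 80k threshold: false

	s.Pending = 79000
	fmt.Println(isFull(s, 100000, 0.8)) // 84k used, above the 80k threshold: true
}
```

The important property is that the result is only as fresh as the last snapshot; writers that consult it between two refreshes act on stale information.
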

However, in your case, the pipeline has its buffer size set to 100k, equal to the maxMsgs. Consequently, when the sink gets stuck, the pending messages can increase up to 80k and once the sink resumes its normal function, there is a sudden flow of data to the stream. This situation results in a race condition between writing data to the stream and checking the buffer status. Given that incoming messages in one second can exceed 30k, the total number of messages in the stream exceeds the 100k limit, causing data loss.
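
A toy model makes the overshoot concrete. The sketch below is a deterministic simulation under stated assumptions, not Numaflow or JetStream code: the stream keeps at most `MaxMsgs` messages and discards the oldest beyond that, and the writer only re-evaluates the "full" flag once per tick (one second). The `scenario` type and all numbers are illustrative, taken from the figures quoted in this thread where available.

```go
package main

import "fmt"

// scenario holds the illustrative parameters of the toy model.
type scenario struct {
	MaxMsgs         int64   // hard stream limit; the oldest messages beyond it are discarded
	BufferMaxLength int64   // configured buffer length used by the fullness check
	UsageLimit      float64 // fraction of BufferMaxLength at which writers should stop
	Backlog         int64   // messages already in the stream when the sink resumes
	WritesPerTick   int64   // messages written during one check interval
	ReadsPerTick    int64   // messages consumed by the sink during one interval
}

// droppedAfter returns how many messages the stream discarded after `ticks`
// check intervals.
func droppedAfter(s scenario, ticks int) int64 {
	var dropped int64
	inStream := s.Backlog
	for t := 0; t < ticks; t++ {
		// The flag is computed once per tick and is stale for the rest of it.
		full := float64(inStream) >= s.UsageLimit*float64(s.BufferMaxLength)
		if !full {
			inStream += s.WritesPerTick
		}
		if inStream > s.MaxMsgs {
			dropped += inStream - s.MaxMsgs // oldest messages are discarded
			inStream = s.MaxMsgs
		}
		if s.ReadsPerTick < inStream {
			inStream -= s.ReadsPerTick
		} else {
			inStream = 0
		}
	}
	return dropped
}

func main() {
	s := scenario{
		MaxMsgs: 100000, BufferMaxLength: 100000, UsageLimit: 0.8,
		Backlog: 79000, WritesPerTick: 30000, ReadsPerTick: 5000,
	}
	fmt.Println(droppedAfter(s, 1)) // prints 9000

	s.MaxMsgs = 200000
	fmt.Println(droppedAfter(s, 1)) // prints 0
}
```

In this model the discarded messages are the oldest ones in the stream, which is consistent with the first blocks being the ones that go missing.
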


As an immediate workaround, either increase the maxMsgs to 200k or cut down your buffer usage to 50k. This will allow for more buffer margin to avoid message loss. (I was able to test with 200k maxMsgs configuration and didn't see data loss).
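
The margin behind this workaround can be estimated with simple arithmetic: the worst case is the usage threshold plus whatever can be written during one stale check interval. The helper below is a hypothetical sizing aid, not part of Numaflow; `usageLimitPct` (80) and the 30k burst are assumptions taken from the numbers in this comment.

```go
package main

import "fmt"

// worstCaseInStream estimates the largest message count the stream could reach
// if writers act on a stale "not full" flag for one whole check interval.
func worstCaseInStream(bufferMaxLength, usageLimitPct, burstPerInterval int64) int64 {
	threshold := bufferMaxLength * usageLimitPct / 100
	return threshold + burstPerInterval
}

// hasHeadroom reports whether maxMsgs covers that worst case.
func hasHeadroom(maxMsgs, bufferMaxLength, usageLimitPct, burstPerInterval int64) bool {
	return worstCaseInStream(bufferMaxLength, usageLimitPct, burstPerInterval) <= maxMsgs
}

func main() {
	// Original setup: buffer of 100k equal to maxMsgs, bursts of 30k per second.
	fmt.Println(hasHeadroom(100000, 100000, 80, 30000)) // 110k > 100k: false

	// Workaround A: raise maxMsgs to 200k.
	fmt.Println(hasHeadroom(200000, 100000, 80, 30000)) // 110k <= 200k: true

	// Workaround B: cut the buffer length to 50k.
	fmt.Println(hasHeadroom(100000, 50000, 80, 30000)) // 70k <= 100k: true
}
```

This only bounds a single stale interval with a known burst size; a larger surge would need a larger margin.
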

Meanwhile, I'm working towards a longer-term resolution to address this.

QuentinFAIDIDE commented 7 months ago

Awesome! Congratulations on figuring it out! :muscle: What a nasty bug.

My current ISBSVC settings had the specs.jetstream.bufferConfig.maxMsgs set to 30k to be honest, so yeah with the bulk size being that high, I can imagine there is room for issues. I will greatly increase the maxMsgs then. For the race condition, if I get it right, it sounds like multiple replicas are writing to the same buffers and relying on some async ticker size fetch for knowing if there is risk of data loss. It sounds indeed like this will always leave room for race conditions unless there is either distributed locking or the ability for jetstream to refuse data going over its limits.
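
To illustrate the general idea of refusing writes that would go over the limit, here is a minimal in-process sketch of a reservation counter. It is not Numaflow's fix and not a reconstruction of any protocol from this thread: `admission`, `tryReserve`, and `release` are hypothetical names, and with several replicas the counter would have to live in a shared store rather than in process memory.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// admission tracks how many slots are reserved against a hard limit.
type admission struct {
	limit    int64
	reserved int64 // accessed only through sync/atomic
}

// tryReserve claims n slots before writing, or refuses if that would exceed
// the limit. The compare-and-swap loop makes check and update one atomic step.
func (a *admission) tryReserve(n int64) bool {
	for {
		cur := atomic.LoadInt64(&a.reserved)
		if cur+n > a.limit {
			return false
		}
		if atomic.CompareAndSwapInt64(&a.reserved, cur, cur+n) {
			return true
		}
	}
}

// release frees n slots, for example once messages have been acknowledged.
func (a *admission) release(n int64) {
	atomic.AddInt64(&a.reserved, -n)
}

func main() {
	a := &admission{limit: 100}
	var accepted int64
	var wg sync.WaitGroup

	// 8 concurrent writers attempt 50 writes each (400 in total).
	for w := 0; w < 8; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < 50; i++ {
				if a.tryReserve(1) {
					atomic.AddInt64(&accepted, 1)
				}
			}
		}()
	}
	wg.Wait()

	fmt.Println(accepted, atomic.LoadInt64(&a.reserved)) // prints 100 100
}
```

Because the reservation happens before the write, the limit cannot be overshot regardless of how stale any periodic size check is; the cost is one extra atomic operation (or round trip, in a distributed setting) per batch.
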

You probably thought about it, but if it helps you to brainstorm another protocol, and if I'm not completely wrong with my understanding of how it works, I could imagine the following process for writing buffers with a "worst future size" you store yourself and a lock:

Then, for acking, the process could be:

Thank you for the support :pray:

QuentinFAIDIDE commented 7 months ago

@yhl25 Important note, by the way: it also happened when I was using the WorkQueue policy instead of the limits policy. If I get it right, whatever the policy, limits are enforced if they are defined.

yhl25 commented 7 months ago

Yes, correct. Once the maxMsgs limit has been set and the stream passes the allowed limit (100k in our case), the older messages will be discarded irrespective of the policy. Try with a higher value for maxMsgs and let me know if you still see data loss.

QuentinFAIDIDE commented 7 months ago

I was not able to reproduce it after changing that, thanks! It seems to be a working mitigation. Still, I think I will wait for a change of write protocol in this ISBSVC before pushing high-throughput pipelines to production. I don't feel confident enough estimating whether the batch size and maxMsgs will overflow on a 1-second surge on all the sinks/buffers I might have around.

yhl25 commented 7 months ago

Thank you for raising this issue. We'll prioritize this promptly.

QuentinFAIDIDE commented 7 months ago

You're welcome! No stress, I will wait whatever time it takes :wink: