numaproj / numaflow

Kubernetes-native platform to run massively parallel data/streaming jobs
https://numaflow.numaproj.io/

UDF Container Restart not Triggering Numa Container Restart #1608

Closed yhl25 closed 4 months ago

yhl25 commented 6 months ago

related to https://github.com/numaproj/numaflow/issues/1592

kohlisid commented 6 months ago

I wanted some more details: what scenario was this bug raised for?

Additionally, until then, from some local testing I tried the following scenarios and had a couple of observations:

1) Reduce UDF: When trying with a manually induced panic in the user code, we see that the numa container restarts as well.

    // Inside the Reduce handler: drain the input channel, counting messages,
    // then panic deliberately once the input is exhausted.
    var counter = 0
    for range inputCh {
        counter++
    }
    panic(1)

The Numa container then panics with the following error and restarts:

panic: Got an error while invoking ApplyReduce{error 26 0  gRPC client.ReduceFn failed, Retryable: error reading from server: EOF}

goroutine 191 [running]:
go.uber.org/zap/zapcore.CheckWriteAction.OnWrite(0x0?, 0x0?, {0x0?, 0x0?, 0xc00083a9e0?})
        /Users/skohli/go/pkg/mod/go.uber.org/zap@v1.26.0/zapcore/entry.go:196 +0x54
go.uber.org/zap/zapcore.(*CheckedEntry).Write(0xc0006ff110, {0x0, 0x0, 0x0})
        /Users/skohli/go/pkg/mod/go.uber.org/zap@v1.26.0/zapcore/entry.go:262 +0x24e
go.uber.org/zap.(*SugaredLogger).log(0xc0005a1218, 0x4, {0x0?, 0x0?}, {0xc0008c0ee0?, 0x0?, 0x0?}, {0x0, 0x0, 0x0})
        /Users/skohli/go/pkg/mod/go.uber.org/zap@v1.26.0/sugar.go:316 +0xec
go.uber.org/zap.(*SugaredLogger).Panic(...)
        /Users/skohli/go/pkg/mod/go.uber.org/zap@v1.26.0/sugar.go:159
github.com/numaproj/numaflow/pkg/reduce/pnf.(*ProcessAndForward).invokeUDF(0xc0003668c0, {0x2a78860, 0xc000672410}, 0x0?, 0xc0002f1b40, {0x4047617158?, 0xc000430790?})
        /Users/skohli/numa/numaflow/pkg/reduce/pnf/pnf.go:155 +0x39a
created by github.com/numaproj/numaflow/pkg/reduce/pnf.(*ProcessAndForward).AsyncSchedulePnF in goroutine 1
        /Users/skohli/numa/numaflow/pkg/reduce/pnf/pnf.go:131 +0x1f9

The behavior is similar for the Python SDK, where the numa container restarts because of the error in reduce, but due to the exception handling in the SDK the UDF container does not exit and only propagates the error.

Do we want the Python UDF to restart/exit out in such a scenario as well?

2) Map UDF: For both SDKs, if we induce a panic/exception in the user code of a Map UDF, the numa container treats that as a retryable error and keeps retrying the mapFn without any restarts.

From a design perspective, is this a scenario where we expect the numa container to restart?

Currently we do not allow the user to propagate any error from their end; that is a scenario we could think about if required.
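
To make the distinction concrete, here is a minimal sketch (plain Go, not numaflow code; applyMap and errRetryable are hypothetical names) of the difference between treating a UDF failure as retryable, which is the Map behavior described above, and treating it as fatal so the container crashes and gets restarted:

    // Conceptual sketch only (plain Go, not numaflow code): a retryable error
    // keeps the same message being retried, while a fatal error crashes the
    // process so the orchestrator restarts the container.
    package main

    import (
        "errors"
        "fmt"
        "time"
    )

    var errRetryable = errors.New("retryable UDF error")

    // applyMap stands in for invoking the map UDF over gRPC; here the user code
    // has "panicked" and the failure is surfaced as a retryable error.
    func applyMap(msg string) error {
        return errRetryable
    }

    func main() {
        for attempt := 1; ; attempt++ {
            err := applyMap("payload")
            if err == nil {
                break // processed successfully, move on to the next message
            }
            if errors.Is(err, errRetryable) {
                // Current Map behavior described above: keep retrying the same message.
                fmt.Printf("attempt %d: retryable error, retrying: %v\n", attempt, err)
                if attempt == 3 {
                    return // stop the demo here; the real loop would keep going
                }
                time.Sleep(100 * time.Millisecond)
                continue
            }
            // Fatal path: crash so Kubernetes restarts the container.
            panic(err)
        }
    }

The open question above is essentially which of these two branches a user-code panic should land in.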

cc @vigith @yhl25 @whynowy

vigith commented 6 months ago

@kohlisid the https://github.com/numaproj/numaflow/issues/1592 issue is what made us open this one.

kohlisid commented 6 months ago

After checking the issue, these are the points we need to cover. Foremost, the main thing to consider is that in no scenario or error case should we have data loss. As noted in the previous comment, we can see that this condition is handled well for map and reduce.

I verified the data-loss condition for map today by adding a transient panic in the user code. The UDF container restarts, and we were still able to see all the data in the sink after processing resumed. sink.log
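
For reference, a transient panic of this kind could look roughly like the following (an illustrative sketch, not the exact test code used; failUntil and handle are hypothetical, and the real handler signature depends on the SDK): the handler panics on every message before a fixed cutoff time and processes normally after it, so the container crash-loops briefly and then recovers, with the redelivered messages still landing in the sink.

    // Illustrative sketch, not the exact test code used: a map handler that
    // panics on every invocation before a fixed cutoff and succeeds after it,
    // so the failure window ends on its own even across container restarts.
    package main

    import (
        "fmt"
        "time"
    )

    // Hypothetical absolute cutoff for the simulated failure window.
    var failUntil = time.Date(2024, time.March, 1, 12, 0, 0, 0, time.UTC)

    // handle stands in for the user's map handler.
    func handle(payload []byte) []byte {
        if time.Now().Before(failUntil) {
            panic("simulated transient failure")
        }
        return payload // pass the message through unchanged
    }

    func main() {
        out := handle([]byte("hello"))
        fmt.Printf("processed: %s\n", out)
    }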

Now considering a source, specifically a user-defined source as used here, there are two points:

1) Should the UD Source container restart or not on seeing a panic in the UD Source code?

This behavior should be consistent for all the SDKs.

Ideally we should restart the UD Source container, since that can help recover from any transient issues that may have occurred if the user is not handling them explicitly. The restart would re-establish the gRPC connections, and if the user code has recovered we should see the messages getting processed again.

This should be done for both ack and read.

If the issue is persistent, we would end up in a crash loop anyway and the data would not be processed. The important thing here is to log the errors/exceptions properly so that the logs can be checked for the exact issue.

As seen in the above issue, we are currently consuming the error and not restarting; this should change. If the user has implemented exception handling in their own code, that is at their own discretion.

If we do not restart, we leave the onus on the numa container to make sure that we are able to retry in case the error was transient.
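
To illustrate the two behaviors (a minimal sketch with hypothetical names like readBatch, not SDK code): swallowing the panic keeps a broken source limping along silently, while re-panicking lets the container exit so Kubernetes can restart it and the gRPC connections get re-established once the user code recovers.

    // Minimal illustration (hypothetical names, not SDK code) of the two
    // behaviors discussed above for a user-defined source's Read path.
    package main

    import "log"

    // readBatch stands in for the user's Read implementation.
    func readBatch() []string {
        panic("source is broken")
    }

    // swallow is the problematic pattern: the error is consumed, nothing is
    // read, and the container never restarts, so a transient issue is never
    // recovered from.
    func swallow() (msgs []string) {
        defer func() {
            if r := recover(); r != nil {
                log.Printf("read failed, ignoring: %v", r)
            }
        }()
        return readBatch()
    }

    // crash is the desired pattern: log the failure and let the panic propagate
    // so the container exits and Kubernetes restarts it.
    func crash() []string {
        defer func() {
            if r := recover(); r != nil {
                log.Printf("read failed, exiting so the container restarts: %v", r)
                panic(r) // re-panic instead of consuming the error
            }
        }()
        return readBatch()
    }

    func main() {
        _ = swallow() // logs the failure and carries on silently
        _ = crash()   // logs the failure and crashes the process
    }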

Currently we have a mismatch in our SDKs for this behavior; I will create an issue to track and fix that.

2) Does the Numa container need to restart?

We should not need to restart the Numa container, as we keep looping and reading the next batch.

In the case where we got errors while reading, if there were any partial messages read out of the whole batch and we were able to ack them, then the offsets would be updated correctly anyway.

In the other case, even if there is an error with ack, since we keep reading in a loop until explicitly stopped, we would request a re-read of those same offsets and rely on dedup while trying to write them to the buffer again, if we had already written them in the first iteration.
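
As a conceptual sketch of that re-read + dedup path (plain Go, not the actual numa container code; the offset map simply stands in for dedup on the inter-step buffer):

    // Conceptual sketch only, not the numa container's code: when an ack fails,
    // the same offsets are read again on the next loop iteration, and writes to
    // the buffer are deduplicated by offset, so no duplicates land downstream.
    package main

    import "fmt"

    type message struct {
        offset  int
        payload string
    }

    func main() {
        written := map[int]bool{} // stands in for dedup on the inter-step buffer

        writeToBuffer := func(m message) {
            if written[m.offset] {
                fmt.Printf("offset %d already written, dropping duplicate\n", m.offset)
                return
            }
            written[m.offset] = true
            fmt.Printf("offset %d written\n", m.offset)
        }

        batch := []message{{1, "a"}, {2, "b"}}

        // First iteration: the batch is written, but the ack fails afterwards.
        for _, m := range batch {
            writeToBuffer(m)
        }

        // Next iteration: the unacked offsets are read and written again;
        // dedup drops the duplicates.
        for _, m := range batch {
            writeToBuffer(m)
        }
    }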

That said, there is an onus on the UD Source author to ensure this read/ack consistency in their code as well. @whynowy do you think we can keep some kind of rule or enforcement similar to what we were chatting about?

As an enhancement, we could add a retry mechanism on Ack on seeing an error. This could help in cases where the ack did not go through because of a transient error; then we would not need to retry the read for those messages again, reducing the dedup as well. We do not have that right now; was there a specific reason for that?
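
A sketch of what such an ack retry could look like (an assumption on my side, not existing numaflow code; ackWithRetry, ackFn, and the offsets type are hypothetical):

    // Sketch of the suggested enhancement, not existing numaflow code: retry the
    // source Ack a few times with exponential backoff before giving up and
    // falling back to the re-read + dedup path described above.
    package main

    import (
        "errors"
        "fmt"
        "time"
    )

    // ackWithRetry retries ackFn with exponential backoff until it succeeds or
    // the retry budget is exhausted.
    func ackWithRetry(ackFn func(offsets []int64) error, offsets []int64, maxRetries int) error {
        backoff := 100 * time.Millisecond
        var err error
        for attempt := 0; attempt <= maxRetries; attempt++ {
            if err = ackFn(offsets); err == nil {
                return nil
            }
            time.Sleep(backoff)
            backoff *= 2
        }
        return fmt.Errorf("ack failed after %d retries: %w", maxRetries, err)
    }

    func main() {
        calls := 0
        flakyAck := func(offsets []int64) error { // fails twice, then succeeds
            calls++
            if calls < 3 {
                return errors.New("transient ack failure")
            }
            return nil
        }
        if err := ackWithRetry(flakyAck, []int64{1, 2, 3}, 5); err != nil {
            fmt.Println("ack permanently failed:", err)
            return
        }
        fmt.Println("ack succeeded after", calls, "attempts")
    }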


As a next step, if this is the behavior we want to go ahead with, we can confirm it by creating a UD Source for Kafka, or by using Naga's code along with a restart patch.

It would be good to check our built-in sources as well, to see whether we cover ack/read correctly in such a scenario, and what the expectation is for those source types.

@whynowy @vigith @yhl25 Does this seem consistent with your understanding as well?

vigith commented 6 months ago

@kohlisid we need to make sure UDF exceptions will restart the UDF container. Can you work on that to start with?

kohlisid commented 4 months ago

https://github.com/numaproj/numaflow-python/pull/160 and https://github.com/numaproj/numaflow-java/pull/121; in the Go SDK we already panic and restart.