numaproj / numaflow

Kubernetes-native platform to run massively parallel data/streaming jobs
https://numaflow.numaproj.io/
Apache License 2.0

Unable to use Redis ISBSVC #1513

Closed QuentinFAIDIDE closed 7 months ago

QuentinFAIDIDE commented 8 months ago

**Describe the bug**
With the simple-example.yaml pipeline, vertex pods enter CrashLoopBackOff with the following error when the ISBSVC is a Redis one:

➜  ~ kubectl logs -n poseidon-dev simple-pipeline-in-0-rjvcu 
Defaulted container "numa" out of: numa, init (init)
{"level":"info","ts":"2024-02-19T09:12:01.959378402Z","logger":"numaflow.Source-processor","caller":"commands/processor.go:48","msg":"Starting vertex data processor","version":"Version: v1.1.4, BuildDate: 2024-01-20T22:16:46Z, GitCommit: 7ffb521bcc15612d04fe66de33d199e8c8391a7a, GitTag: v1.1.4, GitTreeState: clean, GoVersion: go1.20.12, Compiler: gc, Platform: linux/amd64"}
Error: failed to create a new NATS client pool: environment variable "NUMAFLOW_ISBSVC_JETSTREAM_URL" not found
Usage:
  numaflow processor [flags]

Flags:
  -h, --help                 help for processor
      --isbsvc-type string   ISB Service type, e.g. jetstream
      --type string          Processor type, 'source', 'sink' or 'udf'

panic: failed to create a new NATS client pool: environment variable "NUMAFLOW_ISBSVC_JETSTREAM_URL" not found

goroutine 1 [running]:
github.com/numaproj/numaflow/cmd/commands.Execute()
    /home/runner/work/numaflow/numaflow/cmd/commands/root.go:33 +0x45
main.main()
    /home/runner/work/numaflow/numaflow/cmd/main.go:24 +0x17

Controller logs (note that I was also deleting the pipeline from the `default` namespace, because I forgot to specify the namespace when I first applied the YAML). Also, the IP it cannot reach, 10.43.213.145, is the pipeline daemon's address:

{"level":"info","ts":"2024-02-19T09:09:06.274758783Z","logger":"numaflow.controller-manager","caller":"pipeline/controller.go:258","msg":"Created vertex successfully","namespace":"poseidon-dev","pipeline":"simple-pipeline","vertex":"simple-pipeline-in"}
{"level":"info","ts":"2024-02-19T09:09:06.277439784Z","logger":"numaflow.controller-manager","caller":"vertex/controller.go:202","msg":"Succeeded to create a service","namespace":"poseidon-dev","vertex":"simple-pipeline-in","pipeline":"simple-pipeline","service":"simple-pipeline-in-headless"}
{"level":"info","ts":"2024-02-19T09:09:06.278521597Z","logger":"numaflow.controller-manager","caller":"pipeline/controller.go:258","msg":"Created vertex successfully","namespace":"poseidon-dev","pipeline":"simple-pipeline","vertex":"simple-pipeline-cat"}
{"level":"info","ts":"2024-02-19T09:09:06.281616283Z","logger":"numaflow.controller-manager","caller":"vertex/controller.go:308","msg":"Succeeded to create a pod","namespace":"poseidon-dev","vertex":"simple-pipeline-in","pipeline":"simple-pipeline","pod":"simple-pipeline-in-0-rjvcu"}
{"level":"info","ts":"2024-02-19T09:09:06.281675077Z","logger":"numaflow.controller-manager","caller":"vertex/controller.go:321","msg":"Replicas changed","namespace":"poseidon-dev","vertex":"simple-pipeline-in","pipeline":"simple-pipeline","currentReplicas":0,"desiredReplicas":1}
{"level":"info","ts":"2024-02-19T09:09:06.282023655Z","logger":"numaflow.controller-manager","caller":"pipeline/controller.go:258","msg":"Created vertex successfully","namespace":"poseidon-dev","pipeline":"simple-pipeline","vertex":"simple-pipeline-out"}
{"level":"info","ts":"2024-02-19T09:09:06.285002546Z","logger":"numaflow.controller-manager","caller":"pipeline/controller.go:302","msg":"Created a job successfully for ISB Svc creating","namespace":"poseidon-dev","pipeline":"simple-pipeline","buffers":["poseidon-dev-simple-pipeline-cat-0","poseidon-dev-simple-pipeline-out-0"],"buckets":["poseidon-dev-simple-pipeline-out_SINK","poseidon-dev-simple-pipeline-in-cat","poseidon-dev-simple-pipeline-cat-out","poseidon-dev-simple-pipeline-in_SOURCE"]}
{"level":"info","ts":"2024-02-19T09:09:06.287896311Z","logger":"numaflow.controller-manager","caller":"vertex/controller.go:202","msg":"Succeeded to create a service","namespace":"poseidon-dev","vertex":"simple-pipeline-cat","pipeline":"simple-pipeline","service":"simple-pipeline-cat-headless"}
{"level":"info","ts":"2024-02-19T09:09:06.288450618Z","logger":"numaflow.controller-manager","caller":"pipeline/controller.go:368","msg":"Succeeded to create a daemon service","namespace":"poseidon-dev","pipeline":"simple-pipeline","service":"simple-pipeline-daemon-svc"}
{"level":"info","ts":"2024-02-19T09:09:06.291215162Z","logger":"numaflow.controller-manager","caller":"vertex/controller.go:308","msg":"Succeeded to create a pod","namespace":"poseidon-dev","vertex":"simple-pipeline-cat","pipeline":"simple-pipeline","pod":"simple-pipeline-cat-0-duttw"}
{"level":"info","ts":"2024-02-19T09:09:06.291233257Z","logger":"numaflow.controller-manager","caller":"pipeline/controller.go:420","msg":"Succeeded to create/recreate a daemon deployment","namespace":"poseidon-dev","pipeline":"simple-pipeline","deployment":"simple-pipeline-daemon"}
{"level":"info","ts":"2024-02-19T09:09:06.291252395Z","logger":"numaflow.controller-manager","caller":"vertex/controller.go:321","msg":"Replicas changed","namespace":"poseidon-dev","vertex":"simple-pipeline-cat","pipeline":"simple-pipeline","currentReplicas":0,"desiredReplicas":1}
{"level":"info","ts":"2024-02-19T09:09:06.297616015Z","logger":"numaflow.controller-manager","caller":"vertex/controller.go:202","msg":"Succeeded to create a service","namespace":"poseidon-dev","vertex":"simple-pipeline-out","pipeline":"simple-pipeline","service":"simple-pipeline-out-headless"}
{"level":"info","ts":"2024-02-19T09:09:06.300300533Z","logger":"numaflow.controller-manager","caller":"vertex/controller.go:308","msg":"Succeeded to create a pod","namespace":"poseidon-dev","vertex":"simple-pipeline-out","pipeline":"simple-pipeline","pod":"simple-pipeline-out-0-fnwlt"}
{"level":"info","ts":"2024-02-19T09:09:06.300335571Z","logger":"numaflow.controller-manager","caller":"vertex/controller.go:321","msg":"Replicas changed","namespace":"poseidon-dev","vertex":"simple-pipeline-out","pipeline":"simple-pipeline","currentReplicas":0,"desiredReplicas":1}
{"level":"info","ts":"2024-02-19T09:09:10.745678837Z","logger":"numaflow.controller-manager","caller":"pipeline/controller.go:102","msg":"Deleting pipeline","namespace":"default","pipeline":"simple-pipeline"}
{"level":"error","ts":"2024-02-19T09:09:10.747394743Z","logger":"numaflow.controller-manager","caller":"pipeline/controller.go:108","msg":"Failed to check if it's safe to delete pipeline simple-pipeline: rpc error: code = Unavailable desc = connection error: desc = \"transport: Error while dialing: dial tcp: lookup simple-pipeline-daemon-svc.default.svc on 10.43.0.10:53: no such host\"","namespace":"default","pipeline":"simple-pipeline","stacktrace":"github.com/numaproj/numaflow/pkg/reconciler/pipeline.(*pipelineReconciler).reconcile\n\t/home/runner/work/numaflow/numaflow/pkg/reconciler/pipeline/controller.go:108\ngithub.com/numaproj/numaflow/pkg/reconciler/pipeline.(*pipelineReconciler).Reconcile\n\t/home/runner/work/numaflow/numaflow/pkg/reconciler/pipeline/controller.go:81\nsigs.k8s.io/controller-runtime/pkg/internal/controller.(*Controller).Reconcile\n\t/home/runner/go/pkg/mod/sigs.k8s.io/controller-runtime@v0.14.6/pkg/internal/controller/controller.go:122\nsigs.k8s.io/controller-runtime/pkg/internal/controller.(*Controller).reconcileHandler\n\t/home/runner/go/pkg/mod/sigs.k8s.io/controller-runtime@v0.14.6/pkg/internal/controller/controller.go:323\nsigs.k8s.io/controller-runtime/pkg/internal/controller.(*Controller).processNextWorkItem\n\t/home/runner/go/pkg/mod/sigs.k8s.io/controller-runtime@v0.14.6/pkg/internal/controller/controller.go:274\nsigs.k8s.io/controller-runtime/pkg/internal/controller.(*Controller).Start.func2.2\n\t/home/runner/go/pkg/mod/sigs.k8s.io/controller-runtime@v0.14.6/pkg/internal/controller/controller.go:235"}
{"level":"error","ts":"2024-02-19T09:09:10.747432566Z","logger":"numaflow.controller-manager","caller":"pipeline/controller.go:83","msg":"Reconcile error","namespace":"default","pipeline":"simple-pipeline","error":"rpc error: code = Unavailable desc = connection error: desc = \"transport: Error while dialing: dial tcp: lookup simple-pipeline-daemon-svc.default.svc on 10.43.0.10:53: no such host\"","stacktrace":"github.com/numaproj/numaflow/pkg/reconciler/pipeline.(*pipelineReconciler).Reconcile\n\t/home/runner/work/numaflow/numaflow/pkg/reconciler/pipeline/controller.go:83\nsigs.k8s.io/controller-runtime/pkg/internal/controller.(*Controller).Reconcile\n\t/home/runner/go/pkg/mod/sigs.k8s.io/controller-runtime@v0.14.6/pkg/internal/controller/controller.go:122\nsigs.k8s.io/controller-runtime/pkg/internal/controller.(*Controller).reconcileHandler\n\t/home/runner/go/pkg/mod/sigs.k8s.io/controller-runtime@v0.14.6/pkg/internal/controller/controller.go:323\nsigs.k8s.io/controller-runtime/pkg/internal/controller.(*Controller).processNextWorkItem\n\t/home/runner/go/pkg/mod/sigs.k8s.io/controller-runtime@v0.14.6/pkg/internal/controller/controller.go:274\nsigs.k8s.io/controller-runtime/pkg/internal/controller.(*Controller).Start.func2.2\n\t/home/runner/go/pkg/mod/sigs.k8s.io/controller-runtime@v0.14.6/pkg/internal/controller/controller.go:235"}
{"level":"info","ts":"2024-02-19T09:10:32.673818221Z","logger":"numaflow.controller-manager","caller":"pipeline/controller.go:102","msg":"Deleting pipeline","namespace":"default","pipeline":"simple-pipeline"}
{"level":"error","ts":"2024-02-19T09:10:57.07908779Z","logger":"numaflow.autoscaler","caller":"scaling/scaling.go:124","msg":"Failed to scale a vertex","vertex":"poseidon-dev/simple-pipeline-cat","error":"error while fetching buffer info, failed to get the buffer information of vertex \"simple-pipeline-cat\", rpc error: code = Unavailable desc = connection error: desc = \"transport: Error while dialing: dial tcp 10.43.213.145:4327: i/o timeout\"","stacktrace":"github.com/numaproj/numaflow/pkg/reconciler/vertex/scaling.(*Scaler).scale\n\t/home/runner/work/numaflow/numaflow/pkg/reconciler/vertex/scaling/scaling.go:124"}
{"level":"error","ts":"2024-02-19T09:10:57.079126104Z","logger":"numaflow.autoscaler","caller":"scaling/scaling.go:124","msg":"Failed to scale a vertex","vertex":"poseidon-dev/simple-pipeline-out","error":"error while fetching buffer info, failed to get the buffer information of vertex \"simple-pipeline-out\", rpc error: code = Unavailable desc = connection error: desc = \"transport: Error while dialing: dial tcp 10.43.213.145:4327: i/o timeout\"","stacktrace":"github.com/numaproj/numaflow/pkg/reconciler/vertex/scaling.(*Scaler).scale\n\t/home/runner/work/numaflow/numaflow/pkg/reconciler/vertex/scaling/scaling.go:124"}
{"level":"error","ts":"2024-02-19T09:10:57.07918074Z","logger":"numaflow.autoscaler","caller":"scaling/scaling.go:124","msg":"Failed to scale a vertex","vertex":"poseidon-dev/simple-pipeline-in","error":"failed to get metrics of vertex key \"poseidon-dev/simple-pipeline-in\", rpc error: code = Unavailable desc = connection error: desc = \"transport: Error while dialing: dial tcp 10.43.213.145:4327: i/o timeout\"","stacktrace":"github.com/numaproj/numaflow/pkg/reconciler/vertex/scaling.(*Scaler).scale\n\t/home/runner/work/numaflow/numaflow/pkg/reconciler/vertex/scaling/scaling.go:124"}
{"level":"error","ts":"2024-02-19T09:11:07.079299352Z","logger":"numaflow.autoscaler","caller":"scaling/scaling.go:124","msg":"Failed to scale a vertex","vertex":"poseidon-dev/simple-pipeline-cat","error":"error while fetching buffer info, failed to get the buffer information of vertex \"simple-pipeline-cat\", rpc error: code = Unavailable desc = connection error: desc = \"transport: Error while dialing: dial tcp 10.43.213.145:4327: i/o timeout\"","stacktrace":"github.com/numaproj/numaflow/pkg/reconciler/vertex/scaling.(*Scaler).scale\n\t/home/runner/work/numaflow/numaflow/pkg/reconciler/vertex/scaling/scaling.go:124"}
{"level":"error","ts":"2024-02-19T09:11:17.080321494Z","logger":"numaflow.autoscaler","caller":"scaling/scaling.go:124","msg":"Failed to scale a vertex","vertex":"poseidon-dev/simple-pipeline-out","error":"error while fetching buffer info, failed to get the buffer information of vertex \"simple-pipeline-out\", rpc error: code = Unavailable desc = connection error: desc = \"transport: Error while dialing: dial tcp 10.43.213.145:4327: i/o timeout\"","stacktrace":"github.com/numaproj/numaflow/pkg/reconciler/vertex/scaling.(*Scaler).scale\n\t/home/runner/work/numaflow/numaflow/pkg/reconciler/vertex/scaling/scaling.go:124"}
{"level":"error","ts":"2024-02-19T09:11:27.0809779Z","logger":"numaflow.autoscaler","caller":"scaling/scaling.go:124","msg":"Failed to scale a vertex","vertex":"poseidon-dev/simple-pipeline-in","error":"failed to get metrics of vertex key \"poseidon-dev/simple-pipeline-in\", rpc error: code = Unavailable desc = connection error: desc = \"transport: Error while dialing: dial tcp 10.43.213.145:4327: i/o timeout\"","stacktrace":"github.com/numaproj/numaflow/pkg/reconciler/vertex/scaling.(*Scaler).scale\n\t/home/runner/work/numaflow/numaflow/pkg/reconciler/vertex/scaling/scaling.go:124"}
{"level":"error","ts":"2024-02-19T09:11:37.081828284Z","logger":"numaflow.autoscaler","caller":"scaling/scaling.go:124","msg":"Failed to scale a vertex","vertex":"poseidon-dev/simple-pipeline-cat","error":"error while fetching buffer info, failed to get the buffer information of vertex \"simple-pipeline-cat\", rpc error: code = Unavailable desc = connection error: desc = \"transport: Error while dialing: dial tcp 10.43.213.145:4327: i/o timeout\"","stacktrace":"github.com/numaproj/numaflow/pkg/reconciler/vertex/scaling.(*Scaler).scale\n\t/home/runner/work/numaflow/numaflow/pkg/reconciler/vertex/scaling/scaling.go:124"}

Pipeline daemon logs:

➜  ~ kubectl logs --all-containers -n poseidon-dev simple-pipeline-daemon-6cb498bfb-zhf6j
{"level":"info","ts":"2024-02-19T09:09:07.800450562Z","logger":"numaflow.isbsvc-validate","caller":"commands/isbsvc_validate.go:80","msg":"Validate buffers, buckets and side inputs store successfully","pipeline":"simple-pipeline"}
{"level":"info","ts":"2024-02-19T09:30:12.22141854Z","logger":"numaflow.daemon-server","caller":"commands/daemon_server.go:48","msg":"Starting daemon server","pipeline":"simple-pipeline","version":"Version: v1.1.4, BuildDate: 2024-01-20T22:16:46Z, GitCommit: 7ffb521bcc15612d04fe66de33d199e8c8391a7a, GitTag: v1.1.4, GitTreeState: clean, GoVersion: go1.20.12, Compiler: gc, Platform: linux/amd64"}
{"level":"error","ts":"2024-02-19T09:30:12.221635402Z","logger":"numaflow.daemon-server","caller":"server/daemon_server.go:74","msg":"Failed to get a NATS client pool.","pipeline":"simple-pipeline","error":"environment variable \"NUMAFLOW_ISBSVC_JETSTREAM_URL\" not found","stacktrace":"github.com/numaproj/numaflow/pkg/daemon/server.(*daemonServer).Run\n\t/home/runner/work/numaflow/numaflow/pkg/daemon/server/daemon_server.go:74\ngithub.com/numaproj/numaflow/cmd/commands.NewDaemonServerCommand.func1\n\t/home/runner/work/numaflow/numaflow/cmd/commands/daemon_server.go:51\ngithub.com/spf13/cobra.(*Command).execute\n\t/home/runner/go/pkg/mod/github.com/spf13/cobra@v1.6.0/command.go:916\ngithub.com/spf13/cobra.(*Command).ExecuteC\n\t/home/runner/go/pkg/mod/github.com/spf13/cobra@v1.6.0/command.go:1040\ngithub.com/spf13/cobra.(*Command).Execute\n\t/home/runner/go/pkg/mod/github.com/spf13/cobra@v1.6.0/command.go:968\ngithub.com/numaproj/numaflow/cmd/commands.Execute\n\t/home/runner/work/numaflow/numaflow/cmd/commands/root.go:32\nmain.main\n\t/home/runner/work/numaflow/numaflow/cmd/main.go:24\nruntime.main\n\t/opt/hostedtoolcache/go/1.20.12/x64/src/runtime/proc.go:250"}
panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x1 addr=0x8 pc=0x1bee133]

goroutine 1 [running]:
github.com/numaproj/numaflow/pkg/shared/clients/nats.(*ClientPool).CloseAll(0x363?)
    /home/runner/work/numaflow/numaflow/pkg/shared/clients/nats/client_pool.go:69 +0x33
github.com/numaproj/numaflow/pkg/daemon/server.(*daemonServer).Run(0xc000a49c78, {0x294e2e8, 0xc000dc15f0})
    /home/runner/work/numaflow/numaflow/pkg/daemon/server/daemon_server.go:75 +0x270
github.com/numaproj/numaflow/cmd/commands.NewDaemonServerCommand.func1(0xc000d6a600?, {0x2581745?, 0x1?, 0x1?})
    /home/runner/work/numaflow/numaflow/cmd/commands/daemon_server.go:51 +0x28d
github.com/spf13/cobra.(*Command).execute(0xc000d6a600, {0xc00051c8d0, 0x1, 0x1})
    /home/runner/go/pkg/mod/github.com/spf13/cobra@v1.6.0/command.go:916 +0x862
github.com/spf13/cobra.(*Command).ExecuteC(0x3b90ee0)
    /home/runner/go/pkg/mod/github.com/spf13/cobra@v1.6.0/command.go:1040 +0x3bd
github.com/spf13/cobra.(*Command).Execute(...)
    /home/runner/go/pkg/mod/github.com/spf13/cobra@v1.6.0/command.go:968
github.com/numaproj/numaflow/cmd/commands.Execute()
    /home/runner/work/numaflow/numaflow/cmd/commands/root.go:32 +0x25
main.main()
    /home/runner/work/numaflow/numaflow/cmd/main.go:24 +0x17

**To Reproduce**
Steps to reproduce the behavior:

  1. Remove the previous JetStream ISBSVC (not sure whether this is related).
  2. Deploy a Redis ISBSVC (an external one in my case) with the following manifest:
    
    ➜  ~ kubectl describe -n poseidon-dev isbsvc/default
    Name:         default
    Namespace:    poseidon-dev
    Labels:       <none>
    Annotations:  <none>
    API Version:  numaflow.numaproj.io/v1alpha1
    Kind:         InterStepBufferService
    Metadata:
      Creation Timestamp:  2024-02-19T09:06:10Z
      Generation:          2
      Resource Version:    294759397
      UID:                 e9e4fd63-8e93-40c4-8069-a9cbc2791cf0
    Spec:
      Redis:
        External:
          URL:  redis-isbsvc-redis-cluster-0.redis-isbsvc-redis-cluster-headless.poseidon-dev.svc.helios.mycompany.com:6379,redis-isbsvc-redis-cluster-1.redis-isbsvc-redis-cluster-headless.poseidon-dev.svc.helios.mycompany.com:6379,redis-isbsvc-redis-cluster-2.redis-isbsvc-redis-cluster-headless.poseidon-dev.svc.helios.mycompany.com:6379
    Status:
      Conditions:
        Last Transition Time:  2024-02-19T09:08:14Z
        Message:               Successful
        Reason:                Successful
        Status:                True
        Type:                  Configured
        Last Transition Time:  2024-02-19T09:08:14Z
        Message:               Successful
        Reason:                Successful
        Status:                True
        Type:                  Deployed
      Config:
        Redis:
          URL:  redis-isbsvc-redis-cluster-0.redis-isbsvc-redis-cluster-headless.poseidon-dev.svc.helios.mycompany.com:6379,redis-isbsvc-redis-cluster-1.redis-isbsvc-redis-cluster-headless.poseidon-dev.svc.helios.mycompany.com:6379,redis-isbsvc-redis-cluster-2.redis-isbsvc-redis-cluster-headless.poseidon-dev.svc.helios.mycompany.com:6379
      Phase:    Running
      Type:     redis
    Events:     <none>

  3. Create a Numaflow pipeline:

    ➜ ~ kubectl describe -n poseidon-dev pipelines.numaflow.numaproj.io simple-pipeline
    Name:         simple-pipeline
    Namespace:    poseidon-dev
    Labels:
    Annotations:
    API Version:  numaflow.numaproj.io/v1alpha1
    Kind:         Pipeline
    Metadata:
      Creation Timestamp:  2024-02-19T09:09:06Z
      Finalizers:
        pipeline-controller
      Generation:        2
      Resource Version:  294759750
      UID:               878ebd0f-f5ae-4a27-b445-074818f518cf
    Spec:
      Edges:
        From:  in
        To:    cat
        From:  cat
        To:    out
      Lifecycle:
        Delete Grace Period Seconds:  30
        Desired Phase:                Running
        Pause Grace Period Seconds:   30
      Limits:
        Buffer Max Length:   30000
        Buffer Usage Limit:  80
        Read Batch Size:     500
        Read Timeout:        1s
      Vertices:
        Name:  in
        Scale:
        Source:
          Generator:
            Duration:   1s
            Key Count:  5
            Msg Size:   8
            Rpu:        5
            Value:      5
        Name:  cat
        Scale:
        Udf:
          Builtin:
            Name:  cat
        Name:  out
        Scale:
        Sink:
          Log:
      Watermark:
        Disabled:   false
        Max Delay:  0s
    Status:
      Conditions:
        Last Transition Time:  2024-02-19T09:09:06Z
        Message:               Successful
        Reason:                Successful
        Status:                True
        Type:                  Configured
        Last Transition Time:  2024-02-19T09:09:06Z
        Message:               Successful
        Reason:                Successful
        Status:                True
        Type:                  Deployed
      Last Updated:  2024-02-19T09:09:06Z
      Phase:         Running
      Sink Count:    1
      Source Count:  1
      Udf Count:     1
      Vertex Count:  3
    Events:
      Type    Reason                     Age    From                 Message
      Normal  CreateVertexSuccess        7m21s  pipeline-controller  Created vertex simple-pipeline-in successfully
      Normal  CreateVertexSuccess        7m21s  pipeline-controller  Created vertex simple-pipeline-cat successfully
      Normal  CreateVertexSuccess        7m21s  pipeline-controller  Created vertex simple-pipeline-out successfully
      Normal  CreateDaemonSvcSuccess     7m21s  pipeline-controller  Succeeded to create daemon service simple-pipeline-daemon-svc
      Normal  CreateDaemonDeploySuccess  7m21s  pipeline-controller  Succeeded to create/recreate daemon deployment simple-pipeline-daemon



Note that I also tried giving the ISBSVC a name other than `default` (`redis`), and the issue still happened after configuring my pipeline to use it.

**Expected behavior**
The pipeline uses streams from the specified Redis cluster and starts properly.

**Environment (please complete the following information):**

- Kubernetes: v1.26.1+k3s1
- Numaflow: v1.1.4

**Additional context**
JetStream messaging was working before; this is the first time I've tried to use the Redis ISBSVC.
I also deleted a `nats-box` container in the pipeline namespace, which I suspect was related to the previous JetStream ISBSVC. I also tried restarting the containers (such as the controller) in `numaflow-system`.

QuentinFAIDIDE commented 8 months ago

For the daemon server.

https://github.com/numaproj/numaflow/blob/7ffb521bcc15612d04fe66de33d199e8c8391a7a/pkg/daemon/server/daemon_server.go#L70

At this location, the JetStream client pool is initialized, which requires the `NUMAFLOW_ISBSVC_JETSTREAM_URL` environment variable, even though that variable is not set in my case, where Redis is used. As far as I understand, if JetStream is not used, the client pool should not be initialized at all, and it seems safe to move this initialization inside the `switch` below.

Take my analysis with a grain of salt: I don't know all the context around the development of Numaflow and am just trying to help as much as I can. Maybe there's a fix at another level that makes more sense :)

QuentinFAIDIDE commented 8 months ago

These locations are all affected:

So since it's everywhere, my guess is that either:

Let me know if one of the mentioned options makes sense; I'd be happy to provide the code for the simplest ones I can handle.

whynowy commented 8 months ago

@QuentinFAIDIDE, many parts of the Redis ISBSvc implementation are missing; for now, a JetStream ISBSvc must be used to run the pipeline.

QuentinFAIDIDE commented 8 months ago

I have a patch that fixes that here. It does not pass the DropOnFull and Watermark tests, though, as I believe these are not supported at all with the Redis ISBSVC (the watermark defaults to NoOp in the code if the ISBSVC is Redis).
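To illustrate why watermark tests cannot pass on Redis, here is a hypothetical Go sketch of a factory that falls back to a no-op watermark fetcher when watermarks are disabled or the ISB service is Redis, as described above. The interface, type names and the `-1` "no watermark" value are assumptions for illustration and do not reflect numaflow's actual watermark packages.

```go
package main

import "fmt"

// ISBSvcType is a hypothetical stand-in for the ISB service type.
type ISBSvcType string

const (
	ISBSvcJetStream ISBSvcType = "jetstream"
	ISBSvcRedis     ISBSvcType = "redis"
)

// WatermarkFetcher is a hypothetical, minimal watermark interface; in this
// sketch -1 means "no watermark available".
type WatermarkFetcher interface {
	Watermark() int64
}

// noOpFetcher never advances, so watermark-dependent behavior cannot work.
type noOpFetcher struct{}

func (noOpFetcher) Watermark() int64 { return -1 }

// storeFetcher stands in for a fetcher backed by a real watermark store.
type storeFetcher struct{ latest int64 }

func (f *storeFetcher) Watermark() int64 { return f.latest }

// newWatermarkFetcher falls back to a no-op fetcher when watermarks are
// disabled or when the ISB service is Redis.
func newWatermarkFetcher(t ISBSvcType, disabled bool) WatermarkFetcher {
	if disabled || t == ISBSvcRedis {
		return noOpFetcher{}
	}
	return &storeFetcher{latest: 0}
}

func main() {
	for _, t := range []ISBSvcType{ISBSvcJetStream, ISBSvcRedis} {
		fmt.Printf("%s: %T\n", t, newWatermarkFetcher(t, false))
	}
}
```

With this kind of fallback, a test that expects the watermark to progress will always fail on Redis, even though the pipeline spec has `Watermark: Disabled: false`.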

vigith commented 8 months ago

Thank you, @QuentinFAIDIDE. Do you mind adding an e2e test to that patch?

vigith commented 8 months ago

It is totally okay not to have DropOnFull or Watermark.

vigith commented 8 months ago

@QuentinFAIDIDE, since you are working on the fix, shall I assign this issue to you?

QuentinFAIDIDE commented 8 months ago

I'd be happy to, yes! For the tests, I'm not sure where exactly the CI tests are defined. I see those tests, and I see that if I set the `ISBSVC="redis"` environment variable they use Redis, but I don't know where to specify that the Watermark and DropOnFull tests should not be run for Redis. Can I add an if statement for `ISBSVC=redis` in the test code and return early when it's true, or is there a better way?

QuentinFAIDIDE commented 8 months ago

It seems there are also tests in the `test-reduce-one-e2e` suite that pass with JetStream but not with Redis. I'm investigating why. Maybe Reduce is not supported with Redis either?

vigith commented 8 months ago

That is correct; Reduce is not supported with the Redis ISB.

whynowy commented 8 months ago

If we turn on the E2E tests for Redis, we need to add a condition check to each test case, e.g. something like a skip for Redis.
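One way to implement that per-test condition check is a small helper that decides whether a feature should be skipped based on the `ISBSVC` environment variable the e2e suites read. This is a hypothetical Go sketch: `shouldSkip` and the feature labels are assumptions, and the list only reflects what this thread reports as unsupported on Redis (Reduce, watermarks, DropOnFull).

```go
package main

import (
	"fmt"
	"os"
)

// unsupportedOnRedis lists features that, per this thread, the Redis ISB
// service does not support. The feature keys are hypothetical labels.
var unsupportedOnRedis = map[string]bool{
	"reduce":       true,
	"watermark":    true,
	"drop-on-full": true,
}

// shouldSkip reports whether a test for feature must be skipped for the
// given ISB service type (the value of the ISBSVC environment variable).
func shouldSkip(isbsvc, feature string) (bool, string) {
	if isbsvc == "redis" && unsupportedOnRedis[feature] {
		return true, fmt.Sprintf("%s is not supported by the Redis ISB service", feature)
	}
	return false, ""
}

// In an e2e test this would sit at the top of each affected case:
//
//	if skip, why := shouldSkip(os.Getenv("ISBSVC"), "reduce"); skip {
//		t.Skip(why)
//	}
func main() {
	isbsvc := os.Getenv("ISBSVC")
	for _, f := range []string{"reduce", "watermark", "map"} {
		skip, why := shouldSkip(isbsvc, f)
		fmt.Printf("ISBSVC=%q feature=%s skip=%v %s\n", isbsvc, f, skip, why)
	}
}
```

Calling `t.Skip` rather than returning early marks the case as skipped in the `go test` output, so Redis runs do not silently report unsupported cases as passing.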