numaproj / numaflow

Kubernetes-native platform to run massively parallel data/streaming jobs
https://numaflow.numaproj.io
Apache License 2.0
1.01k stars 98 forks source link

when remote writing from prometheus getting error expected to write body size of -24158 but got 41378 #1763

Open gopaljayanthi opened 2 weeks ago

gopaljayanthi commented 2 weeks ago

Describe the bug In prometheus.yml i am using remote write to send metrics to a source vertex of http type. and getting this error ERROR 2024-06-19 21:54:43 {"level":"error","ts":1718814283.906287,"logger":"numaflow.Source-processor","caller":"forward/forward.go:473","msg":"Retrying failed msgs","vertex":"simple-pipeline-in","errors":{"expected to write body size of -24158 but got 41378":1},"stacktrace":"github.com/numaproj/numaflow/pkg/forward.(InterStepDataForward).writeToBuffer\n\t/home/runner/work/numaflow/numaflow/pkg/forward/forward.go:473\ngithub.com/numaproj/numaflow/pkg/forward.(InterStepDataForward).writeToBuffers\n\t/home/runner/work/numaflow/numaflow/pkg/forward/forward.go:428\ngithub.com/numaproj/numaflow/pkg/forward.(InterStepDataForward).forwardAChunk\n\t/home/runner/work/numaflow/numaflow/pkg/forward/forward.go:314\ngithub.com/numaproj/numaflow/pkg/forward.(InterStepDataForward).Start.func1\n\t/home/runner/work/numaflow/numaflow/pkg/forward/forward.go:143"}

To Reproduce Steps to reproduce the behavior:

  1. install numaflow, prometheus

  2. create interstepbufferservices

  3. create pipeline using pipeline yaml

  4. apiVersion: numaflow.numaproj.io/v1alpha1 kind: Pipeline metadata: annotations: kubectl.kubernetes.io/last-applied-configuration: | {"apiVersion":"numaflow.numaproj.io/v1alpha1","kind":"Pipeline","metadata":{"annotations":{},"name":"simple-pipeline","namespace":"default"},"spec":{"edges":[{"from":"in","to":"cat"},{"from":"cat","to":"out"}],"vertices":[{"name":"in","source":{"generator":{"duration":"1s","rpu":5}}},{"name":"cat","udf":{"builtin":{"name":"cat"}}},{"name":"out","sink":{"log":{}}}]}} creationTimestamp: "2024-06-19T15:54:29Z" finalizers:

    • pipeline-controller generation: 2 name: simple-pipeline namespace: default resourceVersion: "131944" uid: e25b42c7-c454-46c7-b74b-2cd98cd6b952 spec: edges:
    • from: in to: cat
    • from: cat to: out lifecycle: deleteGracePeriodSeconds: 30 desiredPhase: Running limits: bufferMaxLength: 30000 bufferUsageLimit: 80 readBatchSize: 500 readTimeout: 1s vertices:
    • name: in source: http: service: true
    • name: cat scale: {} udf: builtin: name: cat
    • name: out scale: {} sink: log: {} watermark: disabled: false maxDelay: 0s status: conditions:
    • lastTransitionTime: "2024-06-19T15:54:29Z" message: Successful reason: Successful status: "True" type: Configured
    • lastTransitionTime: "2024-06-19T15:54:29Z" message: Successful reason: Successful status: "True" type: Deployed lastUpdated: "2024-06-19T15:54:29Z" phase: Running sinkCount: 1 sourceCount: 1 udfCount: 1 vertexCount: 3
  5. in prometheus config prometheus.yml, include following remote write

  6. . remote_write:

    • name: remote-test url: "https://simple-pipeline-in.default.svc.cluster.local:8443/vertices/in" remote_timeout: 1m queue_config: capacity: 10000 min_shards: 10 max_shards: 100 max_samples_per_send: 50 batch_send_deadline: 10s min_backoff: 30ms max_backoff: 100ms tls_config: insecure_skip_verify: true write_relabel_configs:
      • action: keep regex: cpu_*;true source_labels:
        • name
        • nginx

Expected behavior the source accepts the metrics fromprometheus and forwards to the cat and then to out vertices

Screenshots If applicable, add screenshots to help explain your problem.

Environment (please complete the following information):

Additional context Add any other context about the problem here.


Message from the maintainers:

Impacted by this bug? Give it a 👍. We often sort issues this way to know what to prioritize.

For quick help and support, join our slack channel.

KeranYang commented 2 weeks ago

@gopaljayanthi May I ask why you were using the old version (v0.7.2) of numaflow? Perhaps you could try updating to the latest version (v1.2.1) to see if it resolves the issue.

whynowy commented 2 weeks ago

This bug should have been fixed in later versions.

gopaljayanthi commented 1 week ago

i upgraded to v1.2.1, now I am seeing pod failures with not much in the logs

the pipeline i am using is here https://raw.githubusercontent.com/gopaljayanthi/numalogic-prometheus/master/manifests/pipeline/numalogic-prometheus-pipeline.yaml

k -n numalogic-prometheus get po NAME READY STATUS RESTARTS AGE isbsvc-default-js-0 3/3 Running 0 45m isbsvc-default-js-1 3/3 Running 0 45m isbsvc-default-js-2 3/3 Running 0 45m mlflow-sqlite-5bf68fc797-sxc2r 1/1 Running 2 (6h22m ago) 4d9h numalogic-prometheus-pipeline-daemon-79f6c9449-rjp4j 1/1 Running 0 38m numalogic-prometheus-pipeline-decode-0-2oljb 0/2 CrashLoopBackOff 15 (4m6s ago) 38m numalogic-prometheus-pipeline-filter-0-shtha 0/2 CrashLoopBackOff 18 (38s ago) 38m numalogic-prometheus-pipeline-inference-0-tkvfb 0/2 CrashLoopBackOff 17 (16s ago) 38m numalogic-prometheus-pipeline-input-0-inpqs 1/1 Running 0 38m numalogic-prometheus-pipeline-input-output-0-06ex2 1/1 Running 0 38m numalogic-prometheus-pipeline-output-0-tu4iy 1/1 Running 0 38m numalogic-prometheus-pipeline-postprocess-0-jakah 0/2 CrashLoopBackOff 18 (37s ago) 38m numalogic-prometheus-pipeline-preprocess-0-iq3hh 0/2 CrashLoopBackOff 18 (96s ago) 38m numalogic-prometheus-pipeline-prometheus-pusher-0-sffgp 2/2 Running 2 (27m ago) 38m numalogic-prometheus-pipeline-threshold-0-kfxad 0/2 CrashLoopBackOff 18 (98s ago) 38m numalogic-prometheus-pipeline-trainer-0-hqlrf 0/2 CrashLoopBackOff 17 (20s ago) 38m numalogic-prometheus-pipeline-training-output-0-mrcpc 1/1 Running 0 38m numalogic-prometheus-pipeline-window-0-gbsfk 0/2 CrashLoopBackOff 18 (37s ago) 38m numalogic-redis-cluster-0 1/1 Running 4 (6h22m ago) 4d6h numalogic-redis-cluster-1 1/1 Running 4 (6h21m ago) 4d6h numalogic-redis-cluster-2 1/1 Running 6 (6h21m ago) 4d6h numalogic-redis-cluster-3 1/1 Running 4 (2d4h ago) 4d6h numalogic-redis-cluster-4 1/1 Running 5 (6h21m ago) 4d6h numalogic-redis-cluster-5 0/1 Running 5 (27m ago) 4d6h

in the input vertex pod, i am seeing this {"level":"error","ts":"2024-06-21T21:35:42.500605264Z","logger":"numaflow","caller":"publish/publisher.go:278","msg":"put to bucket failed","entityID":"numalogic-prometheus-pipeline-input-0","otStore":"numalogic-prometheus-numalogic-prometheus-pipeline-input_SOURCE_OT","hbStore":"numalogic-prometheus-numalogic-prometheus-pipeline-input_SOURCE_PROCESSORS","bucket":"numalogic-prometheus-numalogic-prometheus-pipeline-input_SOURCE_PROCESSORS","error":"nats: timeout","stacktrace":"github.com/numaproj/numaflow/pkg/watermark/publish.(*publish).publishHeartbeat\n\t/home/runner/work/numaflow/numaflow/pkg/watermark/publish/publisher.go:278"}

in the decode vertex pod 2024/06/21 22:05:45 starting the gRPC server with unix domain socket... /var/run/numaflow/function.sock .... Error: failed to wait until server info is ready: context deadline exceeded 2024/06/21 22:39:08 | ERROR | {"level":"error","ts":"2024-06-21T22:39:08.590708576Z","logger":"numaflow.MapUDF-processor","caller":"nats/nats_client.go:68","msg":"Nats default: disconnected","pipeline":"numalogic-prometheus-pipeline","vertex":"decode","stacktrace":"github.com/numaproj/numaflow/pkg/shared/clients/nats.NewNATSClient.func3\n\t/home/runner/work/numaflow/numaflow/pkg/shared/clients/nats/nats_client.go:68\ngithub.com/nats-io/nats%2ego.(Conn).close.func1\n\t/home/runner/go/pkg/mod/github.com/nats-io/nats.go@v1.33.1/nats.go:5122\ngithub.com/nats-io/nats%2ego.(asyncCallbacksHandler).asyncCBDispatcher\n\t/home/runner/go/pkg/mod/github.com/nats-io/nats.go@v1.33.1/nats.go:2901"} 2024/06/21 22:39:08 | INFO | {"level":"info","ts":"2024-06-21T22:39:08.590858178Z","logger":"numaflow.MapUDF-processor","caller":"nats/nats_client.go:62","msg":"Nats default: connection closed","pipeline":"numalogic-prometheus-pipeline","vertex":"decode"} 2024/06/21 22:39:08 | ERROR | {"level":"error","ts":"2024-06-21T22:39:08.590881565Z","logger":"numaflow.MapUDF-processor","caller":"nats/nats_client.go:68","msg":"Nats default: disconnected","pipeline":"numalogic-prometheus-pipeline","vertex":"decode","stacktrace":"github.com/numaproj/numaflow/pkg/shared/clients/nats.NewNATSClient.func3\n\t/home/runner/work/numaflow/numaflow/pkg/shared/clients/nats/nats_client.go:68\ngithub.com/nats-io/nats%2ego.(Conn).close.func1\n\t/home/runner/go/pkg/mod/github.com/nats-io/nats.go@v1.33.1/nats.go:5122\ngithub.com/nats-io/nats%2ego.(asyncCallbacksHandler).asyncCBDispatcher\n\t/home/runner/go/pkg/mod/github.com/nats-io/nats.go@v1.33.1/nats.go:2901"} 2024/06/21 22:39:08 | INFO | {"level":"info","ts":"2024-06-21T22:39:08.590895017Z","logger":"numaflow.MapUDF-processor","caller":"nats/nats_client.go:62","msg":"Nats default: connection closed","pipeline":"numalogic-prometheus-pipeline","vertex":"decode"} 2024/06/21 22:39:08 | ERROR | {"level":"error","ts":"2024-06-21T22:39:08.590915688Z","logger":"numaflow.MapUDF-processor","caller":"nats/nats_client.go:68","msg":"Nats default: disconnected","pipeline":"numalogic-prometheus-pipeline","vertex":"decode","stacktrace":"github.com/numaproj/numaflow/pkg/shared/clients/nats.NewNATSClient.func3\n\t/home/runner/work/numaflow/numaflow/pkg/shared/clients/nats/nats_client.go:68\ngithub.com/nats-io/nats%2ego.(Conn).close.func1\n\t/home/runner/go/pkg/mod/github.com/nats-io/nats.go@v1.33.1/nats.go:5122\ngithub.com/nats-io/nats%2ego.(asyncCallbacksHandler).asyncCBDispatcher\n\t/home/runner/go/pkg/mod/github.com/nats-io/nats.go@v1.33.1/nats.go:2901"} 2024/06/21 22:39:08 | INFO | {"level":"info","ts":"2024-06-21T22:39:08.590927835Z","logger":"numaflow.MapUDF-processor","caller":"nats/nats_client.go:62","msg":"Nats default: connection closed","pipeline":"numalogic-prometheus-pipeline","vertex":"decode"} 2024/06/21 22:39:08 | INFO | {"level":"info","ts":"2024-06-21T22:39:08.590957969Z","logger":"numaflow.MapUDF-processor","caller":"jetstream/kv_store.go:166","msg":"stopping WatchAll","pipeline":"numalogic-prometheus-pipeline","vertex":"decode","kvName":"numalogic-prometheus-numalogic-prometheus-pipeline-input-decode_PROCESSORS","watcher":"numalogic-prometheus-numalogic-prometheus-pipeline-input-decode_PROCESSORS"} 2024/06/21 22:39:08 | ERROR | {"level":"error","ts":"2024-06-21T22:39:08.590985218Z","logger":"numaflow.MapUDF-processor","caller":"jetstream/kv_store.go:170","msg":"Failed to stop","pipeline":"numalogic-prometheus-pipeline","vertex":"decode","kvName":"numalogic-prometheus-numalogic-prometheus-pipeline-input-decode_PROCESSORS","watcher":"numalogic-prometheus-numalogic-prometheus-pipeline-input-decode_PROCESSORS","error":"nats: connection closed","stacktrace":"github.com/numaproj/numaflow/pkg/shared/kvs/jetstream.(jetStreamStore).Watch.func1\n\t/home/runner/work/numaflow/numaflow/pkg/shared/kvs/jetstream/kv_store.go:170"} 2024/06/21 22:39:08 | INFO | {"level":"info","ts":"2024-06-21T22:39:08.591035612Z","logger":"numaflow.MapUDF-processor","caller":"jetstream/kv_store.go:166","msg":"stopping WatchAll","pipeline":"numalogic-prometheus-pipeline","vertex":"decode","kvName":"numalogic-prometheus-numalogic-prometheus-pipeline-input-decode_OT","watcher":"numalogic-prometheus-numalogic-prometheus-pipeline-input-decode_OT"} 2024/06/21 22:39:08 | ERROR | {"level":"error","ts":"2024-06-21T22:39:08.591091852Z","logger":"numaflow.MapUDF-processor","caller":"jetstream/kv_store.go:170","msg":"Failed to stop","pipeline":"numalogic-prometheus-pipeline","vertex":"decode","kvName":"numalogic-prometheus-numalogic-prometheus-pipeline-input-decode_OT","watcher":"numalogic-prometheus-numalogic-prometheus-pipeline-input-decode_OT","error":"nats: connection closed","stacktrace":"github.com/numaproj/numaflow/pkg/shared/kvs/jetstream.(jetStreamStore).Watch.func1\n\t/home/runner/work/numaflow/numaflow/pkg/shared/kvs/jetstream/kv_store.go:170"} Usage: numaflow processor [flags] Flags: -h, --help help for processor --isbsvc-type string ISB Service type, e.g. jetstream --type string Processor type, 'source', 'sink' or 'udf' panic: failed to wait until server info is ready: context deadline exceeded goroutine 1 [running]: github.com/numaproj/numaflow/cmd/commands.Execute(...) /home/runner/work/numaflow/numaflow/cmd/commands/root.go:33 main.main() /home/runner/work/numaflow/numaflow/cmd/main.go:24 +0x3c

Please help.

kohlisid commented 1 week ago

Hello @gopaljayanthi! From the logs I can see the error 2024/06/21 22:05:45 starting the gRPC server with unix domain socket... /var/run/numaflow/function.sock

This is an older version of the UDF which would not be compatible with numaflow v1.2.1.

As I can see that you are using numalogic, then can you upgrade your code to use the latest version of Numalogic which supports the latest Numaflow SDK versions?