vectordotdev / vector

A high-performance observability data pipeline.
https://vector.dev
Mozilla Public License 2.0

2 sinks with buffer.when_full drop_newest/block doesn't work as expected. #20764

Closed: nmiculinic closed this issue 1 month ago

nmiculinic commented 2 months ago

Problem

I have a Vector unified architecture deployment.

In the aggregator layer I have two sinks, A and B. Sink A is the production sink, while B is an experimental one. Any errors in B should be ignored and should not block data from being sent to A. However, that's not what happens, despite my best configuration efforts (sink B has acknowledgements disabled and data-dropping configured on its buffer).

Configuration

expire_metrics_secs: 3600
acknowledgements:
  enabled: true
sources:
  vector:
    type: vector
    address: 0.0.0.0:6000
sinks:
  A:
    type: http
    inputs:
      - vector
    uri: <>
    compression: gzip
    encoding:
      codec: json
  B:
    type: vector
    acknowledgements:
      enabled: false
    inputs:
      - vector
    buffer:
      when_full: "drop_newest"
      max_events: 5000
    batch:
      max_bytes: 4000111  # < default gRPC message limit
      max_events: 100
    address: <>
    compression: true
    request:
      concurrency: "adaptive"
      adaptive_concurrency:
        initial_concurrency: 5

There are some transforms in the middle, but they're not really relevant here. When sink B fails or is unreachable at the network layer, I see an increase in discarded events for the vector source (the vector_component_discarded_events_total{component_id="vector", component_kind="source"} metric).

I would expect any failure on sink B to be ignored and data to keep flowing to sink A. Instead, it seems to completely stop data from being sent to sink A. The processes have sufficient CPU/memory resources and aren't being throttled.

Version

0.38.0

Debug Output

No response

Example Data

No response

Additional Context

This is running in Kubernetes, with vector-agent as the node-level agent shipping data to an Envoy LB, which forwards it to the vector-aggregator layer. I also see elevated 504s returned by vector-aggregator at the Envoy LB layer (the envoy_cluster_upstream_rq metric).

References

No response

jszwedko commented 2 months ago

Thanks for the report, @nmiculinic. That definitely sounds wrong since you have when_full: "drop_newest" and acknowledgements.enabled: false on sink B. It should be shedding load so that A is unaffected. I wonder if the acknowledgement feature isn't playing nicely with the buffer's load shedding 🤔

nmiculinic commented 2 months ago

@jszwedko btw, I can confirm this is still an issue in version 0.39.0 as well.

nmiculinic commented 1 month ago

@jszwedko https://github.com/nmiculinic/vector/blob/6eecda55020214364fda844cf8ed16a9b6cc2a5c/lib/vector-core/src/config/mod.rs#L352

Does this mean that if acknowledgements.enabled is set to true at the global level, it overrides any per-sink configuration?

I'm not sure I'm following the code, but in my tests, once I disabled the global ack setting and only configured acks on sink A, things worked as expected. Please let me know if I misunderstood something.

jszwedko commented 1 month ago

> @jszwedko https://github.com/nmiculinic/vector/blob/6eecda55020214364fda844cf8ed16a9b6cc2a5c/lib/vector-core/src/config/mod.rs#L352
>
> Does this mean that if acknowledgements.enabled is set to true at the global level, it overrides any per-sink configuration?
>
> I'm not sure I'm following the code, but in my tests, once I disabled the global ack setting and only configured acks on sink A, things worked as expected. Please let me know if I misunderstood something.

That does seem suspicious, but note that the or there is combining Option values rather than booleans. Here is where the merge happens: https://github.com/vectordotdev/vector/blob/ef4f1752b40af6a405df718320feff55e042e0b4/src/config/mod.rs#L164-L169

That should use the sink-level config if it is set, and otherwise fall back to the global config. It certainly seems like something worth verifying is working correctly, though.
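
In config terms, the intended precedence would look roughly like the sketch below (placeholder values only, not your full config): an explicit sink-level setting should win over the global default.

    acknowledgements:
      enabled: true       # global default
    sinks:
      B:
        type: vector
        inputs:
          - vector
        address: <>
        acknowledgements:
          enabled: false  # explicit sink-level setting; this should take precedence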

nmiculinic commented 1 month ago

@jszwedko Actually, this was a red herring.

I tried explicitly disabling acks everywhere, and things kind of worked? But I was still suspicious that something was amiss... (plus there was less overall throughput through the service for some reason; roughly 30% fewer logs passing through despite having enough CPU/memory capacity)

So for sink B, the failing sink, I added the following:

    request:
      concurrency: "adaptive"
      adaptive_concurrency:
        initial_concurrency: 5
      retry_attempts: 3  # default unlimited
      timeout_secs: 2   # default 60s

and this broke the whole thing. So acks were disabled everywhere, one sink (B) was failing, and that sink had a buffer with this policy:

    buffer:
      when_full: "drop_newest"
      max_events: 5000

and I'd expect its failure not to impact the main sink A, or the whole pipeline... but it did.

This is what I see in the logs of the vector-aggregator pods:

"vector.Vector" grpc_method="PushEvents"}: vector_common::internal_event::component_events_dropped: Internal log [Events dropped] is being suppressed to avoid flooding.                                                                            │
│ 2024-07-18T14:31:27.484963Z  WARN sink{component_kind="sink" component_id=B component_type=vector}: vector::sinks::util::retries: Request timed out. If this happens often while the events are actually reaching their destination, try decreasing `batch.max_bytes` and/or using `compression` if applicable. Alternatively `request.timeout_secs` can be increased. internal_log │
│ 2024-07-18T14:31:27.485426Z  WARN sink{component_kind="sink" component_id=B component_type=vector}: vector::sinks::util::retries: Internal log [Retrying after error.] has been suppressed 3 times.                                                                                                                                                                                 │
│ 2024-07-18T14:31:27.485441Z  WARN sink{component_kind="sink" component_id=B component_type=vector}: vector::sinks::util::retries: Retrying after error. error=Request failed: status: Unknown, message: "operation was canceled: request has been canceled", details: [], metadata: MetadataMap { headers: {} } internal_log_rate_limit=true                                        │
│ 2024-07-18T14:31:27.485663Z  WARN sink{component_kind="sink" component_id=B component_type=vector}: vector::sinks::util::retries: Internal log [Retrying after error.] is being suppressed to avoid flooding.                                                                                                                                                                       │
│ 2024-07-18T14:31:36.642543Z ERROR source{component_kind="source" component_id=vector component_type=vector}:grpc-request{grpc_service="vector.Vector" grpc_method="PushEvents"}: vector_common::internal_event::component_events_dropped: Internal log [Events dropped] has been suppressed 16 times.                                                                                     │
│ 2024-07-18T14:31:36.642569Z ERROR source{component_kind="source" component_id=vector component_type=vector}:grpc-request{grpc_service="vector.Vector" grpc_method="PushEvents"}: vector_common::internal_event::component_events_dropped: Events dropped intentional=false count=4 reason="Source send cancelled." internal_log_rate_limit=true                                           │
│ 2024-07-18T14:31:40.036284Z ERROR source{component_kind="source" component_id=vector component_type=vector}:grpc-request{grpc_service="vector.Vector" grpc_method="PushEvents"}: vector_common::internal_event::component_events_dropped: Internal log [Events dropped] is being suppressed to avoid flooding.                                                                            │
│ 2024-07-18T14:31:47.850450Z ERROR source{component_kind="source" component_id=vector component_type=vector}:grpc-request{grpc_service="vector.Vector" grpc_method="PushEvents"}: vector_common::internal_event::component_events_dropped: Internal log [Events dropped] has been suppressed 6 times.                                                                                      │
│ 2024-07-18T14:31:47.850474Z ERROR source{component_kind="source" component_id=vector component_type=vector}:grpc-request{grpc_service="vector.Vector" grpc_method="PushEvents"}: vector_common::internal_event::component_events_dropped: Events dropped intentional=false count=9 reason="Source send cancelled." internal_log_rate_limit=true                                           │
│ 2024-07-18T14:31:47.995622Z ERROR source{component_kind="source" component_id=vector component_type=vector}:grpc-request{grpc_service="vector.Vector" grpc_method="PushEvents"}: vector_common::internal_event::component_events_dropped: Internal log [Events dropped] is being suppressed to avoid flooding.                                                                            │
│ 2024-07-18T14:31:58.082077Z ERROR source{component_kind="source" component_id=vector component_type=vector}:grpc-request{grpc_service="vector.Vector" grpc_method="PushEvents"}: vector_common::internal_event::component_events_dropped: Internal log [Events dropped] has been suppressed 10 times.                                                                                     │
│ 2024-07-18T14:31:58.082101Z ERROR source{component_kind="source" component_id=vector component_type=vector}:grpc-request{grpc_service="vector.Vector" grpc_method="PushEvents"}: vector_common::internal_event::component_events_dropped: Events dropped intentional=false count=2 reason="Source send cancelled." internal_log_rate_limit=true                                           │
│ 2024-07-18T14:31:59.013847Z ERROR source{component_kind="source" component_id=vector component_type=vector}:grpc-request{grpc_service="vector.Vector" grpc_method="PushEvents"}: vector_common::internal_event::component_events_dropped: Internal log [Events dropped] is being suppressed to avoid flooding.                                                                            │
│ 2024-07-18T14:32:09.271378Z ERROR source{component_kind="source" component_id=vector component_type=vector}:grpc-request{grpc_service="vector.Vector" grpc_method="PushEvents"}: vector_common::internal_event::component_events_dropped: Internal log [Events dropped] has been suppressed 12 times.                                                                                     │
│ 2024-07-18T14:32:09.271405Z ERROR source{component_kind="source" component_id=vector component_type=vector}:grpc-request{grpc_service="vector.Vector" grpc_method="PushEvents"}: vector_common::internal_event::component_events_dropped: Events dropped intentional=false count=7 reason="Source send cancelled." internal_log_rate_limit=true                                           │
│ 2024-07-18T14:32:11.986565Z ERROR source{component_kind="source" component_id=vector component_type=vector}:grpc-request{grpc_service="vector.Vector" grpc_method="PushEvents"}: vector_common::internal_event::component_events_dropped: Internal log [Events dropped] is being suppressed to avoid flooding.                                                                            │
│ 2024-07-18T14:32:19.501423Z ERROR source{component_kind="source" component_id=vector component_type=vector}:grpc-request{grpc_service="vector.Vector"

Thus I'm confused about what's happening. I'd have expected vector-aggregator to start dropping data for sink B and keep chugging along, not to apply backpressure to the source and sink A.

jszwedko commented 1 month ago

Thanks for the additional detail, @nmiculinic. It seems possible that there is a bug here. I'm wondering if sink B is buffering the data in such a way that the acks remain open until it has processed the data (by dropping it). Let me ask a teammate who is more familiar with this area.

jszwedko commented 1 month ago

@nmiculinic one way to validate my hypothesis would be to check whether the buffer for sink B is actually filling up completely. Could you check the vector_buffer_events_total metric for that sink?
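
If you aren't already scraping Vector's internal metrics on the aggregator, here is a minimal sketch for exposing them via an internal_metrics source and a prometheus_exporter sink (the component names and listen address below are placeholders, not taken from your config):

    sources:
      internal:
        type: internal_metrics   # emits Vector's own telemetry, including buffer metrics
    sinks:
      internal_prom:
        type: prometheus_exporter
        inputs:
          - internal
        address: 0.0.0.0:9598    # scrape endpoint for Prometheus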

jszwedko commented 1 month ago

I got some more info that clarified my understanding of how sink acknowledgements work. The current implementation has sources wait for all configured sinks, even if only a subset have acknowledgements enabled. We intended to change this behavior so that sources only wait for sinks that actually have acks enabled, but apparently haven't gotten to it yet: https://github.com/vectordotdev/vector/issues/7369

I think what you are experiencing is the result of https://github.com/vectordotdev/vector/issues/7369 not being implemented yet, i.e. only sinks with acks enabled causing the source to wait. I'll close this one in favor of that one, but please feel free to leave any additional thoughts over there.