vectordotdev / vector

A high-performance observability data pipeline.
https://vector.dev
Mozilla Public License 2.0

Load balance data between sinks #18164


saeed-mcu commented 1 year ago


Load-balance an input/source across multiple sinks.

Use Cases

For example, I have one source (socket) and two sinks (kafka). The input rate is 2K/s and I want to load-balance between the two sinks (1K/s each).

If I configure both sinks to use the same source, the input is duplicated to each sink and both receive 2K/s, but I want each to receive 1K/s (round-robin load balancing).

I want something like this:

                          1K/s
                      +------->[sink]------------>
2K/s----->[source]----+
                      +------->[sink]------------>
                          1K/s

But vector output is something like this :

                          2K/s
                      +------->[sink]------------>
2K/s----->[source]----+
                      +------->[sink]------------>
                          2K/s

Attempted Solutions

No response

Proposal

No response

References

No response

Version

No response

hhromic commented 1 year ago

We have the same use case and we implemented a Vector load balancing pipeline like this:

api:
  enabled: true

sources:
  input:
    type: demo_logs
    format: syslog
    interval: 0.1

transforms:
  partition:
    type: remap
    inputs: ["input"]
    drop_on_error: true
    source: |
      .partition = mod(to_unix_timestamp!(.timestamp, unit: "milliseconds"), 2)
  route:
    type: route
    inputs: ["partition"]
    route:
      output1: '.partition == 0'
      output2: '.partition == 1'

sinks:
  output1:
    type: blackhole
    inputs: ["route.output1"]
    print_interval_secs: 0
  output2:
    type: blackhole
    inputs: ["route.output2"]
    print_interval_secs: 0

Basically, we use a partitioner transform based on the modulo of the event timestamp and a router to distribute. This gives us a fairly uniform distribution of input events across N output sinks. We use a YAML templating tool to generate the above pipeline easily. Of course, other tools such as Jsonnet can be used for the same purpose.

You can run the above pipeline and use vector top to see it distributing the load in real-time. We have been using the above approach for some time now and it performs quite well for us, especially in the latest versions of Vector.
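For illustration, scaling the same idea to N sinks only changes the modulus and the route table; a minimal sketch for N=3 (the output3 route name is an assumption for this example):

transforms:
  partition:
    type: remap
    inputs: ["input"]
    drop_on_error: true
    source: |
      # modulo 3 spreads events across three routes instead of two
      .partition = mod(to_unix_timestamp!(.timestamp, unit: "milliseconds"), 3)
  route:
    type: route
    inputs: ["partition"]
    route:
      output1: '.partition == 0'
      output2: '.partition == 1'
      output3: '.partition == 2'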

I do agree that it would still be great if Vector could implement simple load-balancing natively for maximum throughput.

pront commented 1 year ago

Thank you @hhromic for the detailed response; this is what I would also suggest.

Hi @saeed-mcu, does this suggestion work for you?

saeed-mcu commented 1 year ago

Hi @pront, yes, it works for me and I am using it with some changes, of course. But as @hhromic explained, it would still be great if Vector could implement simple load balancing natively for maximum throughput.

pront commented 1 year ago

Thank you for the reply; I will forward this to the appropriate team for review.

jszwedko commented 1 year ago

Hi @saeed-mcu!

Could you share a bit more about your use-case? We are wondering why you want to split events between two kafka sinks.

Also curious about yours, @hhromic. I know you've shared a config like that before, but I can't remember why you needed to split up events like that either. Would you be able to share more?

That would help us figure out the right UX for this sort of feature.

hhromic commented 1 year ago

Hey @jszwedko, here is a summary of our use case and design decisions with Vector.

We consume observability data from third parties using UDP and TCP. The data itself is plain text, often transmitted using the syslog protocol and often containing CEF-formatted payloads. The data volume for these pipelines is quite high. For example, a single feed can be around 60-70K EPS via UDP.

For UDP sources, it is known that Vector is not performant, so I implemented a simple UDP-to-TCP proxy program in Go with a simple queue that relays data to Vector over TCP, for which Vector is quite performant. Furthermore, given that Vector does not support the PROXY protocol (see feature request #6769), this UDP-to-TCP proxy also injects the source peer address/port into the proxied data so that the destination Vector can fill in the correct original source information.

Due to the high ingestion volumes, a single Vector instance is not sufficient to process the data (parsing, remapping, etc.) and therefore we need to load-balance the pipeline. For this, I made a load-balancing Vector pipeline that acts as the frontend to the data source (TCP only; for UDP we use the aforementioned UDP-to-TCP proxy). This LB pipeline uses the above technique to partition and route the incoming data to N (configurable) downstream Vector instances that do the actual processing. This communication uses vector sink/source pairs.
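To make that concrete, here is a minimal sketch of the forwarding side, assuming two downstream workers and the partition/route transforms from my earlier example (the worker addresses and port are placeholders):

sinks:
  to_worker1:
    type: vector
    inputs: ["route.output1"]
    address: "vector-worker-1:6000"
  to_worker2:
    type: vector
    inputs: ["route.output2"]
    address: "vector-worker-2:6000"

Each downstream worker then ingests with a matching vector source (type: vector, listening on the same port) and runs the actual parsing and remapping.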

The Vector LB pipeline is quite straightforward and simple (similar to my example in https://github.com/vectordotdev/vector/issues/18164#issuecomment-1666512889). A single Vector instance is performant enough to ingest the high volumes we need and to partition and distribute the data to the downstream Vector workers, which are replicated according to individual source needs.

While my implementation of the LB pipeline with remap is okay for now, it would indeed be nice if all that logic could be done natively for maximum throughput/performance. For example, you may notice that the mod()-based partitioning is "almost" round-robin, but not truly. To make it truly round-robin, remap would need to preserve state so we could keep an index counter. In this case a native partitioner/router would be simpler and less invasive to implement.

At the moment Vector implements "fanout" distribution between DAG nodes; perhaps an idea for the UX would be to provide configurable distribution mechanisms between a source DAG node and N downstream node components. I think sensible options could be fanout (the existing mode), round-robin, and keyed. The latter would need a field configuration to specify which field(s) in the events to use as the partitioning key, very similar to how Kafka does producer keying.
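Purely as a hypothetical illustration of that UX, something along these lines; to be clear, the distribution key and its values do not exist in Vector today and are invented here for discussion:

sources:
  input:
    type: demo_logs
    format: syslog
    distribution:            # hypothetical option, does not exist today
      mode: keyed            # fanout (current behavior) | round_robin | keyed
      key_fields: ["host"]   # keyed mode only, akin to Kafka producer keys

sinks:
  output1:
    type: blackhole
    inputs: ["input"]
  output2:
    type: blackhole
    inputs: ["input"]

With fanout, both sinks would receive every event as today; with round_robin or keyed, each event would go to exactly one of them.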

jszwedko commented 1 year ago

Thanks for the detailed description @hhromic! Out of curiosity, had you considered using a load balancer like HAProxy or Envoy to do the routing to backends rather than templating the Vector configuration to have multiple sinks? Just curious what pros/cons you saw if so. We discussed today the idea that Vector could potentially do very naive client-side load balancing, but we would never want to add all of the features that a "real load balancer" has; given the existence of dedicated load balancers, it seems the value would not outweigh the costs.

Agreed that your workaround is not quite round robin but should be fairly well distributed. It could be made a bit simpler now that there is a random_int function in VRL.
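For reference, a sketch of that simplification, assuming VRL's random_int(min, max) with an inclusive lower and exclusive upper bound:

transforms:
  partition:
    type: remap
    inputs: ["input"]
    source: |
      # picks 0 or 1 uniformly, independent of the event timestamp
      .partition = random_int(0, 2)

The route transform from the earlier example stays the same.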

UDP performance is definitely an issue we are aware of 😓

hhromic commented 1 year ago

> Out of curiosity, had you considered using a load balancer like HAProxy or Envoy to do the routing to backends rather than templating the Vector configuration to have multiple sinks?

Yes, I tried my best not to re-invent the wheel. Unfortunately, existing load balancers had shortcomings for our use case:

On the other hand, implementing the LB with Vector itself provided all the features we needed: dynamic configuration, a lightweight footprint, and vector sink/source pairs, which are ideal for transparently forwarding data to downstream Vectors. I do agree that the LB is basic and not as fully featured as established solutions, but for simple/straightforward use cases I think Vector would be a very powerful "swiss-army-knife" if it supported simple load balancing natively.

> Agreed that your workaround is not quite round robin but should be fairly well distributed. It could be made a bit simpler now that there is a random_int function in VRL.

My current partitioner is based on time, so the higher the volume of data, the more "round-robin" it will be. However, on low-traffic data the distribution does become more unstable. You could argue that in such a case you don't need an LB to begin with, but sometimes the feeds are bursty and you have both situations continuously.

Regarding random_int(), interesting! I hadn't noticed that random integer generation is now implemented. Indeed, I was missing that in my original partitioner design. Is this a fast random generator? Compared to mod() + to_unix_timestamp() I would think it would be a bit slower.

jszwedko commented 1 year ago

Thanks @hhromic! I'll have to digest this a bit more, but responding to:

> Regarding random_int(), interesting! I hadn't noticed that random integer generation is now implemented. Indeed, I was missing that in my original partitioner design. Is this a fast random generator? Compared to mod() + to_unix_timestamp() I would think it would be a bit slower.

It does indeed look like it is intended to be cryptographically secure: https://docs.rs/rand/latest/rand/rngs/struct.ThreadRng.html. We could add additional fast_ analogues that use https://docs.rs/fastrand/latest/fastrand/ to generate non-secure random numbers.

pront commented 1 year ago

Hi @hhromic, I am not familiar with your applications so please take the following with a grain of salt:

> Our applications are highly containerised, and we prefer highly configurable containers via environment variables as much as possible. Having to maintain static configuration files is a headache. Configurations sometimes need to be adjusted quickly and re-deploying config files is cumbersome. Our Vector-based load balancer image is fully dynamic and generates a Vector pipeline based on environment variables (number of sinks, sink target addresses).

Configuration management is indeed a difficult problem, but it is not clear how a native Vector LB would help here. Especially for Docker, configuration has been a weak point for security, hence products like HashiCorp Vault exist. Perhaps a configuration service can pay dividends long-term.

> I investigated nginx, HAProxy, Envoy and Traefik. All of them need static configuration, with the exception of Traefik, which can be configured using container labels. Of these options, Traefik is the most promising, especially paired with our UDP-to-TCP proxy given that Traefik's UDP support is limited. HAProxy does not support UDP at all and Envoy had terrible UDP performance inside of a container. Not sure why, but I didn't have much time to investigate further, so fair play to Envoy.

Based on my limited experience with Envoy, it should provide you with an out-of-the-box solution. Can you share more details about the performance concerns?

saeed-mcu commented 1 year ago

Hi @jszwedko, I am using Vector as a log parser (tokenization and normalization) before indexing into OpenSearch for further processing and analysis.

                           +--->[vector-1] ---> [redpanda] ---> [vector-2] ---+ 
 Syslog (UDP) --> [nginx]--+                                                  +--->[opensearch]
                           +--->[vector-1] ---> [redpanda] ---> [vector-2] ---+

The input rate is quite high and I use nginx as a transparent load balancer to distribute logs between Vector instances. vector-1 is configured as follows:

 [input (socket udp)] ---> [transforms] ---> [sink (kafka producer)]

Redpanda performs at least 10 times faster than Kafka and it is Kafka-API compatible, so the kafka sink works fine for me. I want more than one Kafka producer producing logs into the Kafka topic, each with its own partitions, so that I can consume logs in vector-2 at high speed when vector-2 is configured in consumer-group mode for Kafka.

                          10K/s
                       +------->[sink (kafka-producer-1)]------------> Topic-1, Partition-0
20K/s----->[source]----+
                       +------->[sink (kafka-producer-2)]------------> Topic-1, Partition-1
                          10K/s

Ultimately, I need to load-balance logs (in a round-robin fashion) between sinks (Kafka producers) to use all partitions of one topic. Currently I have solved it with something like @hhromic's approach.
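For illustration, a minimal sketch of wiring two kafka sinks to the route outputs from the earlier example (the Redpanda address, topic name, and codec are placeholders):

sinks:
  kafka_producer_1:
    type: kafka
    inputs: ["route.output1"]
    bootstrap_servers: "redpanda:9092"
    topic: "topic-1"
    encoding:
      codec: json
  kafka_producer_2:
    type: kafka
    inputs: ["route.output2"]
    bootstrap_servers: "redpanda:9092"
    topic: "topic-1"
    encoding:
      codec: json

Note that the kafka sink does not expose a direct partition setting; which partition each producer's events land in is decided client-side (for example via the sink's key_field option).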

james-stevens commented 1 year ago

My use case would be load balancing & failover to remove single points of failure. Yes, I can just do it with HAProxy (which is probably what I will do), but it would be nice if it were built-in.

If I split based on random_int(), wouldn't that lose half the traffic in the event an endpoint is down? Yes, I could buffer it, but only up to a point, as we're handling lots of live data.

Having the option to just divert to a different sink, instead of buffering, seems to me all that is needed to get failover working OK. That could then be paired with making a random_int()-type scenario a built-in feature?

jszwedko commented 1 year ago

Failover might be covered by this RFC that has been on the back burner for a while: https://github.com/vectordotdev/vector/pull/14708