vectordotdev / vector

A high-performance observability data pipeline.
https://vector.dev
Mozilla Public License 2.0

AMQP Sink, support for HA Cluster #17182

Open arouene opened 1 year ago

arouene commented 1 year ago

Use Cases

Currently, the AMQP sink supports only one server connection, configured through the connection_string in this format: amqp://<user>:<password>@<host>:<port>/<vhost>?timeout=<seconds> (excerpt from the documentation).

If the AMQP server is unavailable when a message is sent, Vector panics and crashes on an unwrap().

thread 'vector-worker' panicked at 'called `Result::unwrap()` on an `Err` value: InvalidChannelState(Error)', src/sinks/amqp/service.rs:114:18
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
2023-04-19T15:45:04.977429Z ERROR sink{component_kind="sink" component_id=RabbitMQRE1 component_type=amqp component_name=RabbitMQRE1}: vector::topology: An error occurred that Vector couldn't handle: the task panicked and was aborted.
2023-04-19T15:45:04.977576Z  INFO vector: Vector has stopped.
2023-04-19T15:45:04.977857Z  INFO vector::topology::running: Shutting down... Waiting on running components. remaining_components="postfixSB1, postfix, postfixPW, postfixReduce, postfixtag, postfixRE1, RabbitMQPW, postfixRegexMapping, datadog, postfixRegexFilter, RabbitMQSB1, journaldtag, journald" time_remaining="59 seconds left"

I would like to be able to pass multiple nodes of an AMQP cluster in the connection_string, and have Vector try to connect to the other nodes of the cluster when it is disconnected, potentially with exponential back-off.

Attempted Solutions

No solution for now: the connection_string cannot contain multiple nodes.

Proposal

I'm willing to work on that solution:

  1. Be able to have multiple nodes in the connection_string

Instead of having amqp://<user>:<password>@<host>:<port>/<vhost>?timeout=<seconds>, we could have multiple nodes separated by semicolons, like this:

connection_string = "amqp://<user>:<password>@<host1>:<port>/<vhost>?timeout=<seconds>;amqp://<user>:<password>@<host2>:<port>/<vhost>?timeout=<seconds>;amqp://<user>:<password>@<host3>:<port>/<vhost>?timeout=<seconds>"

A single server would still be supported, so this is backward compatible.
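A minimal sketch of how the semicolon-separated form could be split into per-node URIs. The helper name `split_connection_string` is hypothetical and not part of Vector; the sketch also assumes that a literal `;` never appears unencoded inside an individual URI (for example in a password), which would have to be percent-encoded under this scheme.

```rust
/// Split a semicolon-separated connection string into individual node URIs.
/// Hypothetical helper for illustration only; not taken from Vector's code.
fn split_connection_string(raw: &str) -> Result<Vec<String>, String> {
    // Tolerate surrounding whitespace and a trailing semicolon.
    let nodes: Vec<String> = raw
        .split(';')
        .map(str::trim)
        .filter(|part| !part.is_empty())
        .map(str::to_owned)
        .collect();

    if nodes.is_empty() {
        return Err("connection_string contains no AMQP URI".to_owned());
    }

    // Only a scheme check is done here; full URI validation is left to the
    // AMQP client library that eventually receives each entry.
    for node in &nodes {
        if !(node.starts_with("amqp://") || node.starts_with("amqps://")) {
            return Err(format!("not an AMQP URI: {}", node));
        }
    }

    Ok(nodes)
}
```

A string without any semicolon yields a one-element list, so existing single-node configurations would parse unchanged.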

  2. Retry connection

If a node is disconnected, attempt to reconnect to another node chosen at random, with exponential back-off. If only one node is configured in the connection_string, do what is done today (crash) for backward compatibility.

The retry process will continue indefinitely, letting buffer backpressure do its job.

What do you think? Would you like to have those improvements?
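The retry behaviour described above could look roughly like the following sketch. Everything here is an assumption made for illustration: the names `backoff_delay` and `connect_with_retry`, the 500 ms base delay and the 30 s cap are hypothetical, the `connect` and `sleep` callbacks stand in for the real AMQP client and an async timer such as `tokio::time::sleep`, and nodes are tried in rotation rather than picked at random to keep the example free of external crates.

```rust
use std::time::Duration;

/// Delay before retry number `attempt` (0-based): `base * 2^attempt`, capped at `max`.
fn backoff_delay(attempt: u32, base: Duration, max: Duration) -> Duration {
    let factor = 2u32.saturating_pow(attempt);
    // On overflow of the multiplication, fall back to the cap.
    base.checked_mul(factor).map_or(max, |delay| delay.min(max))
}

/// Try the configured nodes in rotation until `connect` succeeds.
/// Retries forever, sleeping with exponential back-off after each failure.
fn connect_with_retry<C, E>(
    nodes: &[String],
    mut connect: impl FnMut(&str) -> Result<C, E>,
    mut sleep: impl FnMut(Duration),
) -> C {
    assert!(!nodes.is_empty(), "at least one node is required");

    // Assumed values; a real sink would presumably make these configurable.
    let base = Duration::from_millis(500);
    let max = Duration::from_secs(30);

    let mut attempt: u32 = 0;
    let mut index: usize = 0;
    loop {
        match connect(&nodes[index]) {
            Ok(connection) => return connection,
            Err(_) => {
                sleep(backoff_delay(attempt, base, max));
                attempt = attempt.saturating_add(1);
                // Move on to the next node; swap in a random pick if preferred.
                index = (index + 1) % nodes.len();
            }
        }
    }
}
```

Because the loop never gives up, the caller stays blocked while the cluster is unreachable, which is what would let buffer backpressure build up upstream instead of crashing the process.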

References

No response

Version

vector 0.28.1 (x86_64-unknown-linux-gnu ff15924 2023-03-06)

zamazan4ik commented 1 year ago

Is it possible to fetch all nodes of a RabbitMQ cluster from a single node? For example, we configure a set of initial nodes, Vector starts, fetches all nodes from the initial nodes (via some API call, if one exists), and then uses this information while running. AFAIK, the same design is already implemented for the Elasticsearch sink.

arouene commented 1 year ago

I don't think RabbitMQ works that way. I know other systems do it, like Kafka or Elasticsearch, but AFAIK AMQP doesn't.

In case of a node failure, clients should be able to reconnect to a different node, recover their topology and continue operation. For this reason, most client libraries accept a list of endpoints (hostnames or IP addresses) as a connection option. The list of hosts will be used during initial connection as well as connection recovery, if the client supports it.

(excerpt from https://www.rabbitmq.com/clustering.html)

But it's possible that I'm missing something.

nhlushak commented 1 year ago

Having exactly the same issue. We are using the Vector container image 0.29.1-alpine.

This sink definitely requires connection retries on timeouts, that's for sure.

We switched from fluentd to Vector because we were struggling with connection drops to AMQP: it did not re-resolve the DNS name on connection retry, so it kept trying to connect to an already "dead" IP after our AMQP cluster got an instance refresh. Now this 🫠

neuronull commented 10 months ago

👋 Just dropping a note that from the output provided, we noticed that this panic is actually a bug:

thread 'vector-worker' panicked at 'called `Result::unwrap()` on an `Err` value: InvalidChannelState(Error)', src/sinks/amqp/service.rs:114:18
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace

Here: https://github.com/vectordotdev/vector/blob/96f4d73d3a8614d721bdb27845b7721e8d266bb8/src/sinks/amqp/service.rs#L112-L115

I've created a separate issue (#18921) to track this so this issue can serve as just the enhancement request.
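For readers unfamiliar with this class of bug, the sketch below shows the general difference between unwrapping a publish result and propagating it. It is not the code at the linked lines: `Channel`, `PublishError`, and `send_event` are hypothetical stand-ins, with only the `InvalidChannelState` name borrowed from the panic message.

```rust
use std::fmt;

/// Hypothetical error type mirroring the variant named in the panic message.
#[derive(Debug, PartialEq)]
enum PublishError {
    InvalidChannelState,
}

impl fmt::Display for PublishError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "channel is not in a usable state")
    }
}

impl std::error::Error for PublishError {}

/// Hypothetical stand-in for an AMQP channel handle.
struct Channel {
    open: bool,
}

impl Channel {
    fn publish(&self, _payload: &[u8]) -> Result<(), PublishError> {
        if self.open {
            Ok(())
        } else {
            Err(PublishError::InvalidChannelState)
        }
    }
}

/// Propagates the failure with `?` so the caller can retry or report it,
/// where `channel.publish(payload).unwrap()` would abort the whole task.
fn send_event(channel: &Channel, payload: &[u8]) -> Result<usize, PublishError> {
    channel.publish(payload)?;
    Ok(payload.len())
}
```

With the error returned instead of unwrapped, a closed channel becomes an ordinary `Err` value that a retry layer can act on.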