vectordotdev / vector

A high-performance observability data pipeline.
https://vector.dev
Mozilla Public License 2.0

Vector exits after start when AMQP sink is unhealthy despite setting --require-healthy flag to false #19466

Open mtrbpr opened 11 months ago

mtrbpr commented 11 months ago


Problem

It seems that disabling the sink healthcheck by setting the --require-healthy flag to false at startup has no effect when the sink type is amqp and either no vhost is specified in the amqp connection_string or the specified vhost does not exist.

The same thing happens when the connection string points to the wrong port.
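
One possible reading of the "no vhost" case: the RabbitMQ AMQP URI specification distinguishes a URI with no path at all (the client falls back to its default vhost, usually "/"), a URI ending in a bare slash such as amqp://host/ (which names the empty-string vhost, normally nonexistent), and an explicit path such as /wrong. The std-only sketch below mirrors those rules with a hypothetical vhost_from_uri helper. It is not lapin's parser, and whether lapin follows the spec exactly is an assumption not confirmed by this report.

```rust
/// Hypothetical helper: extract the vhost from an AMQP URI per the RabbitMQ
/// URI spec. Returns None when there is no path, meaning "use the client
/// default"; Some("") when the path is a bare "/".
fn vhost_from_uri(uri: &str) -> Option<String> {
    // Drop the scheme ("amqp://" or "amqps://"), if present.
    let rest = uri.split_once("://").map_or(uri, |(_, r)| r);
    // Ignore any query string or fragment, e.g. "?heartbeat=10".
    let rest = rest.split(|c: char| c == '?' || c == '#').next().unwrap_or("");
    // The first '/' ends the authority (user:pass@host:port); none means no vhost.
    let slash = rest.find('/')?;
    // Everything after that slash is the (possibly empty) vhost.
    Some(percent_decode(&rest[slash + 1..]))
}

/// Value of one ASCII hex digit, or None if the byte is not a hex digit.
fn hex_val(b: u8) -> Option<u8> {
    match b {
        b'0'..=b'9' => Some(b - b'0'),
        b'a'..=b'f' => Some(b - b'a' + 10),
        b'A'..=b'F' => Some(b - b'A' + 10),
        _ => None,
    }
}

/// Decode %XX escapes (so "%2f" becomes "/"); malformed escapes are kept as-is.
fn percent_decode(s: &str) -> String {
    let bytes = s.as_bytes();
    let mut out = Vec::with_capacity(bytes.len());
    let mut i = 0;
    while i < bytes.len() {
        if bytes[i] == b'%' && i + 2 < bytes.len() {
            if let (Some(hi), Some(lo)) = (hex_val(bytes[i + 1]), hex_val(bytes[i + 2])) {
                out.push(hi * 16 + lo);
                i += 3;
                continue;
            }
        }
        out.push(bytes[i]);
        i += 1;
    }
    String::from_utf8_lossy(&out).into_owned()
}

fn main() {
    for uri in [
        "amqp://guest:guest@127.0.0.1:5672",       // no path: client default vhost
        "amqp://guest:guest@127.0.0.1:5672/",      // empty-string vhost ""
        "amqp://guest:guest@127.0.0.1:5672/%2f",   // the default "/" vhost, escaped
        "amqp://guest:guest@127.0.0.1:5672/wrong", // the URI from this report
    ] {
        println!("{uri} -> {:?}", vhost_from_uri(uri));
    }
}
```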

Configuration

sources:
  demo:
    type: demo_logs
    format: json

sinks:
  amqp-sink:
    connection_string: amqp://guest:guest@127.0.0.1:5672/wrong
    encoding:
      codec: json
    exchange: queue
    inputs:
      - demo
    type: amqp

Version

vector 0.34.1 (x86_64-unknown-linux-gnu 86f1c22 2023-11-16 14:59:10.486846964)

Debug Output

2023-12-25T16:13:55.768432Z DEBUG vector::app: Internal log rate limit configured. internal_log_rate_secs=10
2023-12-25T16:13:55.768481Z  INFO vector::app: Log level is enabled. level="vector=trace,codec=trace,vrl=trace,file_source=trace,tower_limit=trace,rdkafka=trace,buffers=trace,lapin=trace,kube=trace"
2023-12-25T16:13:55.768571Z DEBUG vector::app: messaged="Building runtime." worker_threads=8
2023-12-25T16:13:55.770415Z  INFO vector::app: Loading configs. paths=["vecdor.yaml"]
2023-12-25T16:13:55.771797Z DEBUG vector::config::loading: No secret placeholder found, skipping secret resolution.
2023-12-25T16:13:55.772267Z DEBUG vector::topology::builder: Building new source. component=demo
2023-12-25T16:13:55.772501Z DEBUG vector::topology::builder: Building new sink. component=amqp-sink
2023-12-25T16:13:55.773720Z DEBUG sink{component_kind="sink" component_id=amqp-sink component_type=amqp}: lapin::channels: create channel id=0
2023-12-25T16:13:55.773824Z TRACE sink{component_kind="sink" component_id=amqp-sink component_type=amqp}: lapin::channel: send_frame channel=0
2023-12-25T16:13:55.773862Z TRACE sink{component_kind="sink" component_id=amqp-sink component_type=amqp}: lapin::channel: wake channel=0
2023-12-25T16:13:55.775209Z TRACE lapin::io_loop: io_loop run
2023-12-25T16:13:55.775236Z TRACE lapin::socket_state: Got event for socket event=Wake
2023-12-25T16:13:55.775244Z TRACE lapin::socket_state: Got event for socket event=Wake
2023-12-25T16:13:55.775252Z TRACE lapin::io_loop: io_loop do_run can_read=true can_write=true has_data=true
2023-12-25T16:13:55.775262Z TRACE lapin::io_loop: will write to buffer next_msg=AMQPFrame::ProtocolHeader(0.9.1)
2023-12-25T16:13:55.775296Z TRACE lapin::io_loop: wrote 8 bytes
2023-12-25T16:13:55.775334Z TRACE lapin::io_loop: io_loop do_run done can_read=false can_write=true has_data=false status=Initial
2023-12-25T16:13:55.775341Z TRACE lapin::io_loop: io_loop run
2023-12-25T16:13:55.775348Z TRACE lapin::io_loop: io_loop do_run can_read=false can_write=true has_data=false
2023-12-25T16:13:55.776116Z TRACE lapin::socket_state: Got event for socket event=Readable
2023-12-25T16:13:55.776136Z TRACE lapin::io_loop: read 517 bytes
2023-12-25T16:13:55.776208Z TRACE lapin::channels: will handle frame frame=Method(0, Connection(Start(Start { version_major: 0, version_minor: 9, server_properties: FieldTable({ShortString("capabilities"): FieldTable(FieldTable({ShortString("authentication_failure_close"): Boolean(true), ShortString("basic.nack"): Boolean(true), ShortString("connection.blocked"): Boolean(true), ShortString("consumer_cancel_notify"): Boolean(true), ShortString("consumer_priorities"): Boolean(true), ShortString("direct_reply_to"): Boolean(true), ShortString("exchange_exchange_bindings"): Boolean(true), ShortString("per_consumer_qos"): Boolean(true), ShortString("publisher_confirms"): Boolean(true)})), ShortString("cluster_name"): LongString(LongString([114, 97, 98, 98, 105, 116, 64, 56, 52, 53, 98, 97, 52, 99, 55, 100, 97, 54, 57])), ShortString("copyright"): LongString(LongString([67, 111, 112, 121, 114, 105, 103, 104, 116, 32, 40, 99, 41, 32, 50, 48, 48, 55, 45, 50, 48, 50, 51, 32, 86, 77, 119, 97, 114, 101, 44, 32, 73, 110, 99, 46, 32, 111, 114, 32, 105, 116, 115, 32, 97, 102, 102, 105, 108, 105, 97, 116, 101, 115, 46])), ShortString("information"): LongString(LongString([76, 105, 99, 101, 110, 115, 101, 100, 32, 117, 110, 100, 101, 114, 32, 116, 104, 101, 32, 77, 80, 76, 32, 50, 46, 48, 46, 32, 87, 101, 98, 115, 105, 116, 101, 58, 32, 104, 116, 116, 112, 115, 58, 47, 47, 114, 97, 98, 98, 105, 116, 109, 113, 46, 99, 111, 109])), ShortString("platform"): LongString(LongString([69, 114, 108, 97, 110, 103, 47, 79, 84, 80, 32, 50, 53, 46, 51, 46, 50, 46, 56])), ShortString("product"): LongString(LongString([82, 97, 98, 98, 105, 116, 77, 81])), ShortString("version"): LongString(LongString([51, 46, 49, 50, 46, 49, 48]))}), mechanisms: LongString([80, 76, 65, 73, 78, 32, 65, 77, 81, 80, 76, 65, 73, 78]), locales: LongString([101, 110, 95, 85, 83]) })))
2023-12-25T16:13:55.776275Z TRACE lapin::channel: Server sent connection::Start method=Start { version_major: 0, version_minor: 9, server_properties: FieldTable({ShortString("capabilities"): FieldTable(FieldTable({ShortString("authentication_failure_close"): Boolean(true), ShortString("basic.nack"): Boolean(true), ShortString("connection.blocked"): Boolean(true), ShortString("consumer_cancel_notify"): Boolean(true), ShortString("consumer_priorities"): Boolean(true), ShortString("direct_reply_to"): Boolean(true), ShortString("exchange_exchange_bindings"): Boolean(true), ShortString("per_consumer_qos"): Boolean(true), ShortString("publisher_confirms"): Boolean(true)})), ShortString("cluster_name"): LongString(LongString([114, 97, 98, 98, 105, 116, 64, 56, 52, 53, 98, 97, 52, 99, 55, 100, 97, 54, 57])), ShortString("copyright"): LongString(LongString([67, 111, 112, 121, 114, 105, 103, 104, 116, 32, 40, 99, 41, 32, 50, 48, 48, 55, 45, 50, 48, 50, 51, 32, 86, 77, 119, 97, 114, 101, 44, 32, 73, 110, 99, 46, 32, 111, 114, 32, 105, 116, 115, 32, 97, 102, 102, 105, 108, 105, 97, 116, 101, 115, 46])), ShortString("information"): LongString(LongString([76, 105, 99, 101, 110, 115, 101, 100, 32, 117, 110, 100, 101, 114, 32, 116, 104, 101, 32, 77, 80, 76, 32, 50, 46, 48, 46, 32, 87, 101, 98, 115, 105, 116, 101, 58, 32, 104, 116, 116, 112, 115, 58, 47, 47, 114, 97, 98, 98, 105, 116, 109, 113, 46, 99, 111, 109])), ShortString("platform"): LongString(LongString([69, 114, 108, 97, 110, 103, 47, 79, 84, 80, 32, 50, 53, 46, 51, 46, 50, 46, 56])), ShortString("product"): LongString(LongString([82, 97, 98, 98, 105, 116, 77, 81])), ShortString("version"): LongString(LongString([51, 46, 49, 50, 46, 49, 48]))}), mechanisms: LongString([80, 76, 65, 73, 78, 32, 65, 77, 81, 80, 76, 65, 73, 78]), locales: LongString([101, 110, 95, 85, 83]) }
2023-12-25T16:13:55.776344Z TRACE lapin::io_loop: io_loop do_run done can_read=false can_write=true has_data=false status=Initial
2023-12-25T16:13:55.776354Z TRACE lapin::io_loop: io_loop run
2023-12-25T16:13:55.776362Z TRACE lapin::io_loop: io_loop do_run can_read=false can_write=true has_data=false
2023-12-25T16:13:55.776400Z TRACE lapin::channel: send_frame channel=0
2023-12-25T16:13:55.776415Z TRACE lapin::channel: wake channel=0
2023-12-25T16:13:55.776456Z TRACE lapin::socket_state: Got event for socket event=Wake
2023-12-25T16:13:55.776492Z TRACE lapin::io_loop: will write to buffer next_msg=AMQPFrame::Method(Connection(StartOk(StartOk { client_properties: FieldTable({ShortString("capabilities"): FieldTable(FieldTable({ShortString("authentication_failure_close"): Boolean(true), ShortString("basic.nack"): Boolean(true), ShortString("connection.blocked"): Boolean(true), ShortString("consumer_cancel_notify"): Boolean(true), ShortString("consumer_priorities"): Boolean(true), ShortString("direct_reply_to"): Boolean(true), ShortString("exchange_exchange_bindings"): Boolean(true), ShortString("per_consumer_qos"): Boolean(true), ShortString("publisher_confirms"): Boolean(true)})), ShortString("platform"): LongString(LongString([114, 117, 115, 116])), ShortString("product"): LongString(LongString([108, 97, 112, 105, 110])), ShortString("version"): LongString(LongString([50, 46, 51, 46, 49]))}), mechanism: ShortString("PLAIN"), response: LongString([0, 103, 117, 101, 115, 116, 0, 103, 117, 101, 115, 116]), locale: ShortString("en_US") })))
2023-12-25T16:13:55.776536Z TRACE lapin::io_loop: wrote 315 bytes
2023-12-25T16:13:55.776548Z TRACE lapin::io_loop: io_loop do_run done can_read=false can_write=true has_data=false status=Initial
2023-12-25T16:13:55.776574Z TRACE lapin::io_loop: io_loop run
2023-12-25T16:13:55.776584Z TRACE lapin::io_loop: io_loop do_run can_read=false can_write=true has_data=false
2023-12-25T16:13:55.776780Z TRACE lapin::socket_state: Got event for socket event=Readable
2023-12-25T16:13:55.776792Z TRACE lapin::io_loop: read 20 bytes
2023-12-25T16:13:55.776807Z TRACE lapin::channels: will handle frame frame=Method(0, Connection(Tune(Tune { channel_max: 2047, frame_max: 131072, heartbeat: 60 })))
2023-12-25T16:13:55.776815Z TRACE lapin::channel: Server sent Connection::Tune method=Tune { channel_max: 2047, frame_max: 131072, heartbeat: 60 }
2023-12-25T16:13:55.776830Z TRACE lapin::io_loop: io_loop do_run done can_read=false can_write=true has_data=false status=Initial
2023-12-25T16:13:55.776840Z TRACE lapin::io_loop: io_loop run
2023-12-25T16:13:55.776852Z TRACE lapin::io_loop: io_loop do_run can_read=false can_write=true has_data=false
2023-12-25T16:13:55.776885Z TRACE lapin::channel: send_frame channel=0
2023-12-25T16:13:55.776895Z TRACE lapin::channel: wake channel=0
2023-12-25T16:13:55.776914Z TRACE lapin::socket_state: Got event for socket event=Wake
2023-12-25T16:13:55.776923Z TRACE lapin::io_loop: will write to buffer next_msg=AMQPFrame::Method(Connection(TuneOk(TuneOk { channel_max: 2047, frame_max: 131072, heartbeat: 60 })))
2023-12-25T16:13:55.776944Z TRACE lapin::io_loop: wrote 20 bytes
2023-12-25T16:13:55.776963Z TRACE lapin::io_loop: io_loop do_run done can_read=false can_write=true has_data=false status=Initial
2023-12-25T16:13:55.776971Z TRACE lapin::channel: send_frame channel=0
2023-12-25T16:13:55.776973Z TRACE lapin::io_loop: io_loop run
2023-12-25T16:13:55.777016Z TRACE lapin::frames: state is now waiting channel=0 expected_reply=ExpectedReply(ConnectionOpenOk(Pinky, Connection { configuration: Configuration { channel_max: 2047, frame_max: 131072, heartbeat: 60 }, status: ConnectionStatus { state: Connecting, vhost: "wrong", username: "guest", blocked: false }, channels: Channels { channels: [Channel { id: 0, configuration: Configuration { channel_max: 2047, frame_max: 131072, heartbeat: 60 }, status: ChannelStatus { state: Connected, receiver_state: ChannelReceiverStates([]), confirm: false, send_flow: true }, connection_status: ConnectionStatus { state: Connecting, vhost: "wrong", username: "guest", blocked: false }, acknowledgements: Acknowledgements { delivery_tag: IdSequence { allow_zero: false, max: None, id: 0 }, returned_messages: ReturnedMessages { waiting_messages: [], messages: [], non_confirm_messages: [] }, pending: [] }, consumers: Consumers({}), basic_get_delivery: BasicGetDelivery, returned_messages: ReturnedMessages { waiting_messages: [], messages: [], non_confirm_messages: [] }, frames: Frames }], channel_id: IdSequence { allow_zero: false, max: None, id: 0 }, configuration: Configuration { channel_max: 2047, frame_max: 131072, heartbeat: 60 }, frames: Frames, connection_status: ConnectionStatus { state: Connecting, vhost: "wrong", username: "guest", blocked: false }, error_handler: ErrorHandler } }))
2023-12-25T16:13:55.777053Z TRACE lapin::channel: wake channel=0
2023-12-25T16:13:55.777063Z TRACE lapin::io_loop: io_loop do_run can_read=false can_write=true has_data=true
2023-12-25T16:13:55.777074Z TRACE lapin::socket_state: Got event for socket event=Wake
2023-12-25T16:13:55.777084Z TRACE lapin::io_loop: will write to buffer next_msg=AMQPFrame::Method(Connection(Open(Open { virtual_host: ShortString("wrong") })))
2023-12-25T16:13:55.777114Z TRACE lapin::io_loop: wrote 20 bytes
2023-12-25T16:13:55.777130Z TRACE lapin::io_loop: io_loop do_run done can_read=false can_write=true has_data=false status=Initial
2023-12-25T16:13:55.777143Z TRACE lapin::io_loop: io_loop run
2023-12-25T16:13:55.777156Z TRACE lapin::io_loop: io_loop do_run can_read=false can_write=true has_data=false
2023-12-25T16:13:55.777424Z TRACE lapin::socket_state: Got event for socket event=Readable
2023-12-25T16:13:55.777442Z TRACE lapin::io_loop: read 54 bytes
2023-12-25T16:13:55.777465Z TRACE lapin::channels: will handle frame frame=Method(0, Connection(Close(Close { reply_code: 530, reply_text: ShortString("NOT_ALLOWED - vhost wrong not found"), class_id: 10, method_id: 40 })))
2023-12-25T16:13:55.777505Z ERROR lapin::channel: Connection closed channel=0 method=Close { reply_code: 530, reply_text: ShortString("NOT_ALLOWED - vhost wrong not found"), class_id: 10, method_id: 40 } error=AMQPError { kind: Hard(NOTALLOWED), message: ShortString("NOT_ALLOWED - vhost wrong not found") }
2023-12-25T16:13:55.777526Z TRACE lapin::internal_rpc: Queuing internal RPC command command=SetConnectionClosing
2023-12-25T16:13:55.777569Z TRACE lapin::internal_rpc: Handling internal RPC command command=SetConnectionClosing
2023-12-25T16:13:55.777614Z TRACE lapin::internal_rpc: Queuing internal RPC command command=SetConnectionClosing
2023-12-25T16:13:55.777612Z TRACE lapin::internal_rpc: Queuing internal RPC command command=SetConnectionError(ProtocolError(AMQPError { kind: Hard(NOTALLOWED), message: ShortString("NOT_ALLOWED - vhost wrong not found") }))
2023-12-25T16:13:55.777629Z TRACE lapin::internal_rpc: Queuing internal RPC command command=CloseConnection(200, "OK", 0, 0)
2023-12-25T16:13:55.777636Z TRACE lapin::internal_rpc: Handling internal RPC command command=SetConnectionClosing
2023-12-25T16:13:55.777649Z TRACE lapin::internal_rpc: Handling internal RPC command command=SetConnectionError(ProtocolError(AMQPError { kind: Hard(NOTALLOWED), message: ShortString("NOT_ALLOWED - vhost wrong not found") }))
2023-12-25T16:13:55.777655Z TRACE lapin::internal_rpc: Queuing internal RPC command command=SendConnectionCloseOk(ProtocolError(AMQPError { kind: Hard(NOTALLOWED), message: ShortString("NOT_ALLOWED - vhost wrong not found") }))
2023-12-25T16:13:55.777668Z ERROR lapin::channels: Connection error error=protocol error: AMQP hard error: NOT-ALLOWED: NOT_ALLOWED - vhost wrong not found
2023-12-25T16:13:55.777671Z TRACE lapin::io_loop: io_loop do_run done can_read=false can_write=true has_data=false status=Initial
2023-12-25T16:13:55.777687Z TRACE lapin::internal_rpc: Stopping internal RPC command
2023-12-25T16:13:55.777684Z TRACE lapin::internal_rpc: Queuing internal RPC command command=RemoveChannel(0, ProtocolError(AMQPError { kind: Hard(NOTALLOWED), message: ShortString("NOT_ALLOWED - vhost wrong not found") }))
2023-12-25T16:13:55.777703Z TRACE lapin::internal_rpc: Handling internal RPC command command=CloseConnection(200, "OK", 0, 0)
2023-12-25T16:13:55.777721Z TRACE lapin::internal_rpc: Handling internal RPC command command=SendConnectionCloseOk(ProtocolError(AMQPError { kind: Hard(NOTALLOWED), message: ShortString("NOT_ALLOWED - vhost wrong not found") }))
2023-12-25T16:13:55.777752Z ERROR vector::topology::builder: Configuration error. error=Sink "amqp-sink": creating amqp producer failed: protocol error: AMQP hard error: NOT-ALLOWED: NOT_ALLOWED - vhost wrong not found
2023-12-25T16:13:55.777761Z TRACE lapin::internal_rpc: InternalRPC stopped
2023-12-25T16:13:55.777790Z TRACE lapin::internal_rpc: Queuing internal RPC command command=SetConnectionError(InvalidChannelState(Error))
2023-12-25T16:13:55.777826Z TRACE lapin::internal_rpc: Queuing internal RPC command command=SetConnectionError(InvalidChannelState(Error))
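
Vector documents --require-healthy as exiting on startup when sinks fail their healthchecks. In the log above, however, the fatal line comes from vector::topology::builder ("Configuration error ... creating amqp producer failed"), which suggests the AMQP connection is opened while the sink is being built, before any healthcheck runs, so the flag never comes into play. The std-only sketch below models that difference. All names (connect, build_eager, build_deferred, healthcheck, start) are hypothetical and this is not Vector's code; it only shows why an eager connection in the build step would bypass the flag.

```rust
use std::fmt;

/// Hypothetical error for a broker rejecting the connection.
#[derive(Debug)]
struct ConnectError(String);

impl fmt::Display for ConnectError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}", self.0)
    }
}

/// Hypothetical stand-in for opening an AMQP connection: only "/" exists.
fn connect(vhost: &str) -> Result<(), ConnectError> {
    if vhost == "/" {
        Ok(())
    } else {
        Err(ConnectError(format!("NOT_ALLOWED - vhost {vhost} not found")))
    }
}

/// Eager build: connects immediately, so a broker error becomes a build error.
fn build_eager(vhost: &str) -> Result<(), ConnectError> {
    connect(vhost)
}

/// Deferred build: always succeeds; broker problems surface later.
fn build_deferred(_vhost: &str) -> Result<(), ConnectError> {
    Ok(())
}

fn healthcheck(vhost: &str) -> Result<(), ConnectError> {
    connect(vhost)
}

/// Models startup: build errors are always fatal, while healthcheck
/// errors are fatal only when require_healthy is true.
fn start(vhost: &str, eager: bool, require_healthy: bool) -> Result<&'static str, String> {
    let built = if eager { build_eager(vhost) } else { build_deferred(vhost) };
    built.map_err(|e| format!("Configuration error: {e}"))?;
    match healthcheck(vhost) {
        Ok(()) => Ok("running (healthy)"),
        Err(e) if require_healthy => Err(format!("Healthcheck failed: {e}")),
        Err(_) => Ok("running (sink unhealthy)"),
    }
}

fn main() {
    // Eager build: exits even with require_healthy = false (the reported behavior).
    println!("{:?}", start("wrong", true, false));
    // Deferred build: keeps running when require_healthy = false.
    println!("{:?}", start("wrong", false, false));
}
```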

Example Data

No response

Additional Context

Vector is installed with apt, and RabbitMQ is running on port 5672 using the following docker-compose deployment:

services: 
  rabbitmq:
      image: rabbitmq:management
      container_name: rabbitmq
      environment:
        - RABBITMQ_DEFAULT_USER=guest
        - RABBITMQ_DEFAULT_PASS=guest
      ports:
        - "5672:5672"
        - "15672:15672"

References

No response

sxkote commented 10 months ago

Hi. I see the same behavior, even with healthcheck.enabled=false. When RabbitMQ goes down and then comes back up, Vector stops sending messages to it.

fedorkanin commented 2 weeks ago

That's hilarious. We are experiencing similar blocking issues with the amqp091-go client (issue1, issue2) and were considering Vector as a replacement. Now it seems I have run into the same issue as @sxkote.

Nov 11 13:07:57 my-rabbit-host.example vector[1234567]: 2024-11-11T13:07:57.516987Z ERROR sink{component_kind="sink" component_id=rabbitmq_sink component_type=amqp}:request{request_id=REQUEST_ID_1}: vector_common::internal_event::service: Service call failed. No retries or retries exhausted. error=Some(AcknowledgementFailed { error: IOError(Os { code: 104, kind: ConnectionReset, message: "Connection reset by peer" }) }) request_id=REQUEST_ID_1 error_type="request_failed" stage="sending" internal_log_rate_limit=true
...
some suppressed logs
...
Nov 11 13:08:13 my-rabbit-host.example vector[1234567]: 2024-11-11T13:08:13.215606Z ERROR sink{component_kind="sink" component_id=rabbitmq_sink component_type=amqp}:request{request_id=REQUEST_ID_2}: vector_common::internal_event::service: Service call failed. No retries or retries exhausted. error=Some(DeliveryFailed { error: InvalidChannelState(Error) }) request_id=REQUEST_ID_2 error_type="request_failed" stage="sending" internal_log_rate_limit=true

The InvalidChannelState(Error) message is then logged indefinitely, along with some suppressed logs, and no reconnect ever happens. After a restart, Vector connects and works as expected.
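
The symptom above (a channel stuck in an error state after the broker resets the connection, with no reconnect until restart) is the kind of failure usually handled by discarding the broken channel and reconnecting lazily on the next send. The std-only sketch below shows that pattern. Broker, Channel, Publisher and SendError are hypothetical stand-ins, not lapin or Vector types, and a real implementation would also need backoff and handling of in-flight messages.

```rust
use std::cell::Cell;
use std::rc::Rc;

/// Hypothetical broker. `generation` increments on each restart, so
/// channels opened before the restart become invalid.
struct Broker {
    up: Cell<bool>,
    generation: Cell<u32>,
}

impl Broker {
    fn restart(&self) {
        self.up.set(true);
        self.generation.set(self.generation.get() + 1);
    }
}

#[derive(Debug, PartialEq)]
enum SendError {
    ConnectionRefused,
    InvalidChannelState,
}

/// Hypothetical channel bound to one broker generation.
struct Channel {
    generation: u32,
}

struct Publisher {
    broker: Rc<Broker>,
    channel: Option<Channel>,
    connects: u32,
}

impl Publisher {
    fn connect(&mut self) -> Result<(), SendError> {
        if !self.broker.up.get() {
            return Err(SendError::ConnectionRefused);
        }
        self.channel = Some(Channel { generation: self.broker.generation.get() });
        self.connects += 1;
        Ok(())
    }

    fn send(&mut self, _msg: &str) -> Result<(), SendError> {
        // Lazily (re)connect when there is no usable channel.
        if self.channel.is_none() {
            self.connect()?;
        }
        let channel_gen = self.channel.as_ref().expect("channel set by connect").generation;
        if !self.broker.up.get() || channel_gen != self.broker.generation.get() {
            // Key step: drop the broken channel so the next send reconnects,
            // instead of reusing it and failing forever.
            self.channel = None;
            return Err(SendError::InvalidChannelState);
        }
        Ok(())
    }
}

fn main() {
    let broker = Rc::new(Broker { up: Cell::new(true), generation: Cell::new(0) });
    let mut publisher = Publisher { broker: Rc::clone(&broker), channel: None, connects: 0 };
    println!("{:?}", publisher.send("a")); // Ok(())
    broker.up.set(false); // simulate RabbitMQ going down...
    broker.restart(); // ...and coming back up
    println!("{:?}", publisher.send("b")); // Err(InvalidChannelState): stale channel dropped
    println!("{:?}", publisher.send("c")); // Ok(()): reconnected
    println!("connects = {}", publisher.connects); // connects = 2
}
```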

I suggest reproducing this issue in integration tests. So far I have not been able to determine whether the root cause is in lapin.