gftea / amqprs

Async & Lock-free RabbitMQ Rust Client, Easy-to-use API
MIT License

basic_pub_sub not closing connection #140

Closed. LockedThread closed this issue 3 months ago.

LockedThread commented 3 months ago

Description

I was planning to make a PR to amqprs, and before I started working I decided to run the regression tests. The basic_pub_sub example is not finishing the channel close, which causes the program to run forever. I have the main branch checked out.

Command

RUST_LOG=trace cargo run --example basic_pub_sub --all-features

Output

2024-07-18T19:56:39.886695Z TRACE mio::poll: registering event source with poller: token=Token(0), interests=READABLE | WRITABLE    
2024-07-18T19:56:39.887858Z TRACE amqprs::net::split_connection: 530 bytes read from network
2024-07-18T19:56:39.887954Z TRACE amqprs::net::split_connection: RECV on channel 0: Start(MethodHeader { class_id: 10, method_id: 10 }, Start { version_major: 0, version_minor: 9, server_properties: FieldTable(470, {ShortStr(12, "cluster_name"): S(LongStr(19, "rabbit@987bfe21cd2d")), ShortStr(11, "information"): S(LongStr(57, "Licensed under the MPL 2.0. Website: https://rabbitmq.com")), ShortStr(8, "platform"): S(LongStr(17, "Erlang/OTP 25.3.2")), ShortStr(7, "product"): S(LongStr(8, "RabbitMQ")), ShortStr(7, "version"): S(LongStr(7, "3.11.15")), ShortStr(12, "capabilities"): F(FieldTable(199, {ShortStr(16, "per_consumer_qos"): t(true), ShortStr(18, "connection.blocked"): t(true), ShortStr(15, "direct_reply_to"): t(true), ShortStr(22, "consumer_cancel_notify"): t(true), ShortStr(28, "authentication_failure_close"): t(true), ShortStr(10, "basic.nack"): t(true), ShortStr(18, "publisher_confirms"): t(true), ShortStr(26, "exchange_exchange_bindings"): t(true), ShortStr(19, "consumer_priorities"): t(true)})), ShortStr(9, "copyright"): S(LongStr(55, "Copyright (c) 2007-2023 VMware, Inc. or its affiliates."))}), mechanisms: LongStr(29, "AMQPLAIN PLAIN RABBIT-CR-DEMO"), locales: LongStr(5, "en_US") })
2024-07-18T19:56:39.888026Z TRACE amqprs::net::split_connection: SENT on channel 0: StartOk(MethodHeader { class_id: 10, method_id: 11 }, StartOk { client_properties: FieldTable(142, {ShortStr(7, "version"): S(LongStr(3, "0.1")), ShortStr(8, "platform"): S(LongStr(4, "Rust")), ShortStr(15, "connection_name"): S(LongStr(25, "AMQPRS000@localhost:5672/")), ShortStr(7, "product"): S(LongStr(6, "AMQPRS")), ShortStr(12, "capabilities"): F(FieldTable(25, {ShortStr(22, "consumer_cancel_notify"): t(true)}))}), machanisms: ShortStr(5, "PLAIN"), response: LongStr(13, "\0user\0bitnami"), locale: ShortStr(5, "en_US") })
2024-07-18T19:56:39.888569Z TRACE amqprs::net::split_connection: 20 bytes read from network
2024-07-18T19:56:39.888584Z TRACE amqprs::net::split_connection: RECV on channel 0: Tune(MethodHeader { class_id: 10, method_id: 30 }, Tune { channel_max: 2047, frame_max: 131072, heartbeat: 60 })
2024-07-18T19:56:39.888596Z TRACE amqprs::net::split_connection: SENT on channel 0: TuneOk(MethodHeader { class_id: 10, method_id: 31 }, TuneOk { channel_max: 2047, frame_max: 131072, heartbeat: 60 })
2024-07-18T19:56:39.888621Z TRACE amqprs::net::split_connection: SENT on channel 0: Open(MethodHeader { class_id: 10, method_id: 40 }, Open { virtual_host: ShortStr(1, "/"), capabilities: ShortStr(0, ""), insist: 0 })
2024-07-18T19:56:39.931179Z TRACE amqprs::net::split_connection: 13 bytes read from network
2024-07-18T19:56:39.931223Z TRACE amqprs::net::split_connection: RECV on channel 0: OpenOk(MethodHeader { class_id: 10, method_id: 41 }, OpenOk { know_hosts: ShortStr(0, "") })
2024-07-18T19:56:39.931474Z DEBUG amqprs::net::reader_handler: register channel resource on connection 'AMQPRS000@localhost:5672/ [open]'
2024-07-18T19:56:39.931518Z  INFO amqprs::api::connection: open connection AMQPRS000@localhost:5672/
2024-07-18T19:56:39.931616Z DEBUG amqprs::net::reader_handler: callback registered on connection 'AMQPRS000@localhost:5672/ [open]'
2024-07-18T19:56:39.931643Z DEBUG amqprs::net::reader_handler: register channel resource on connection 'AMQPRS000@localhost:5672/ [open]'
2024-07-18T19:56:39.931978Z TRACE amqprs::net::split_connection: SENT on channel 1: OpenChannel(MethodHeader { class_id: 20, method_id: 10 }, OpenChannel { out_of_band: ShortStr(0, "") })
2024-07-18T19:56:39.932031Z TRACE amqprs::net::writer_handler: connection 'AMQPRS000@localhost:5672/ [open]' heartbeat deadline is updated to Instant { tv_sec: 8536, tv_nsec: 373637032 }
2024-07-18T19:56:39.932700Z TRACE amqprs::net::split_connection: 16 bytes read from network
2024-07-18T19:56:39.932733Z TRACE amqprs::net::split_connection: RECV on channel 1: OpenChannelOk(MethodHeader { class_id: 20, method_id: 11 }, OpenChannelOk { channel_id: LongStr(0, "") })
2024-07-18T19:56:39.932747Z TRACE amqprs::net::reader_handler: server heartbeat deadline is updated to Instant { tv_sec: 8566, tv_nsec: 374353268 }
2024-07-18T19:56:39.932832Z  INFO amqprs::api::connection: open channel 1 [open] of connection 'AMQPRS000@localhost:5672/ [open]'
2024-07-18T19:56:39.932888Z TRACE amqprs::api::channel::dispatcher: starts up dispatcher task of channel 1 [open] of connection 'AMQPRS000@localhost:5672/ [open]'
2024-07-18T19:56:39.934051Z DEBUG amqprs::api::channel::dispatcher: callback registered on channel 1 [open] of connection 'AMQPRS000@localhost:5672/ [open]'
2024-07-18T19:56:39.934216Z TRACE amqprs::net::split_connection: SENT on channel 1: DeclareQueue(MethodHeader { class_id: 50, method_id: 10 }, DeclareQueue { ticket: 0, queue: ShortStr(21, "amqprs.examples.basic"), bits: 2, arguments: FieldTable(0, {}) })
2024-07-18T19:56:39.934252Z TRACE amqprs::net::writer_handler: connection 'AMQPRS000@localhost:5672/ [open]' heartbeat deadline is updated to Instant { tv_sec: 8536, tv_nsec: 375859900 }
2024-07-18T19:56:39.934703Z TRACE amqprs::net::split_connection: 42 bytes read from network
2024-07-18T19:56:39.934720Z TRACE amqprs::net::split_connection: RECV on channel 1: DeclareQueueOk(MethodHeader { class_id: 50, method_id: 11 }, DeclareQueueOk { queue: ShortStr(21, "amqprs.examples.basic"), message_count: 0, consumer_count: 2 })
2024-07-18T19:56:39.934732Z TRACE amqprs::net::reader_handler: server heartbeat deadline is updated to Instant { tv_sec: 8566, tv_nsec: 376340539 }
2024-07-18T19:56:39.934866Z TRACE amqprs::net::split_connection: SENT on channel 1: BindQueue(MethodHeader { class_id: 50, method_id: 20 }, BindQueue { ticket: 0, queue: ShortStr(21, "amqprs.examples.basic"), exchange: ShortStr(9, "amq.topic"), routing_key: ShortStr(14, "amqprs.example"), nowait: false, arguments: FieldTable(0, {}) })
2024-07-18T19:56:39.934908Z TRACE amqprs::net::writer_handler: connection 'AMQPRS000@localhost:5672/ [open]' heartbeat deadline is updated to Instant { tv_sec: 8536, tv_nsec: 376515447 }
2024-07-18T19:56:39.935381Z TRACE amqprs::net::split_connection: 12 bytes read from network
2024-07-18T19:56:39.935390Z TRACE amqprs::net::split_connection: RECV on channel 1: BindQueueOk(MethodHeader { class_id: 50, method_id: 21 }, BindQueueOk)
2024-07-18T19:56:39.935397Z TRACE amqprs::net::reader_handler: server heartbeat deadline is updated to Instant { tv_sec: 8566, tv_nsec: 377005396 }
2024-07-18T19:56:39.935471Z TRACE amqprs::net::split_connection: SENT on channel 1: Consume(MethodHeader { class_id: 60, method_id: 20 }, Consume { ticket: 0, queue: ShortStr(21, "amqprs.examples.basic"), consumer_tag: ShortStr(21, "example_basic_pub_sub"), bits: 0, arguments: FieldTable(0, {}) })
2024-07-18T19:56:39.935494Z TRACE amqprs::net::writer_handler: connection 'AMQPRS000@localhost:5672/ [open]' heartbeat deadline is updated to Instant { tv_sec: 8536, tv_nsec: 377102035 }
2024-07-18T19:56:39.935893Z TRACE amqprs::net::split_connection: 34 bytes read from network
2024-07-18T19:56:39.935901Z TRACE amqprs::net::split_connection: RECV on channel 1: ConsumeOk(MethodHeader { class_id: 60, method_id: 21 }, ConsumeOk { consumer_tag: ShortStr(21, "example_basic_pub_sub") })
2024-07-18T19:56:39.935907Z TRACE amqprs::net::reader_handler: server heartbeat deadline is updated to Instant { tv_sec: 8566, tv_nsec: 377515624 }
2024-07-18T19:56:39.936013Z TRACE amqprs::api::channel::basic: starts task for async consumer example_basic_pub_sub on channel 1 [open] of connection 'AMQPRS000@localhost:5672/ [open]'
2024-07-18T19:56:39.936020Z  INFO amqprs::api::channel::dispatcher: register consumer example_basic_pub_sub
2024-07-18T19:56:39.936026Z TRACE amqprs::net::split_connection: SENT on channel 1: PublishCombo(Publish { ticket: 0, exchange: ShortStr(9, "amq.topic"), routing_key: ShortStr(14, "amqprs.example"), bits: 0 }, ContentHeader { common: ContentHeaderCommon { class: 60, weight: 0, body_size: 117 }, basic_properties: BasicProperties { property_flags: [0, 0], content_type: None, content_encoding: None, headers: None, delivery_mode: None, priority: None, correlation_id: None, reply_to: None, expiration: None, message_id: None, timestamp: None, message_type: None, user_id: None, app_id: None, cluster_id: None } }, ContentBody { inner: [10, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 123, 10, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 34, 112, 117, 98, 108, 105, 115, 104, 101, 114, 34, 58, 32, 34, 101, 120, 97, 109, 112, 108, 101, 34, 10, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 34, 100, 97, 116, 97, 34, 58, 32, 34, 72, 101, 108, 108, 111, 44, 32, 97, 109, 113, 112, 114, 115, 33, 34, 10, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 32, 125, 10, 32, 32, 32, 32, 32, 32, 32, 32] })
2024-07-18T19:56:39.936072Z TRACE amqprs::net::writer_handler: connection 'AMQPRS000@localhost:5672/ [open]' heartbeat deadline is updated to Instant { tv_sec: 8536, tv_nsec: 377680484 }
2024-07-18T19:56:40.936840Z  INFO amqprs::api::channel: close channel 1 [closed] of connection 'AMQPRS000@localhost:5672/ [open]'
2024-07-18T19:56:40.937412Z TRACE amqprs::net::split_connection: SENT on channel 1: CloseChannel(MethodHeader { class_id: 20, method_id: 40 }, CloseChannel { reply_code: 200, reply_text: ShortStr(0, ""), class_id: 0, method_id: 0 })
2024-07-18T19:56:40.937631Z TRACE amqprs::net::writer_handler: connection 'AMQPRS000@localhost:5672/ [open]' heartbeat deadline is updated to Instant { tv_sec: 8537, tv_nsec: 379234178 }
2024-07-18T19:57:09.889517Z TRACE amqprs::net::split_connection: 8 bytes read from network
2024-07-18T19:57:09.889551Z TRACE amqprs::net::split_connection: RECV on channel 0: HeartBeat(HeartBeat)
2024-07-18T19:57:09.889564Z TRACE amqprs::net::reader_handler: server heartbeat deadline is updated to Instant { tv_sec: 8596, tv_nsec: 331172526 }
2024-07-18T19:57:09.889577Z DEBUG amqprs::net::reader_handler: received heartbeat on connection 'AMQPRS000@localhost:5672/ [open]'
2024-07-18T19:57:10.939137Z TRACE amqprs::net::split_connection: SENT on channel 0: HeartBeat(HeartBeat)
2024-07-18T19:57:10.939208Z DEBUG amqprs::net::writer_handler: sent heartbeat over connection 'AMQPRS000@localhost:5672/ [open]'
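
In the trace above, the last request is the CloseChannel sent on channel 1 at 19:56:40.937412Z. No CloseChannelOk appears afterward, only heartbeats, which is consistent with the example waiting for a reply that never arrives. A minimal sketch of a helper that finds such unanswered requests in a trace like this one is shown below. It is not part of amqprs. The function names, the abbreviated sample lines, and the rule that a request `X` is answered by `XOk` (with `PublishCombo`, `HeartBeat`, and names already ending in `Ok` treated as needing no reply) are assumptions drawn only from the method names visible in this log.

```rust
// Hypothetical trace-log helper (not part of amqprs): pairs each SENT request
// with its RECV "...Ok" reply and reports the requests that were never answered.

/// Parses one log line into (is_sent, channel, method_name), if it is a frame line.
fn parse_frame(line: &str) -> Option<(bool, u16, &str)> {
    const SENT: &str = "SENT on channel ";
    const RECV: &str = "RECV on channel ";
    let (is_sent, rest) = if let Some(i) = line.find(SENT) {
        (true, &line[i + SENT.len()..])
    } else if let Some(i) = line.find(RECV) {
        (false, &line[i + RECV.len()..])
    } else {
        return None;
    };
    // `rest` looks like "1: CloseChannel(MethodHeader { ... })".
    let (chan, tail) = rest.split_once(": ")?;
    let channel: u16 = chan.parse().ok()?;
    let name = tail.split('(').next()?;
    Some((is_sent, channel, name))
}

/// Returns the (channel, method) pairs that were sent but never got an "Ok" reply.
fn unanswered(log: &str) -> Vec<(u16, String)> {
    let mut pending: Vec<(u16, String)> = Vec::new();
    for line in log.lines() {
        let Some((is_sent, channel, name)) = parse_frame(line) else {
            continue;
        };
        if is_sent {
            // Assumption: replies, publishes, and heartbeats expect no answer.
            let expects_reply =
                !name.ends_with("Ok") && name != "PublishCombo" && name != "HeartBeat";
            if expects_reply {
                pending.push((channel, name.to_string()));
            }
        } else if let Some(request) = name.strip_suffix("Ok") {
            // A reply clears the oldest matching request on the same channel.
            if let Some(pos) = pending
                .iter()
                .position(|(c, n)| *c == channel && n.as_str() == request)
            {
                pending.remove(pos);
            }
        }
    }
    pending
}

// Abbreviated lines from the trace in this issue (timestamps and payloads trimmed).
const SAMPLE: &str = "\
TRACE amqprs::net::split_connection: SENT on channel 1: Consume(MethodHeader { class_id: 60, method_id: 20 })
TRACE amqprs::net::split_connection: RECV on channel 1: ConsumeOk(MethodHeader { class_id: 60, method_id: 21 })
TRACE amqprs::net::split_connection: SENT on channel 1: CloseChannel(MethodHeader { class_id: 20, method_id: 40 })
TRACE amqprs::net::split_connection: RECV on channel 0: HeartBeat(HeartBeat)
TRACE amqprs::net::split_connection: SENT on channel 0: HeartBeat(HeartBeat)
";

fn main() {
    for (channel, method) in unanswered(SAMPLE) {
        // prints "channel 1: CloseChannel has no matching CloseChannelOk"
        println!("channel {channel}: {method} has no matching {method}Ok");
    }
}
```

Feeding the full `RUST_LOG=trace` output to `unanswered` instead of `SAMPLE` should work the same way, as long as the log keeps the `SENT on channel N:` and `RECV on channel N:` line shape seen here.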

LockedThread commented 3 months ago

I determined that it had something to do with the RabbitMQ setup. I got it working here: https://github.com/LockedThread/amqprs/tree/feat/remove-bitnami. I will make a PR and discuss it after https://github.com/gftea/amqprs/pull/141

gftea commented 3 months ago

I cannot reproduce this. See the run https://github.com/gftea/amqprs/actions/runs/10037852879/job/27738546051?pr=142

LockedThread commented 3 months ago

> I cannot reproduce this. See the run https://github.com/gftea/amqprs/actions/runs/10037852879/job/27738546051?pr=142

Weird, I will reassess later.