Closed marshauf closed 3 years ago
Hi, thanks for such a thorough description!
I think I've stumbled on a similar issue in this commit
Could you please target the master branch and tell me how it goes ?
Thanks again!
Thank you. It' seems to be improved but the master branch doesn't fix the issue.
The unmodified example still does not process any messages most of the time. On one run a thread panicked.
thread 'bastion-async-thread' panicked at 'called `Result::unwrap()` on an `Err` value: SignedMessage { msg: Msg(Broadcast(Any)), sign: RefAddr { path: /supervisor#d8c21683-e504-4e8d-92a6-231963f5af65/children#bbf3cb01-09dd-4c07-b63a-328ed9992d05/child#b1135e59-b2ec-428b-aad4-a716e892cc37, sender: UnboundedSender(Some(UnboundedSenderInner { inner: UnboundedInner { state: 9223372036854775808, message_queue: Queue { head: 0x7f43c8000e40, tail: UnsafeCell }, num_senders: 17, recv_task: AtomicWaker } })) } }', src/main.rs:149:72
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
On most runs with the modified example, I received completely processed messages. A few runs didn't complete. A few runs had panicked threads.
On a successful run a few warnings were logged: WARN bastion::dispatcher: sending message to child 1/1 - /d00ef5cd-cf35-4a7f-92d7-6c69a56c89ee/69e87b3c-68b1-46c3-a45c-9a53b93094e3/ad7b50a5-c106-46d0-904d-4136024a5339
The panicked threads looked like this:
thread 'bastion-async-thread' panicked at 'called `Result::unwrap()` on an `Err` value: SignedMessage { msg: Msg(Broadcast(Any)), sign: RefAddr { path: /supervisor#96126a55-a2d8-42c1-842c-b2755165a28b/children#304183ac-490a-4ce6-890c-b1747f4fbb97/child#6a3620df-0b68-41b8-beed-4a23a94c2496, sender: UnboundedSender(Some(UnboundedSenderInner { inner: UnboundedInner { state: 9223372036854775809, message_queue: Queue { head: 0x55f9de0a2830, tail: UnsafeCell }, num_senders: 13, recv_task: AtomicWaker } })) } }', src/main.rs:96:64
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
thread 'bastion-async-thread' panicked at 'called `Result::unwrap()` on an `Err` value: SignedMessage { msg: Msg(Broadcast(Any)), sign: RefAddr { path: /supervisor#b3b68710-56a4-402c-a1bf-795387b7a116/children#1347ae6b-c90d-4087-bae1-f7abdc1150c0/child#15086617-421c-460a-83ee-e6c67233c8fc, sender: UnboundedSender(Some(UnboundedSenderInner { inner: UnboundedInner { state: 9223372036854775809, message_queue: Queue { head: 0x55f9de09e8a0, tail: UnsafeCell }, num_senders: 15, recv_task: AtomicWaker } })) } }', src/main.rs:149:72
Super interesting, I think I might be able to reproduce it with the info you gave me, on my way! :)
Soooo I think the order is kind of wrong ? I mean we're sending things before someone is available to receive them which will... go bad!
@marshauf can you please try to target igni/broadcast_message_example_order and let me know how it goes ? :)
I made the same modification to the example, too. As I wrote it improved it a lot. Targeting the master branch improved it further. But a few runs still don't end with [Response] Aggregated data: `{"C": 5, "A": 2, "B": 2}
. They look like this:
Aug 30 19:37:20.903 INFO bastion::system: System: Initializing.
Aug 30 19:37:20.904 INFO bastion::system: System: Launched.
Aug 30 19:37:20.905 INFO broadcast_message: [Response] Worker started!
Aug 30 19:37:20.906 INFO broadcast_message: [Processing] Worker started!
Aug 30 19:37:20.906 INFO broadcast_message: [Processing] Worker started!
Aug 30 19:37:20.906 INFO broadcast_message: [Processing] Worker started!
Aug 30 19:37:20.906 INFO broadcast_message: [Input] Worker started!
Aug 30 19:37:20.907 INFO bastion::system: System: Starting.
Aug 30 19:37:20.907 INFO bastion::system: System: Launching Supervisor(00000000-0000-0000-0000-000000000000).
Aug 30 19:37:20.907 WARN bastion::dispatcher: sending message to child 1/3 - /36a9c73a-169d-4a4c-a355-38e73e0a79de/465c387f-fb6c-4937-88fe-4ae21a4410cf/8d50486f-dcf6-472d-b095-c58e3ab8157a
[Processing] Worker #BastionId(8d50486f-dcf6-472d-b095-c58e3ab8157a) received `A B C`
[Processing] Worker process #BastionId(8d50486f-dcf6-472d-b095-c58e3ab8157a) processed data. Result: `{"B": 1, "A": 1, "C": 1}`
Aug 30 19:37:20.913 INFO bastion::system: System: Launching Supervisor(cc1c6744-e8ee-48f3-b7ac-ef18c688c07e).
Aug 30 19:37:20.913 INFO bastion::system: System: Launching Supervisor(36a9c73a-169d-4a4c-a355-38e73e0a79de).
Aug 30 19:37:20.913 WARN bastion::dispatcher: sending message to child 2/3 - /36a9c73a-169d-4a4c-a355-38e73e0a79de/465c387f-fb6c-4937-88fe-4ae21a4410cf/6593cc53-f6af-4314-a428-434aed05c5c2
Aug 30 19:37:20.913 WARN bastion::dispatcher: sending message to child 1/1 - /cc1c6744-e8ee-48f3-b7ac-ef18c688c07e/f46a9904-bdcb-4e52-b15a-45c6cabb2983/390a9e42-7096-4fbf-825a-bb913b546155
thread 'bastion-async-thread' panicked at 'called `Result::unwrap()` on an `Err` value: SignedMessage { msg: Msg(Broadcast(Any)), sign: RefAddr { path: /supervisor#b97b1675-10b0-4289-a2dc-dfa29f6be134/children#6733e2c0-df0e-4081-9c3e-210bcf84f76c/child#92825160-b515-41b0-910a-8ba81233af2a, sender: UnboundedSender(Some(UnboundedSenderInner { inner: UnboundedInner { state: 9223372036854775808, message_queue: Queue { head: 0x562d27b98e60, tail: UnsafeCell }, num_senders: 13, recv_task: AtomicWaker } })) } }', src/main.rs:96:64
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
Aug 30 19:37:20.914 INFO bastion::system: System: Launching Supervisor(b97b1675-10b0-4289-a2dc-dfa29f6be134).
thread 'bastion-async-thread' panicked at 'called `Result::unwrap()` on an `Err` value: SignedMessage { msg: Msg(Broadcast(Any)), sign: RefAddr { path: /supervisor#36a9c73a-169d-4a4c-a355-38e73e0a79de/children#465c387f-fb6c-4937-88fe-4ae21a4410cf/child#8d50486f-dcf6-472d-b095-c58e3ab8157a, sender: UnboundedSender(Some(UnboundedSenderInner { inner: UnboundedInner { state: 9223372036854775808, message_queue: Queue { head: 0x562d27b951c0, tail: UnsafeCell }, num_senders: 20, recv_task: AtomicWaker } })) } }', src/main.rs:149:72
Aug 30 19:37:20.914 WARN bastion::child: Child(6593cc53-f6af-4314-a428-434aed05c5c2): Panicked.
Aug 30 19:37:20.914 WARN bastion::child: Child(390a9e42-7096-4fbf-825a-bb913b546155): Panicked.
Aug 30 19:37:20.915 WARN bastion::dispatcher: sending message to child 3/3 - /36a9c73a-169d-4a4c-a355-38e73e0a79de/465c387f-fb6c-4937-88fe-4ae21a4410cf/1860640f-1693-4868-be84-6c7c3c7b27d2
[Processing] Worker #BastionId(1860640f-1693-4868-be84-6c7c3c7b27d2) received `B C C`
[Processing] Worker process #BastionId(1860640f-1693-4868-be84-6c7c3c7b27d2) processed data. Result: `{"C": 2, "B": 1}`
Aug 30 19:37:20.915 INFO broadcast_message: [Response] Worker started!
Aug 30 19:37:20.916 INFO broadcast_message: [Processing] Worker started!
Aug 30 19:37:20.916 WARN bastion::dispatcher: sending message to child 1/1 - /cc1c6744-e8ee-48f3-b7ac-ef18c688c07e/f46a9904-bdcb-4e52-b15a-45c6cabb2983/390a9e42-7096-4fbf-825a-bb913b546155
^C
or this:
Aug 30 19:38:45.240 INFO bastion::system: System: Initializing.
Aug 30 19:38:45.241 INFO bastion::system: System: Launched.
Aug 30 19:38:45.242 INFO broadcast_message: [Response] Worker started!
Aug 30 19:38:45.243 INFO broadcast_message: [Processing] Worker started!
Aug 30 19:38:45.243 INFO broadcast_message: [Processing] Worker started!
Aug 30 19:38:45.243 INFO broadcast_message: [Input] Worker started!
Aug 30 19:38:45.243 INFO broadcast_message: [Processing] Worker started!
Aug 30 19:38:45.243 INFO bastion::system: System: Starting.
Aug 30 19:38:45.243 WARN bastion::dispatcher: sending message to child 1/3 - /7d2e9f58-d9b3-4230-bd67-113d03125c7b/529db7a5-b381-4f44-b7c6-b41fa49788d4/07238cf0-43e8-49c3-84bc-686dba366bb0
Aug 30 19:38:45.243 INFO bastion::system: System: Launching Supervisor(00000000-0000-0000-0000-000000000000).
[Processing] Worker #BastionId(07238cf0-43e8-49c3-84bc-686dba366bb0) received `A B C`
[Processing] Worker process #BastionId(07238cf0-43e8-49c3-84bc-686dba366bb0) processed data. Result: `{"B": 1, "A": 1, "C": 1}`
Aug 30 19:38:45.243 INFO bastion::system: System: Launching Supervisor(599b31c6-a4a1-4503-b62c-930c0b2f0699).
Aug 30 19:38:45.244 INFO bastion::system: System: Launching Supervisor(7d2e9f58-d9b3-4230-bd67-113d03125c7b).
Aug 30 19:38:45.244 INFO bastion::system: System: Launching Supervisor(14ced271-b562-4d2b-9ff9-5a7a7889599f).
Aug 30 19:38:45.244 WARN bastion::dispatcher: sending message to child 2/3 - /7d2e9f58-d9b3-4230-bd67-113d03125c7b/529db7a5-b381-4f44-b7c6-b41fa49788d4/a2bbd0b9-62c4-48b0-8020-ff1b7b484bc4
Aug 30 19:38:45.244 WARN bastion::dispatcher: sending message to child 1/1 - /599b31c6-a4a1-4503-b62c-930c0b2f0699/d4fa4651-745b-45d5-9ff5-4f9cbc868351/52ea34e4-9eab-4fb4-bb37-42fb8e99e4f8
thread 'bastion-async-thread' panicked at 'called `Result::unwrap()` on an `Err` value: SignedMessage { msg: Msg(Broadcast(Any)), sign: RefAddr { path: /supervisor#14ced271-b562-4d2b-9ff9-5a7a7889599f/children#3fecc1d4-64e9-4c3d-99d4-d2f845db626b/child#9c3cc4e6-c19e-42b8-8d2d-0434f3659302, sender: UnboundedSender(Some(UnboundedSenderInner { inner: UnboundedInner { state: 9223372036854775808, message_queue: Queue { head: 0x56467b25ee80, tail: UnsafeCell }, num_senders: 13, recv_task: AtomicWaker } })) } }', src/main.rs:96:64
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
thread 'bastion-async-thread' panicked at 'called `Result::unwrap()` on an `Err` value: SignedMessage { msg: Msg(Broadcast(Any)), sign: RefAddr { path: /supervisor#7d2e9f58-d9b3-4230-bd67-113d03125c7b/children#529db7a5-b381-4f44-b7c6-b41fa49788d4/child#07238cf0-43e8-49c3-84bc-686dba366bb0, sender: UnboundedSender(Some(UnboundedSenderInner { inner: UnboundedInner { state: 9223372036854775808, message_queue: Queue { head: 0x7f1ac8001460, tail: UnsafeCell }, num_senders: 18, recv_task: AtomicWaker } })) } }', src/main.rs:149:72
Aug 30 19:38:45.246 WARN bastion::child: Child(a2bbd0b9-62c4-48b0-8020-ff1b7b484bc4): Panicked.
Aug 30 19:38:45.246 WARN bastion::child: Child(52ea34e4-9eab-4fb4-bb37-42fb8e99e4f8): Panicked.
Aug 30 19:38:45.246 WARN bastion::dispatcher: sending message to child 3/3 - /7d2e9f58-d9b3-4230-bd67-113d03125c7b/529db7a5-b381-4f44-b7c6-b41fa49788d4/474cd68a-f440-4386-9f4b-4ae65037561c
[Processing] Worker #BastionId(474cd68a-f440-4386-9f4b-4ae65037561c) received `B C C`
[Processing] Worker process #BastionId(474cd68a-f440-4386-9f4b-4ae65037561c) processed data. Result: `{"C": 2, "B": 1}`
Aug 30 19:38:45.247 INFO broadcast_message: [Response] Worker started!
Aug 30 19:38:45.249 INFO broadcast_message: [Processing] Worker started!
Thanks for your reply, I'm glad to see it improved.
I hope the last issues are resolved by #261 , but merging it will take a bit of time, Mind if I ping you once it's done ? :)
No, please go ahead. Thank you :)
Ok uhm I totally forgot to ping you here I'm sorry, #261 has been merged a while ago and i think / hope we're good ^^'
I m going to merge the pull request and close this issue, feel free to reopen it if it still is relevant ^^
Hello,
the broadcast_message example deadlocks with warnings about "bastion::dispatcher: The message can't be delivered to the group with the 'Processing' name.". Workers are started after the warnings.
When I change the supervisor creation order to: response_supervisor, map_supervisor, input_supervisor results vary but sometimes some messages get processed by a response_group child. Sometimes a bastion-async-thread thread panics with:
And the program deadlocks and has to be interrupted.
A similar problem occurs in my code. Some messages get processed but not all. Depending on the amount of children which process messages and the amountt of messages inserted, all, some or none get completly processed. It seems like broadcasting a message occupies two children sometimes.
Versions: name = "bastion" version = "0.4.2" name = "bastion-executor" version = "0.3.6" name = "lightproc" version = "0.3.5"
Platform: Linux linux-i9e5 5.8.2-1-default #1 SMP Wed Aug 19 09:43:15 UTC 2020 (71b519a) x86_64 x86_64 x86_64 GNU/Linux
Code: https://github.com/bastion-rs/bastion/blob/master/src/bastion/examples/broadcast_message.rs