QuantumEntangledAndy / neolink

An RTSP bridge to Reolink IP cameras
GNU Affero General Public License v3.0

MQTT issues #55

Closed: jl1990 closed this issue 1 year ago

jl1990 commented 1 year ago

**Describe the bug**
I am trying to use neolink with MQTT, but I am getting various errors and commands basically do not work.

**To Reproduce**
The first error is intermittent (it seems to be thread-related): it sometimes happens when the application starts. Steps:

  1. Run the application with multiple cameras and MQTT enabled: `neolink mqtt -c neolink.toml`
    
    C:/Users/jlcar/.cargo/bin/cargo.exe run --color=always --package neolink --bin neolink -- mqtt -c neolink.toml
    Finished dev [unoptimized + debuginfo] target(s) in 0.19s
    warning: the following packages contain code that will be rejected by a future version of Rust: nom v6.1.2, winapi v0.2.8
    note: to see what the problems were, use the option `--future-incompat-report`, or run `cargo report future-incompatibilities --id 297`
     Running `target\debug\neolink.exe mqtt -c neolink.toml`
    [2023-04-05T18:18:10Z INFO  neolink] Neolink 0.5.6 debug
    [2023-04-05T18:18:10Z INFO  neolink::mqtt] Terraza: Setting up mqtt
    [2023-04-05T18:18:10Z INFO  neolink::mqtt] Lavadero: Setting up mqtt
    [2023-04-05T18:18:10Z INFO  neolink::mqtt] Piscina: Setting up mqtt
    [2023-04-05T18:18:10Z INFO  neolink::mqtt::mqttc] Terraza: Starting listening to mqtt
    [2023-04-05T18:18:10Z INFO  neolink::mqtt::mqttc] Piscina: Starting listening to mqtt
    [2023-04-05T18:18:10Z INFO  neolink::mqtt::mqttc] Lavadero: Starting listening to mqtt
    [2023-04-05T18:18:10Z INFO  neolink::mqtt::mqttc] Starting MQTT Client for Terraza
    [2023-04-05T18:18:10Z INFO  neolink::mqtt::mqttc] Starting MQTT Client for Piscina
    [2023-04-05T18:18:10Z INFO  neolink::mqtt::event_cam] Piscina: Connecting to camera at UID: XXXXXXXXXXXXXXX
    [2023-04-05T18:18:10Z INFO  neolink::mqtt::mqttc] Starting MQTT Client for Lavadero
    thread 'tokio-runtime-worker' panicked at 'Cannot start a runtime from within a runtime. This happens because a function (like `block_on`) attempted to block the current thread while the thread is being used to drive asynchronous tasks.', C:\Users\jlcar\.cargo\registry\src\github.com-1ecc6299db9ec823\rumqttc-0.8.0\src\client.rs:344:28
    [2023-04-05T18:18:10Z INFO  neolink::mqtt::event_cam] Lavadero: Connecting to camera at UID: XXXXXXXXXXXXXXX
    [2023-04-05T18:18:10Z INFO  neolink::mqtt::event_cam] Terraza: Connecting to camera at UID: XXXXXXXXXXXXXXX
    stack backtrace:
    0: rust_begin_unwind
             at /rustc/9eb3afe9ebe9c7d2b84b71002d44f4a0edac95e0/library\std\src/panicking.rs:575:5
    1: core::panicking::panic_fmt
             at /rustc/9eb3afe9ebe9c7d2b84b71002d44f4a0edac95e0/library\core\src/panicking.rs:64:14
    2: tokio::runtime::context::enter_runtime
             at C:\Users\jlcar\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.27.0\src\runtime\context.rs:183:9
    3: tokio::runtime::scheduler::current_thread::CurrentThread::block_on
             at C:\Users\jlcar\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.27.0\src\runtime\scheduler\current_thread.rs:146:25
    4: tokio::runtime::runtime::Runtime::block_on
             at C:\Users\jlcar\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.27.0\src\runtime\runtime.rs:302:47
    5: <rumqttc::client::Iter as core::iter::traits::iterator::Iterator>::next
             at C:\Users\jlcar\.cargo\registry\src\github.com-1ecc6299db9ec823\rumqttc-0.8.0\src\client.rs:344:15
    6: <core::iter::adapters::enumerate::Enumerate<I> as core::iter::traits::iterator::Iterator>::next
             at /rustc/9eb3afe9ebe9c7d2b84b71002d44f4a0edac95e0\library\core\src\iter\adapters/enumerate.rs:46:17
    7: neolink::mqtt::mqttc::Mqtt::start
             at .\src\mqtt\mqttc.rs:215:39
    8: neolink::mqtt::listen_on_camera::{{closure}}::{{closure}}
             at .\src\mqtt\mod.rs:131:17
    9: tokio::runtime::task::core::Core<T,S>::poll::{{closure}}
             at C:\Users\jlcar\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.27.0\src\runtime\task\core.rs:223:17
    10: tokio::loom::std::unsafe_cell::UnsafeCell<T>::with_mut
             at C:\Users\jlcar\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.27.0\src\loom\std\unsafe_cell.rs:14:9
    11: tokio::runtime::task::core::Core<T,S>::poll
             at C:\Users\jlcar\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.27.0\src\runtime\task\core.rs:212:13
    12: tokio::runtime::task::harness::poll_future::{{closure}}
             at C:\Users\jlcar\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.27.0\src\runtime\task\harness.rs:476:19
    13: <core::panic::unwind_safe::AssertUnwindSafe<F> as core::ops::function::FnOnce<()>>::call_once
             at /rustc/9eb3afe9ebe9c7d2b84b71002d44f4a0edac95e0\library\core\src\panic/unwind_safe.rs:271:9
    14: std::panicking::try::do_call
             at /rustc/9eb3afe9ebe9c7d2b84b71002d44f4a0edac95e0\library\std\src/panicking.rs:483:40
    15: __rust_try
    16: std::panicking::try
             at /rustc/9eb3afe9ebe9c7d2b84b71002d44f4a0edac95e0\library\std\src/panicking.rs:447:19
    17: std::panic::catch_unwind
             at /rustc/9eb3afe9ebe9c7d2b84b71002d44f4a0edac95e0\library\std\src/panic.rs:140:14
    18: tokio::runtime::task::harness::poll_future
             at C:\Users\jlcar\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.27.0\src\runtime\task\harness.rs:464:18
    19: tokio::runtime::task::harness::Harness<T,S>::poll_inner
             at C:\Users\jlcar\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.27.0\src\runtime\task\harness.rs:198:27
    20: tokio::runtime::task::harness::Harness<T,S>::poll
             at C:\Users\jlcar\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.27.0\src\runtime\task\harness.rs:152:15
    21: tokio::runtime::task::raw::poll
             at C:\Users\jlcar\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.27.0\src\runtime\task\raw.rs:255:5
    22: tokio::runtime::task::raw::RawTask::poll
             at C:\Users\jlcar\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.27.0\src\runtime\task\raw.rs:200:18
    23: tokio::runtime::task::LocalNotified<S>::run
             at C:\Users\jlcar\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.27.0\src\runtime\task\mod.rs:394:9
    24: tokio::runtime::scheduler::multi_thread::worker::Context::run_task::{{closure}}
             at C:\Users\jlcar\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.27.0\src\runtime\scheduler\multi_thread\worker.rs:464:13
    25: tokio::runtime::coop::with_budget
             at C:\Users\jlcar\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.27.0\src\runtime\coop.rs:107:5
    26: tokio::runtime::coop::budget
             at C:\Users\jlcar\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.27.0\src\runtime\coop.rs:73:5
    27: tokio::runtime::scheduler::multi_thread::worker::Context::run_task
             at C:\Users\jlcar\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.27.0\src\runtime\scheduler\multi_thread\worker.rs:463:9
    28: tokio::runtime::scheduler::multi_thread::worker::Context::run
             at C:\Users\jlcar\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.27.0\src\runtime\scheduler\multi_thread\worker.rs:433:24
    29: tokio::runtime::scheduler::multi_thread::worker::run::{{closure}}
             at C:\Users\jlcar\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.27.0\src\runtime\scheduler\multi_thread\worker.rs:406:17
    30: tokio::macros::scoped_tls::ScopedKey<T>::set
             at C:\Users\jlcar\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.27.0\src\macros\scoped_tls.rs:61:9
    31: tokio::runtime::scheduler::multi_thread::worker::run
             at C:\Users\jlcar\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.27.0\src\runtime\scheduler\multi_thread\worker.rs:403:5
    32: tokio::runtime::scheduler::multi_thread::worker::Launch::launch::{{closure}}
             at C:\Users\jlcar\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.27.0\src\runtime\scheduler\multi_thread\worker.rs:365:45
    33: <tokio::runtime::blocking::task::BlockingTask<T> as core::future::future::Future>::poll
             at C:\Users\jlcar\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.27.0\src\runtime\blocking\task.rs:42:21
    34: tokio::runtime::task::core::Core<T,S>::poll::{{closure}}
             at C:\Users\jlcar\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.27.0\src\runtime\task\core.rs:223:17
    35: tokio::loom::std::unsafe_cell::UnsafeCell<T>::with_mut
             at C:\Users\jlcar\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.27.0\src\loom\std\unsafe_cell.rs:14:9
    36: tokio::runtime::task::core::Core<T,S>::poll
             at C:\Users\jlcar\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.27.0\src\runtime\task\core.rs:212:13
    37: tokio::runtime::task::harness::poll_future::{{closure}}
             at C:\Users\jlcar\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.27.0\src\runtime\task\harness.rs:476:19
    38: <core::panic::unwind_safe::AssertUnwindSafe<F> as core::ops::function::FnOnce<()>>::call_once
             at /rustc/9eb3afe9ebe9c7d2b84b71002d44f4a0edac95e0\library\core\src\panic/unwind_safe.rs:271:9
    39: std::panicking::try::do_call
             at /rustc/9eb3afe9ebe9c7d2b84b71002d44f4a0edac95e0\library\std\src/panicking.rs:483:40
    40: __rust_try
    41: std::panicking::try
             at /rustc/9eb3afe9ebe9c7d2b84b71002d44f4a0edac95e0\library\std\src/panicking.rs:447:19
    42: std::panic::catch_unwind
             at /rustc/9eb3afe9ebe9c7d2b84b71002d44f4a0edac95e0\library\std\src/panic.rs:140:14
    43: tokio::runtime::task::harness::poll_future
             at C:\Users\jlcar\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.27.0\src\runtime\task\harness.rs:464:18
    44: tokio::runtime::task::harness::Harness<T,S>::poll_inner
             at C:\Users\jlcar\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.27.0\src\runtime\task\harness.rs:198:27
    45: tokio::runtime::task::harness::Harness<T,S>::poll
             at C:\Users\jlcar\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.27.0\src\runtime\task\harness.rs:152:15
    46: tokio::runtime::task::raw::poll
             at C:\Users\jlcar\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.27.0\src\runtime\task\raw.rs:255:5
    47: tokio::runtime::task::raw::RawTask::poll
             at C:\Users\jlcar\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.27.0\src\runtime\task\raw.rs:200:18
    48: tokio::runtime::task::UnownedTask<S>::run
             at C:\Users\jlcar\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.27.0\src\runtime\task\mod.rs:431:9
    49: tokio::runtime::blocking::pool::Task::run
             at C:\Users\jlcar\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.27.0\src\runtime\blocking\pool.rs:159:9
    50: tokio::runtime::blocking::pool::Inner::run
             at C:\Users\jlcar\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.27.0\src\runtime\blocking\pool.rs:513:17
    51: tokio::runtime::blocking::pool::Spawner::spawn_thread::{{closure}}
             at C:\Users\jlcar\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.27.0\src\runtime\blocking\pool.rs:471:13
    note: Some details are omitted, run with `RUST_BACKTRACE=full` for a verbose backtrace.
    thread '<unnamed>' panicked at 'called `Result::unwrap()` on an `Err` value: PoisonError { .. }', src\mqtt\mqttc.rs:211:53
    stack backtrace:
    0: rust_begin_unwind
             at /rustc/9eb3afe9ebe9c7d2b84b71002d44f4a0edac95e0/library\std\src/panicking.rs:575:5
    1: core::panicking::panic_fmt
             at /rustc/9eb3afe9ebe9c7d2b84b71002d44f4a0edac95e0/library\core\src/panicking.rs:64:14
    2: core::result::unwrap_failed
             at /rustc/9eb3afe9ebe9c7d2b84b71002d44f4a0edac95e0/library\core\src/result.rs:1790:5
    3: core::result::Result<T,E>::unwrap
             at /rustc/9eb3afe9ebe9c7d2b84b71002d44f4a0edac95e0\library\core\src/result.rs:1112:23
    4: neolink::mqtt::mqttc::Mqtt::start
             at .\src\mqtt\mqttc.rs:211:30
    5: neolink::mqtt::mqttc::Mqtt::new::{{closure}}
             at .\src\mqtt\mqttc.rs:116:21
    note: Some details are omitted, run with `RUST_BACKTRACE=full` for a verbose backtrace.
    [2023-04-05T18:18:13Z INFO  neolink_core::bc_protocol] Local discovery success XXXXXXXXXXXXXXX at 192.168.2.34:21280
    [2023-04-05T18:18:13Z INFO  neolink::mqtt::event_cam] Piscina: Logging in
    [2023-04-05T18:18:13Z INFO  neolink_core::bc_protocol] Local discovery success XXXXXXXXXXXXXXX at 192.168.2.31:18069
    [2023-04-05T18:18:13Z INFO  neolink::mqtt::event_cam] Lavadero: Logging in
    [2023-04-05T18:18:13Z INFO  neolink::mqtt::event_cam] Piscina: Connected and logged in
    [2023-04-05T18:18:13Z INFO  neolink::mqtt::event_cam] Piscina: Listening to Camera Motion
    [2023-04-05T18:18:13Z INFO  neolink::mqtt::event_cam] Piscina: Setting up camera actions
    [2023-04-05T18:18:13Z INFO  neolink::mqtt::event_cam] Lavadero: Connected and logged in
    [2023-04-05T18:18:13Z INFO  neolink::mqtt::event_cam] Lavadero: Listening to Camera Motion
    [2023-04-05T18:18:13Z INFO  neolink::mqtt::event_cam] Lavadero: Setting up camera actions
    [2023-04-05T18:18:14Z INFO  neolink_core::bc_protocol] Local discovery success XXXXXXXXXXXXXXX at 192.168.2.33:18158
    [2023-04-05T18:18:14Z INFO  neolink::mqtt::event_cam] Terraza: Logging in
    [2023-04-05T18:18:15Z INFO  neolink::mqtt::event_cam] Terraza: Connected and logged in
    [2023-04-05T18:18:15Z INFO  neolink::mqtt::event_cam] Terraza: Listening to Camera Motion
    [2023-04-05T18:18:15Z INFO  neolink::mqtt::event_cam] Terraza: Setting up camera actions


Also, MQTT commands are not working for me.
Even when the application starts successfully, issuing a command appears to fail here, in the function `send_message_with_reply`:

`Err(anyhow!("Failed to send camera data over crossbeam"))`
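One common reason a request/reply send fails is that the receiving end of the channel (here, whatever task owns the camera connection) has already exited; both `std::sync::mpsc` and crossbeam channels return an error from `send` once every receiver is dropped. The sketch below uses `std::sync::mpsc` as a stand-in for crossbeam, and `Request` and `send_with_reply` are hypothetical names, not neolink's actual code.

```rust
use std::sync::mpsc::{channel, Sender};
use std::thread;

/// Hypothetical request: a command plus a channel on which to send the reply.
struct Request {
    command: String,
    reply_to: Sender<String>,
}

/// Hypothetical request/reply helper (not neolink's code): sends a command to
/// the camera task and waits for its answer.
fn send_with_reply(to_camera: &Sender<Request>, command: &str) -> Result<String, String> {
    let (reply_tx, reply_rx) = channel();
    to_camera
        .send(Request { command: command.to_string(), reply_to: reply_tx })
        // `send` only fails when the receiving (camera) side has been dropped.
        .map_err(|_| "failed to send: camera task is gone".to_string())?;
    reply_rx
        .recv()
        .map_err(|_| "camera task dropped the reply channel".to_string())
}

fn main() {
    let (tx, rx) = channel::<Request>();
    // Simulated camera task: answers one request, then exits and drops `rx`.
    let camera = thread::spawn(move || {
        if let Ok(req) = rx.recv() {
            let _ = req.reply_to.send(format!("ok: {}", req.command));
        }
    });
    println!("{:?}", send_with_reply(&tx, "pir on")); // Ok("ok: pir on")
    camera.join().unwrap();
    // The camera side is gone now, so this send fails.
    println!("{:?}", send_with_reply(&tx, "pir on")); // Err("failed to send: camera task is gone")
}
```

If the error appears only for commands, it is worth checking whether the task holding the receiver has stopped (for example after the camera dropped the connection).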

**Expected behavior**
Application should start consistently and not fail randomly.

**Versions**
NVR software: None
Neolink software: master branch version
Reolink camera model and firmware: Argus 3 Pro, latest firmware
jl1990 commented 1 year ago

This one also happens pretty frequently when issuing a command:

[2023-04-06T11:01:36Z ERROR neolink_core::bc_protocol::connection::bcconn] Subscription error: Io(Custom { kind: Other, error: CameraTerminate })
jl1990 commented 1 year ago

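The first issue seems to be caused by using rumqttc synchronously within the tokio framework. The synchronous rumqttc client's iterator calls `block_on` internally (see `rumqttc::client::Iter::next` in the backtrace), which blocks the thread and is not allowed inside a tokio context.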


QuantumEntangledAndy commented 1 year ago

Yes, I suspect so. Tokio has a method for moving blocking calls onto their own separate thread, which is what I was trying to achieve, but I must have gotten something wrong with it.

I suppose I could just use a normal thread for this. I was just trying to keep things inside the Tokio framework.
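A minimal std-only sketch of the "normal thread" option, assuming the blocking part is an iterator of events (like the synchronous client's notification iterator in the backtrace above). Inside tokio, `tokio::task::spawn_blocking` plays the same role of keeping blocking work off the async worker threads. `fake_blocking_events` and `spawn_listener` are hypothetical names.

```rust
use std::sync::mpsc::{channel, Receiver};
use std::thread;
use std::time::Duration;

/// Stand-in for a blocking event iterator (for example, a synchronous MQTT
/// client's notification iterator). Each item takes a while to arrive.
fn fake_blocking_events() -> impl Iterator<Item = String> + Send + 'static {
    (0..3).map(|i| {
        thread::sleep(Duration::from_millis(10)); // simulate blocking network I/O
        format!("event {i}")
    })
}

/// Drain a blocking iterator on its own OS thread and forward each item over a
/// channel, so no async worker thread is ever blocked by it.
fn spawn_listener<I>(events: I) -> Receiver<String>
where
    I: Iterator<Item = String> + Send + 'static,
{
    let (tx, rx) = channel();
    thread::spawn(move || {
        for event in events {
            // Stop if nobody is listening any more.
            if tx.send(event).is_err() {
                break;
            }
        }
    });
    rx
}

fn main() {
    let rx = spawn_listener(fake_blocking_events());
    // The loop ends when the listener thread finishes and drops `tx`.
    for event in rx.iter() {
        println!("{event}");
    }
}
```

One caveat: receiving from a `std::sync::mpsc` channel also blocks, so async code would more naturally use `tokio::sync::mpsc`, sending with `Sender::blocking_send` from the dedicated thread and awaiting `recv()` on the async side.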

QuantumEntangledAndy commented 1 year ago

The CameraTerminate one is tricky. The camera is dropping us, sometimes for inactivity but sometimes for reasons I'm not sure about.

This only happens on UDP, so if you can use TCP, please do that.

jl1990 commented 1 year ago

Yes, yesterday I was trying to migrate the rumqttc code to be asynchronous using tokio, as described here:

https://github.com/bytebeamio/rumqtt/blob/main/rumqttc/README.md

But it required a lot of changes, and I failed since I am not very familiar with Rust yet. I might try again later if I don't find another solution.

Regarding switching from UDP to TCP, I have no idea how to change it in the code, but I will look into it. Thanks for the idea.

QuantumEntangledAndy commented 1 year ago

Whether you can use TCP or UDP depends on your camera. Battery cameras are UDP only. If you only provide a UID in the config, it is also UDP only. TCP requires a compatible camera at a known address.
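The rules in this comment can be written down as a small decision function. This is a hypothetical sketch: `CameraInfo` and its fields are illustrative and are not neolink's configuration keys; a UID-only entry is modelled as a camera with no known address.

```rust
/// Transport used to reach the camera.
#[derive(Debug, PartialEq)]
enum Transport {
    Tcp,
    Udp,
}

/// Hypothetical summary of one camera entry (field names are illustrative).
struct CameraInfo {
    address: Option<String>, // known IP:port, if configured
    battery_powered: bool,
    supports_tcp: bool,
}

/// Battery cameras are UDP only, a UID-only entry is UDP only, and TCP needs
/// a compatible camera at a known address.
fn transport_for(cam: &CameraInfo) -> Transport {
    if cam.battery_powered {
        return Transport::Udp;
    }
    match (&cam.address, cam.supports_tcp) {
        (Some(_), true) => Transport::Tcp,
        _ => Transport::Udp, // e.g. only a UID was given
    }
}

fn main() {
    let battery = CameraInfo {
        address: Some("192.168.1.10:9000".to_string()),
        battery_powered: true,
        supports_tcp: true,
    };
    println!("{:?}", transport_for(&battery)); // Udp
}
```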

jl1990 commented 1 year ago

It's UDP only for me, then; all my cameras are battery-powered.

I guess I will need to understand why the connection is being dropped.

jl1990 commented 1 year ago

It's possible the camera drops the connection after a period of inactivity to save battery. If so, as I understand it, instead of treating this as an unrecoverable error, it might be worth reopening the connection when we need it.
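A minimal sketch of the reconnect-on-demand idea, assuming a hypothetical `Camera` wrapper that stores an optional session and treats a terminated session as a reason to log in again rather than as a fatal error. None of these types come from neolink; `Connection` only simulates a camera session.

```rust
/// Error returned when the camera has closed our session.
#[derive(Debug, PartialEq)]
enum CamError {
    Terminated,
}

/// Hypothetical connection handle; `alive` becomes false when the camera drops us.
struct Connection {
    alive: bool,
}

impl Connection {
    fn send(&self, cmd: &str) -> Result<String, CamError> {
        if self.alive {
            Ok(format!("ack {cmd}"))
        } else {
            Err(CamError::Terminated)
        }
    }
}

/// Hypothetical camera wrapper that connects lazily, only when a command needs it.
struct Camera {
    conn: Option<Connection>,
    connects: u32, // how many times we have connected so far
}

impl Camera {
    fn connect(&mut self) -> &mut Connection {
        self.connects += 1;
        // A real implementation would perform discovery and log in here.
        self.conn.insert(Connection { alive: true })
    }

    /// Send a command; if there is no session or the camera terminated it,
    /// reconnect once and retry instead of giving up.
    fn send_command(&mut self, cmd: &str) -> Result<String, CamError> {
        let first_try = match self.conn.as_ref() {
            Some(conn) => conn.send(cmd),
            None => Err(CamError::Terminated),
        };
        match first_try {
            Err(CamError::Terminated) => self.connect().send(cmd),
            other => other,
        }
    }
}

fn main() {
    let mut cam = Camera { conn: None, connects: 0 };
    println!("{:?}", cam.send_command("pir on")); // Ok("ack pir on")
    cam.conn.as_mut().unwrap().alive = false; // simulate the camera dropping us
    println!("{:?}", cam.send_command("pir off")); // Ok("ack pir off")
    println!("connections: {}", cam.connects); // connections: 2
}
```

Retrying only once keeps a camera that refuses connections from causing an endless loop; a real implementation would likely also want a backoff between attempts.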

jl1990 commented 1 year ago

I've been digging a bit more into the problems. One reason MQTT does not respond seems to be that the application runs out of threads because of the loops used for listening to camera events. I am not completely sure about this, but during my testing I commented out the camera-listening part (leaving only the MQTT event listener) and it didn't miss a single message. It looks like the infinite loops used with tokio are making the application run out of threads.

QuantumEntangledAndy commented 1 year ago

I've started moving the MQTT code to the async model. Let's see how it performs once it's moved.

QuantumEntangledAndy commented 1 year ago

Work is happening in #66 to address this. The first test build is here.

jl1990 commented 1 year ago

Thanks a lot, @QuantumEntangledAndy. The code looks better, and I no longer get the tokio blocking panic that previously occurred randomly at startup. However, I am still not able to receive the PIR status or execute any commands.

[2023-04-18T16:13:07Z ERROR neolink::mqtt] Failed to get pir status

If I execute the commands without MQTT, they work well, so the problem must be related to the loops used in the MQTT implementation or for the camera events.

QuantumEntangledAndy commented 1 year ago

Looping itself should not be an issue: as long as we reach an await, the task yields the thread to another Future. The problem only arises if we fail to reach an await, for example when some blocking code is left over that hasn't been converted into futures and awaits.
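To illustrate the point about await, here is a toy single-threaded executor in plain std Rust (not tokio); `YieldNow`, `run_all`, `polite` and `greedy` are hypothetical names. Tasks that await between steps interleave, while a task with no await in its loop keeps the thread until it finishes: the same starvation effect, in miniature, as blocking code left inside an async task.

```rust
use std::cell::RefCell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

type Task = Pin<Box<dyn Future<Output = ()>>>;
type Log = Rc<RefCell<Vec<String>>>;

/// Returns Pending once, then Ready: a toy version of `tokio::task::yield_now()`.
struct YieldNow(bool);

impl Future for YieldNow {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            return Poll::Ready(());
        }
        self.0 = true;
        cx.waker().wake_by_ref(); // ask to be polled again
        Poll::Pending
    }
}

/// A waker that does nothing; the toy executor re-polls every task anyway.
fn noop_waker() -> Waker {
    unsafe fn clone(_: *const ()) -> RawWaker {
        RawWaker::new(std::ptr::null(), &VTABLE)
    }
    unsafe fn noop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
    // SAFETY: every vtable function ignores the (null) data pointer.
    unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
}

/// Single-threaded round-robin executor: polls each unfinished task once per pass.
fn run_all(mut tasks: Vec<Task>) {
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);
    while !tasks.is_empty() {
        tasks.retain_mut(|task| task.as_mut().poll(&mut cx).is_pending());
    }
}

/// Logs two steps and awaits between them, giving the thread back each time.
fn polite(name: &'static str, log: Log) -> Task {
    Box::pin(async move {
        for i in 0..2 {
            log.borrow_mut().push(format!("{name}{i}"));
            YieldNow(false).await;
        }
    })
}

/// Same loop with no await inside: it keeps the thread until it finishes.
fn greedy(name: &'static str, log: Log) -> Task {
    Box::pin(async move {
        for i in 0..2 {
            log.borrow_mut().push(format!("{name}{i}"));
        }
    })
}

fn main() {
    let log: Log = Rc::new(RefCell::new(Vec::new()));
    run_all(vec![polite("a", log.clone()), polite("b", log.clone())]);
    println!("{:?}", log.borrow()); // ["a0", "b0", "a1", "b1"]

    log.borrow_mut().clear();
    run_all(vec![greedy("g", log.clone()), polite("a", log.clone())]);
    println!("{:?}", log.borrow()); // ["g0", "g1", "a0", "a1"]
}
```

In a real runtime a truly infinite loop with no await is worse than the bounded `greedy` task here: that worker thread is never handed back, and with enough such loops no threads remain to poll other tasks, such as an MQTT listener.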

QuantumEntangledAndy commented 1 year ago

I think I found the missing await. I will finish my refactor and see.
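The thread doesn't say which await was missing. As a generic illustration of why a single missing await matters: Rust futures are lazy and do nothing until they are polled. In this std-only sketch (`block_on`, `publish` and `handle` are hypothetical names), creating the `publish` future without awaiting it means it never runs. Written as a plain statement, `publish(log);`, rustc warns about an unused future; binding it to a `_`-prefixed name, as here, silences that warning, which is one way such a bug can slip through.

```rust
use std::cell::RefCell;
use std::future::Future;
use std::pin::pin;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

/// A waker that does nothing (enough for this busy-polling demo).
fn noop_waker() -> Waker {
    unsafe fn clone(_: *const ()) -> RawWaker {
        RawWaker::new(std::ptr::null(), &VTABLE)
    }
    unsafe fn noop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
    // SAFETY: every vtable function ignores the (null) data pointer.
    unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
}

/// Minimal `block_on`: polls the future on this thread until it is ready.
/// It busy-loops on Pending, so it is only suitable for this demo.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

/// Hypothetical async step, e.g. publishing a status message.
async fn publish(log: &RefCell<Vec<&'static str>>) {
    log.borrow_mut().push("published");
}

async fn handle(log: &RefCell<Vec<&'static str>>, with_await: bool) {
    if with_await {
        publish(log).await; // the future is polled, so `publish` runs
    } else {
        // The future is created and dropped unpolled: `publish` never runs.
        let _never_polled = publish(log);
    }
}

fn main() {
    let log = RefCell::new(Vec::new());
    block_on(handle(&log, false));
    println!("{:?}", log.borrow()); // []
    block_on(handle(&log, true));
    println!("{:?}", log.borrow()); // ["published"]
}
```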

QuantumEntangledAndy commented 1 year ago

It all seems to be working now. Please check out this build.

jl1990 commented 1 year ago

Fantastic, it seems to be working now. The cameras are responding to commands.

Thanks for the help; I tried multiple times to modify the code without success (I am a Rust newbie).

jl1990 commented 1 year ago

I think the issue can be closed for now, as all the problems seem to be solved.