khoanguyen-3fc / dh-p2p

A proof of concept implementation of RTSP over Dahua P2P protocol.
MIT License
27 stars 13 forks source link

Simple connection test #6

Closed buhaytza2005 closed 8 months ago

buhaytza2005 commented 9 months ago

Hi,

Thank you very much for your work, I've been looking for something like this for a long time.

I have a bit of a weird scenario where I'm trying to build a monitor to reflect the possibility to connect to an NVR.

use clap::Parser;
use tokio::net::UdpSocket;

use crate::dh::p2p_handshake;
mod dh;
mod process;
mod ptcp;

#[derive(Parser)]
#[command(about = "Check if a Dahua device is online via P2P.", long_about = None)]
struct Cli {
    /// Relay mode (experimental)
    #[arg(short, long)]
    relay: bool,
    /// Serial number of the camera
    serial: String,
}
#[derive(Debug)]
struct Device {
    serial: String,
    online: bool,
}
#[tokio::main]
async fn main() {
    let serials = [
        "REDACTED",
            "REDACTED",
    ];

    let mut final_result: Vec<Device> = vec![];
    for serial in serials {

        let online: bool = test_p2p(serial.to_string(), false).await;
        let device = Device {
            serial: serial.to_string(),
            online,
        };
        final_result.push(device);
    }

    println!("{:#?}", final_result);
}

async fn test_p2p(serial: String, relay_mode: bool) -> bool {
    let relay_mode = false;

    // Create a UDP socket
    let socket = UdpSocket::bind("0.0.0.0:0").await.expect("Failed to bind UDP socket");

    // Attempt to establish P2P connection
    match p2p_handshake(socket, serial, relay_mode).await {
        Ok((socket, session)) => {
            println!("Device is online and accepting connections.");

            // Close the socket when done
            drop(socket);
            return true;
        }
        Err(e) => {
            println!("Device is offline.");
            return false;
        }
    }
}

The code above seems to do the trick and I get:

[
    Device {
        serial: "REDACTED",
        online: false,
    },
    Device {
        serial: "REDACTED",
        online: true,
    },
]

but I would ideally need it to run in a scenario like:

let p2p_semaphore = Arc::new(Semaphore::new(100));                                                                                                                                                                                          let mut nvr_futures = FuturesUnordered::new();                                                                                                                                                                                              // Create a list for cameras with false status                                                                                                                                                                                              let mut tracker_details = get_data_for_tracker(&mut client).await.unwrap();                                                                                                                                                                 for site in tracker_details.iter_mut() {                                                                                                                                                                                                        if let Some(site_id) = site.site_id {                                                                                                                                                                                                           if let Some((ip, status)) = results.get(&site_id) {                                                                                                                                                                                             site.anpr_active = Some(*status);                                                                                                                                                                                                           site.ip_address = Some(ip.clone());                                                                                                                                                                                                     }                                                                                                                                                                                                                                                                                                                                                                                                                                                                                       if let Some((ports, count)) = port_aggregation.get(&site_id) {
                site.port = Some(ports.clone());
                site.no_of_cameras = Some(*count);
            }
            if let Some(camera) = cams.iter().find(|&c| c.site_id == Some(site_id)) {
                site.anpr_marked_active = camera.marked_active.clone();
            }
            if let Some(serial) = &site.nvr_sn {
                let serial_clone = serial.clone();
                let p2p_semaphore_clone = p2p_semaphore.clone();
                let site_id = site.site_id;
                nvr_futures.push(async move {
                    let _permit = p2p_semaphore_clone.acquire().await.unwrap();
                    let online = test_p2p(serial_clone, false).await;
                    (site_id, online)
                });

            }
        }
    }
    while let Some((site_id, online)) = nvr_futures.next().await {
        if let Some(site) = tracker_details.iter_mut().find(|s| s.site_id == site_id) {
            site.nvr_accessible = Some(online);
            println!("{:#?}", site);
        }
    }

    let write_file = write_tracker_to_excel(tracker_details, ".temp_files/file_name.xlsx").await;

I have tried to make it work a bit more like a library but for some reason once the futures finish it just hangs and all of them return as closed. Is this something due to the nature of the implementation? Any ideas?

Thanks for all your work

khoanguyen-3fc commented 9 months ago

It's possible that the P2P service has an internal rate limiter.

There are two things you can try:

  1. Would using the /probe/device/{SN} endpoint suffice for your needs? It returns a 200 status code if the device is online and, iirc, a 404 status code if the device is offline.
  2. Have you tried reducing the concurrency of your requests? Does it work with a concurrency of 1? If so, you could gradually increase the concurrency until you find the limit at which it stops working. Additionally, you might only need to call common APIs like /probe/p2psrv or /online/relay once, instead of multiple times.

https://github.com/khoanguyen-3fc/dh-p2p/blob/de7baa17f445664171c8874f7dafdfa516b92f9e/src/dh.rs#L54-L61

github-actions[bot] commented 8 months ago

This issue is stale because it has been open for 30 days with no activity.

github-actions[bot] commented 8 months ago

This issue was closed because it has been inactive for 14 days since being marked as stale.