windy1 / zeroconf-rs

zeroconf is a cross-platform library that wraps underlying ZeroConf/mDNS implementations such as Bonjour or Avahi, providing an easy and idiomatic way to both register and browse services.
MIT License
77 stars 25 forks source link

Async support #1

Closed rneswold closed 3 months ago

rneswold commented 3 years ago

It'd be nice to have async or tokio support. tokio 0.3 is out, but many, many projects are still on 0.2.x so that should probably be the first target.

windy1 commented 3 years ago

Thanks for the issue. This was brought up on reddit as well and I think it's a good idea. Could you elaborate a bit more how you would like to see async/tokio integrated? I presume around the EventLoop? In the application I made, which was the motivation for this, I'm running a MdnsService and MdnsBrowser in separate tokio threads.

rneswold commented 3 years ago

I admit I'm not familiar with all the details of MDNS, so these comments may be naive.

For listening for services, the module could start a background task and return the receive handle of a broadcast channel (https://docs.rs/tokio/0.2.23/tokio/sync/broadcast/index.html). The handle will receive every incoming MDNS announcement. If multiple tasks are interested, the app writer can use .clone() so that every client will see every MDNS announcement.

When all broadcast handles are dropped, the background task's 'send' handle will resolve with an error and it can exit. Something like:

async fn task(ch: zeroconf::ReportChannel) -> () {
    loop {
        let report = ch.recv().await?;

        // Handle the incoming report.
    }
}

#[tokio::main]
async fn main() -> io::Result<()> {
    let (zc_task, zc_report) = zeroconf::create().await?;

    // zc_report is a tokio::sync::broadcast::Receiver<zeroconf::ServiceDiscovery>

    task(zc_report).await;
    zc_task.await;
}

For announcing services, your library could wrap the tokio TCP and UDP streams with an API that allows a caller to also specify the protocol string. Upon creation of your sockets, they also get registered with a background task that periodically announces the service. When the sockets are dropped, they get unregistered, too.

#[tokio::main]
async fn main() -> io::Result<()> {
    let (zc_task, zc_report) = zeroconf::create().await?;

    let mut listener = zc_task.tcp_listener("127.0.0.1", 8080, "_http._tcp").await?;

    // During this loop, the background task, zc_task, will periodically announce the service.
    // When 'listener' is dropped, zc_task stops announcing it.

    loop {
        let (socket, _) = listener.accept().await?;
        process_socket(socket).await;
    }
}

Of course, this is Rust, so those examples will never compile during the first try and you'll spend 3+ hours working out lifetime issues and whether ownership transfer is allowed, etc. 😄 So consider it a back-of-the-napkin design that needs much more thought and polish.

Johannesd3 commented 3 years ago

FWIW, there is async-dnssd which supports async/await, but unfortunately only uses bonjour.

nsabovic commented 3 years ago

Here's the workaround that I'm using to register a service right now. Since BonjourMdnsService and BonjourEventLoop are not marked as Send, they can't even be used across awaits so I had to stick them in a struct that I can mark Send and use ouroboros for self-referencing structure. Yuck.

    let zeroconf_loop = tokio::spawn(async move {
        // A trick to make BonjourMdnsService and BonjourEventLoop Send.
        #[self_referencing]
        struct Holder {
            service: BonjourMdnsService,
            #[borrows(mut service)]
            #[covariant]
            event_loop: BonjourEventLoop<'this>,
        }
        unsafe impl Send for Holder {}

        let h = Holder::new(
            MdnsService::new(ServiceType::new("service_name", "tcp").unwrap(), 12345),
            |service| {
                service.set_registered_callback(Box::new(|result, _| {
                    info!("Service registered on: {}", result.unwrap().name());
                }));
                service.register().unwrap()
            }
        );

        let mut interval = interval(Duration::from_millis(100));
        loop {
            h.with_event_loop(|event_loop| {
                event_loop.poll(Duration::from_secs(0)).unwrap();
            });

            interval.tick().await;
        }
    });
windy1 commented 3 years ago

Thank you for your support on this project, I have been working on implementing async in my free time (which is limited): #9

Here's an example of what this will look like:

use zeroconf::prelude::*;
use zeroconf::{MdnsBrowser, ServiceType};

#[tokio::main]
pub async fn main() -> zeroconf::Result<()> {
    let mut browser = MdnsBrowser::new(ServiceType::new("http", "tcp")?);
    loop {
        let result = browser.browse_async().await;
        println!("Service discovered: {:?}", result.unwrap());
    }
}
use std::thread;
use std::time::Duration;
use zeroconf::prelude::*;
use zeroconf::{MdnsService, ServiceType, TxtRecord};

#[tokio::main]
pub async fn main() -> zeroconf::Result<()> {
    let mut service = MdnsService::new(ServiceType::new("http", "tcp")?, 8080);
    let mut txt_record = TxtRecord::new();

    txt_record.insert("hello", "world")?;
    service.set_txt_record(txt_record);

    let result = service.register_async().await;
    println!("Service: {:?}", result);

    loop {
        // do stuff
        thread::sleep(Duration::from_nanos(1));
    }
}

This MR is working on Linux but still needs a little work on the Bonjour side of things.

windy1 commented 3 months ago

This is now supported through a new crate I've developed: zeroconf-tokio