MolotovCherry / virtual-display-rs

A Windows virtual display driver to add multiple virtual monitors to your PC! For Win10+. Works with VR, obs, streaming software, etc
MIT License
473 stars 39 forks source link

Refactor IPC API implementation #116

Closed Dampfwalze closed 1 week ago

Dampfwalze commented 2 months ago

This is my go on the IPC API.

The driver events are now expressed by a Stream. This also allowes for multiple listeners.

Using the sync API, one can now add many receiver callbacks. The termination is now expressed using a boolean return value.

driver.add_event_receiver(|cmd| {
  ...
  if finished {
    false
  } else { 
    true 
  };
});

Maybe a subscription object is better?

let subscription = driver.add_event_receiver(|cmd| { ... });
...
subscription.cancel();

(Changes are not fully tested)

MolotovCherry commented 2 months ago

This was the last remaining thing on my todo list to handle properly. This is actually the kind of implementation I was looking for, but didn't get around to yet.

I would prefer a subscription object instead, but yeah, this is overall looking quite good!

Dampfwalze commented 2 months ago

I was thinking about the possibility to easily revert the drivers state to the persistant state. There could be a method Client::revert() or Client::revert_to_persistant() that will load back the saved state from the registry.

MolotovCherry commented 2 months ago

I was thinking about the possibility to easily revert the drivers state to the persistant state. There could be a method Client::revert() or Client::revert_to_persistant() that will load back the saved state from the registry.

That's a good suggestion.

What do you think about having the method fetch and convert the persistent state to Vec<Monitor> and let the caller do what they want with it? We could keep the one that also reverts the driver in one call as well if it's still desirable

Dampfwalze commented 2 months ago

What do you think about having the method fetch and convert the persistent state to Vec<Monitor> and let the caller do what they want with it? We could keep the one that also reverts the driver in one call as well if it's still desirable

In any case, there could be a method on the Client that deserializes the persistend state: Client::read_persistent() -> Vec<Monitor> (also forwarded in DriverClient?).

Then, there can be a method on the DriverClient to read this state and write it in its own state: DriverClient::load_persiststent().

But Client::revert_to_persistant() is still good to have (also forwarded in DriverClient).

MolotovCherry commented 2 months ago

In any case, there could be a method on the Client that deserializes the persistend state: Client::read_persistent() -> Vec<Monitor> (also forwarded in DriverClient?).

Then, there can be a method on the DriverClient to read this state and write it in its own state: DriverClient::load_persiststent().

But Client::revert_to_persistant() is still good to have (also forwarded in DriverClient).

That sounds perfect. I'll add that after this pr is merged

Dampfwalze commented 2 months ago

I would prefer a subscription object instead

I have implemented this now.

One thing to note, there is no good way to cancel the subscription from the callback itself now. Doing this will involve creating some shared state where the subscription is moved into after subscribing, so something like Arc<Mutex<Option<EventsSubscription>>>.

Example:

let shared_sub = Arc::new(Mutex::new(None::<EventsSubscription>));

let sub = client.add_event_receiver({
    let shared_sub = shared_sub.clone();
    move |event| {
        println!("{:?}", event);
        // ...
        shared_sub.lock().unwrap().as_mut().unwrap().cancel();
    }
});

*shared_sub.lock().unwrap() = Some(sub);

Also: What should happen if the callback panics? Currently the task would just stop without notice. Maybe EventSubscription::cancel() could return a Result? What would the Error type be then?

MolotovCherry commented 2 months ago

I would prefer a subscription object instead

I have implemented this now.

One thing to note, there is no good way to cancel the subscription from the callback itself now. Doing this will involve creating some shared state where the subscription is moved into after subscribing, so something like Arc<Mutex<Option<EventsSubscription>>>.

Example:

let shared_sub = Arc::new(Mutex::new(None::<EventsSubscription>));

let sub = client.add_event_receiver({
    let shared_sub = shared_sub.clone();
    move |event| {
        println!("{:?}", event);
        // ...
        shared_sub.lock().unwrap().as_mut().unwrap().cancel();
    }
});

*shared_sub.lock().unwrap() = Some(sub);

Also: What should happen if the callback panics? Currently the task would just stop without notice. Maybe EventSubscription::cancel() could return a Result? What would the Error type be then?

I don't feel it matters much here if the callback panics. It does print a message, and the task exiting on panic is expected (similar to what would happen if a thread panicked). Though a note could be added if there's anything we want the user to be clear about. Do you feel it would be useful to catch unwind, mark the panic, resume unwind, and notify on cancel?

Perhaps it might be better to just catch it and re-raise it on the main thread instead. This way it becomes a lot clearer since everything crashes, instead of silently failing.

I just did some testing and found that the cb isn't getting executed. The streams are receiving as expected, it seems to stop working at RUNTIME.spawn in EventsSubscription. A block_on does execute it all the way (just a test to see if that was the issue), but spawn seems to not

Dampfwalze commented 2 months ago

I just did some testing and found that the cb isn't getting executed. The streams are receiving as expected, it seems to stop working at RUNTIME.spawn in EventsSubscription. A block_on does execute it all the way (just a test to see if that was the issue), but spawn seems to not

This is wired, because all unit tests pass and they do check if the callback is run or not. I need a bit more context to make sense of the situation here. Maybe the runtime was blocked somehow in your tests? Did you call the API from within the runtime? RUNTIME.spawn() does not immediately start execution, it just sends it of to be scheduled for execution.

You can try tokio::task::yield_now().await after calling add_event_receiver() to give execution back to tokio.

MolotovCherry commented 2 months ago

This is wired, because all unit tests pass and they do check if the callback is run or not. I need a bit more context to make sense of the situation here. Maybe the runtime was blocked somehow in your tests? Did you call the API from within the runtime? RUNTIME.spawn() does not immediately start execution, it just sends it of to be scheduled for execution.

You can try tokio::task::yield_now().await after calling add_event_receiver() to give execution back to tokio.

So far from what I've been able to gather, the test event_receiver_cancel_from_cb() still passes after a panic is introduced into the callback. The first one (event_receiver()) properly fails when doing the same thing, so it applies only to this test. Hmm

let sub = client.add_event_receiver({
      let shared_sub = shared_sub.clone();
      move |event| {
          panic!();
      }
  });
Dampfwalze commented 2 months ago

So far from what I've been able to gather, the test event_receiver_cancel_from_cb() still passes after a panic is introduced into the callback. The first one (event_receiver()) properly fails when doing the same thing, so it applies only to this test. Hmm

Yes, this is correct. This makes sense, because when the callback panics, the receiver used to cancel the callback is dropped and closed. If you now call cancel(), it returns false, which is actually the expected behaviour, because the callback was already "canceled".

But when introducing a print statement, I can confirm that it is run.

MolotovCherry commented 2 months ago

Give me a moment to track this original problem down. I'll reply back when I have more context.

On a side note though, I do agree that it might be best to handle panics somehow to make it more transparent. You asked earlier:

Maybe EventSubscription::cancel() could return a Result? What would the Error type be then?

We could just use std::thread::Result, which is the same type that catch unwind returns. I'm leaning more into not panicking the whole program, so cancel seems like as good a place as any to return it I guess. It shouldn't be too much of a requirement that the cb is UnwindSafe, right? If there's a nicer way, I might be more preferable to that. I'd still like to avoid a catch unwind if possible.

MolotovCherry commented 2 months ago

If you'd like to test what I was talking about earlier, here's how (below). (Please see Edit 2)

Edit: Since the before steps were a bit roundabout, use this instead. This triggers it just the same. (The async version works fine)

use driver_ipc::sync::DriverClient;

fn main() {
    let driver = DriverClient::new().unwrap();
    driver.add_event_receiver(|event| {
        // this does not print
        println!("{event:?}");
    });

    // go and send some events on the driver
    std::thread::sleep(std::time::Duration::from_secs(10));
    println!("shutting down");
}

Edit 2: Okay I think we got it now. Not exactly surprising (obviously, give the behavior of receivers), but we need to add a comment explaining that the returned value needs to be saved and not dropped until done, otherwise the callback will stop firing. Not binding the value to something will also cause a drop before end of scope. Though obvious, it's an easy gotcha people may accidentally hit.

Everything looking good! 👍🏻

Dampfwalze commented 2 months ago

Edit 2: Okay I think we got it now. Not exactly surprising (obviously, give the behavior of receivers), but we need to add a comment explaining that the returned value needs to be saved and not dropped until done, otherwise the callback will stop firing. Not binding the value to something will also cause a drop before end of scope. Though obvious, it's an easy gotcha people may accidentally hit.

This is not the behavior I expected nor wanted. I tracked that issue down and fixed it. If the subscription gets dropped, the callback will still run. This behavior is in line with the behavior of thread::spawn or task::spawn. When you drop the JoinHandle, the thread/task will be detached.

MolotovCherry commented 2 months ago

The last remaining unresolved comment is basically already taken care of, isn't it?

Basically, what I'm envisioning here is that read errors out, but it's then confusing to the library user on why they aren't receiving any messages (e.g. on their event stream).

As far as this, I can take care of handling the error and a callback notification system of sorts / letting user know they got disconnected.

Maybe sometime we can handle re-connections after the server disconnected, but we don't need to think about that right now (it's not like the client can't just recreate the driver instance anyways).

This is a non-issue unrelated to this PR (not sure if it matters to bother handling this, but even if I do, it's unrelated to this PR)

The senders should be fine since the caller will receive errors if they try to send, but since we still hold onto a broadcast sender, the subscribers won't receive a None when the stream should be closed.

I'm thinking that we should completely drop the broadcast sender on error to solve this.

Iirc, this is already fixed after you changed the Client to hold a receiver instead.


So I think the previous one can be marked as resolved and we can merge this if you feel it's ready. (If you still have some api input about the design of it, your thoughts are welcome, but I don't see this last one as a blocker)

MolotovCherry commented 1 week ago

Lgtm! If you feel it's good to go, I'm fine merging it right now as-is. If you want to do any touch-ups, it can be done later in smaller PR's (this would make it much easier to keep track of things too).

MolotovCherry commented 1 week ago

Merged now. Thank you for the time you put into this contribution! <3