jnqnfe / pulse-binding-rust

FFI and bindings for using PulseAudio from the Rust programming language.
Apache License 2.0
67 stars 20 forks source link

Segfault occurs when calling `stream.set_write_callback(None)` inside write callback #56

Closed acheronfail closed 1 year ago

acheronfail commented 1 year ago

Now, I have no idea if this is how the API was intended to be used, but as I was figuring out how to upload samples I discovered this issue.

First, I was using @danieldg's libpulse-tokio main loop implementation, but then I created a small repository which implemented creating a stream and uploading it as a sample with both libpulse-tokio as well as the standard threaded main loop implementations, and this is definitely an issue with both.

The repository I created is here: https://github.com/acheronfail/pulse-stream-segfault/.

The Problem

When calling stream.set_write_callback(None) inside a write callback, a segfault occurs.

Expected behaviour

I expected nothing to happen. In fact, if you call stream.set_state_callback(None) inside the state callback, then no error occurs. This is why I expected nothing to happen.

Small code example

This is basically the code:

let stream = Rc::new(RefCell::new({/* code to create a stream object */}));

let stream_ref = stream.clone();
stream.borrow_mut().set_write_callback(Some(Box::new(move |len| {
  // do the actual write...

  // SEGFAULT HERE:
  stream_ref.borrow_mut().set_write_callback(None);
})));

Have a look at the repository I created here:

Questions

jnqnfe commented 1 year ago

Hi, so I've taken a look at your 'threaded' implementation and there are two big problems with it that stand out to me. One relates to what you're reporting above, and then the other relates to incorrect use of locks.

Problem 1 - Premature callback destruction

Fundamentally what you're trying to do in your report isn't supported, and it's unfortunately not easy or possible to catch at compile time or even run time given the complexity of the underlying C API. If I've not added documentation on this, then sorry about that. Please bare with me whilst I explain what's happening.

Callbacks are created as Boxed closures. This means that the context of the closure (variables and such) are situated within a chunk of dynamically allocated memory. Being 'boxed' like this separates it from the current stack frame, enabling it to be passed around, including to a different thread.

There are two different kinds of callbacks within the binding, single-use and multi-use. Single-use callbacks are only executed once and are then automatically destroyed. Multi-use naturally can be executed more than once and must live until they are no longer needed. The callback given to set_write_callback() is an example of a multi-use callback - it will be executed each time PulseAudio wants to let you know that it's okay to write more data.

To be able to destroy multi-use callbacks when they are no longer needed, and thus both avoid memory leaks and ensure clean shutdown when the application closes, the primary object to which they are associated (e.g. the stream object) must (internally) hold a reference to them upon their creation. When the object (e.g. the stream object) is destroyed, it's drop function will explicitly destroy any multi-use callbacks it holds such references to (i.e. those it "owns" and thus is responsible for destroying). Additionally, and of particular importance to you, if you replace a multi-use callback, then the original gets immediately destroyed.

So what's happening in your example is that in the middle of this multi-use callback being executed it's trying to replace itself, or rather remove itself in this instance since you're setting the callback to None, causing itself to be destroyed, or rather causing the memory allocated to its Boxed closure to be free'd. Subsequently the callback then tries to finish executing. I imagine (without having examined it in a debugger) that the seg-fault (an invalid memory access error) occurs because after the memory is free'd, what code remains to be executed within the callback involves accessing something within that just free'd memory, and that attempt to access something from within free'd memory is caught as an invalid access.

So you see the problem is fundamentally due to the immediate destruction upon replacement. Could I not defer destruction until some later time? Maybe... One thought you might have would be why not simply have the owner hold a dynamic list of callbacks and simply add the new one to the list, leaving the existing one in place, and then destroy everything in the list later when the owner is destroyed? While this is simple, and would solve your problem, it represents a memory leak.

Fixing the problem such that there's no memory leak and such that you can safely replace/remove a multi-use callback from within itself would require something like the following: the callback context would need some way to inform the callback owner that it's 'active' (being executed) such that if you use the callback-owner's method to replace the callback, this can see that the existing callback is 'active' and thus instead of immediately destroying it, it could put the reference to one side to defer destruction, then the callback context, when it has finished running the main body of the callback, would have to be able to see this state of deferred destruction placed upon itself and then destroy itself. I'm not sure if that would even be possible to implement, it would require a lot more thought.

Now alternatively we could solve your problem by simply redesigning your implementation to move the code you've placed into this callback, out of the callback, instead using the callback just as a signalling mechanism. [You may understand the following better once you've read what I have to say about problem 2 shortly]. Note how when waiting for the context to be ready there's a loop checking the state, and for efficiency when not ready it puts the thread to sleep with wait(), and then there's a context state change callback which uses signal() when the state is ready. The mainloop lock is temporarily given up whilst sleeping, letting the mainloop thread run callbacks, and the context state change callback when it calls signal() wakes up the sleeping thread. (When waking up it needs to retake the lock so must wait for the mainloop thread to finish its loop and release it).

So something like this should work for you (just a rough uncompiled sketch):

// <various setup stuff here>

let mut write_len = Rc::new(None);
let mut bytes_written = 0;
let len = <obtain sample length>;

{
    let ml_ref = Rc::clone(&main_loop);
    let write_size_ref = Rc::clone(&write_size);
    stream.borrow_mut().set_write_callback(Some(Box::new(move |len| {
        *write_size_ref = Some(len);
        unsafe { (*ml_ref.as_ptr()).signal(false); } //signal time to wake up
    })))
}

// <mainloop unlock here, with the setup stuff now complete>

loop {
    mainloop.borrow_mut().lock()

    // <write some initial data?>

    // wait until told we can write more
    while bytes_written < len && *write_size == None {
        mainloop.borrow_mut().wait(); //go to sleep until callback (or unexpected other event) wakes us up again
    }

    if *write_size == Some(0) {
       // <abort?> //out of memory, can't upload any more?
    }

    // <write some data>

    *write_size.borrow_mut() = None; //reset just in case above wakeup was not from our callback (spurious wakeup)

    mainloop.borrow_mut().unlock()
}

// <shutdown, cleanup stuff here>

(Again, this is just a rough outline, I threw it together rather quickly, so it could be buggy).

Problem 2 - Improper locking

I noticed that after setting things up, you release the mainloop lock (correct), but then you have this which seems to demonstrate a fundamental lack of understanding of the locking model:

while let Some(()) = rx.recv().await {
    pa_ctx.borrow_mut().play_sample("SAMPLE_NAME", None, None, None);
}

I have absolutely no idea how much you do or do not understand about threaded programming, so bare with me whilst I briefly jump back to basics and build up through what you need to know. Two threads interacting with the same variable at the same time presents a problem, for instance if both want to increment an integer, this breaks down into three CPU operations, (1) read the value into the CPU, (2) add 1, (3) write the new value back to memory, and if one thread does step 1 before the other completes step 3, then one of the changes is lost. Locks are the solution to this, they provide a control mechanism with which we can ensure, for instance, that one thread completes all three of these steps before the other thread can start them, avoiding things going wrong.

Traditionally a lock would be represented by a special variable sitting alongside one or more other variables that need to be protected. You would have to take great care to take and release the lock (using special lock and unlock functions with the lock variable) whenever interacting with what the lock exists to protect, but there was no way for the compiler to understand what it was protecting and thus it was easy to make mistakes. Rust tries hard to do better. The Rust Arc type for instance is a wrapper around data that you need to control threaded access to, and the compiler won't let you access the protected content without going through the lock mechanism. As nice as that is, it's not always possible to rely on this, sometimes, like with this binding sitting upon a complex C API, you're forced to go back to the old way of having a lock variable (hidden within the mainloop object in this case) sitting alongside everything that needs to be protected (e.g. interactions with the context object or stream object(s)), you need to remember to explicitly call the lock/unlock functions whenever you want to interact with these objects to control threaded access to them, and the compiler isn't going to catch mistakes.

Another crucial detail to understand, specific to this binding, is the following: You have your 'main' thread running your main() function, and then you have a mainloop thread running the 'pulseaudio client mainloop', created when creating and setting up your mainloop object. This 'mainloop' thread interacts with the pulseaudio server, and it executes any callbacks you created and set (when appropriate). It is important to understand that behind the scenes this thread is running an infinite loop, processing server communications and assessing what callbacks to execute if any, and it will take and hold the 'mainloop' lock for the duration of each loop, with the purpose of blocking other threads, like your 'main' thread, from interacting with any objects it may need to interact with itself, either directly by its loop or by the callbacks of yours that it runs. So, firstly, since callbacks are always run within a thread that will already be holding the lock, you have no need to, and in fact must never, attempt to lock/unlock within a callback. Secondly, in your other thread ('main' thread), whenever you want to interact with any objects that might be interacted with by the mainloop thread (again, including stuff touched by callbacks), you must hold the lock when doing so to block the mainloop thread. (Excluding setup code run before the mainloop thread is created and started). Both threads must operate the lock for it to be effective. Furthermore your main thread must not hog the lock, it must cycle between holding it and not holding it in order to give the mainloop thread opportunity to hold it some of the time and thus do some work on server communication and callback execution.

Finally, the reason why we can use Rc for our context object, our stream object, and even the write_size variable in the example code above, rather than Arc, is because everything is being, or should be being, protected by the mainloop lock. Using Arc would add additional locks that we don't need.

So, what you should have written for the above was:

while let Some(()) = rx.recv().await {
    mainloop.borrow_mut().lock(); //wait for the mainloop thread to finish its loop and then block it
    pa_ctx.borrow_mut().play_sample("SAMPLE_NAME", None, None, None); //now safely interact with `pa_ctx`
    mainloop.borrow_mut().unlock(); //we're done, so stop blocking the mainloop thread
}
acheronfail commented 1 year ago

Wow, I did not expect such a detailed response - thank you so much @jnqnfe, I really appreciate the effort!

So what's happening in your example is that in the middle of this multi-use callback being executed it's trying to replace itself, or rather remove itself in this instance since you're setting the callback to None, causing itself to be destroyed, or rather causing the memory allocated to its Boxed closure to be free'd. Subsequently the callback then tries to finish executing. I imagine (without having examined it in a debugger) that the seg-fault (an invalid memory access error) occurs because after the memory is free'd, what code remains to be executed within the callback involves accessing something within that just free'd memory, and that attempt to access something from within free'd memory is caught as an invalid access.

This makes perfect sense to me!

I was initially writing it this way - performing the writes inside the write callback, and removing the callback from within the write callback - because the pulseaudio documentation was quite sparse regarding creating samples, and I searched online for some other implementations. One I found was this C++ one, where the write callback is set here, and then it's cleaned up from within here.

After reading your explanation, I believe that in the C++ example it works fine, because it's not using a boxed closure, but rather a static function so the code can continue executing even though the pointer that the stream had for its write callback has been removed.

I think this mainly stemmed from me not fully understanding how to create a stream and upload it as a sample, and so I looked to other implementations and copied them without thinking too deeply about what was happening under the surface.

I have absolutely no idea how much you do or do not understand about threaded programming, so bare with me whilst I briefly jump back to basics and build up through what you need to know.

Again, I really appreciate the explanations and detail here, even if I'd understood it before, saying it again and explaining it only helps me and any future readers!

I was roughly aware of how the threaded implementations worked after I'd been reading pulseaudio's documentation. To be honest though, it's the first time I've used an API like this, and there were gaps in my understanding (mostly filled, thanks to you!).

In writing the sample repository with the 'threaded' main loop implementation, I was just trying to get it to replicate the same segfault I saw while using the tokio main loop, so once I got to that point I stopped caring about locking and unlocking properly. :sweat_smile: All that said - your explanations here have really solidified my understanding, and I'm very grateful for that!

jnqnfe commented 1 year ago

No problem. :)