alexcrichton / curl-rust

Rust bindings to libcurl
MIT License
1.02k stars 233 forks source link

Expose the curl_multi_poll & curl_multi_wakeup APIs #403

Closed darkprokoba closed 2 years ago

darkprokoba commented 3 years ago

Currently, the curl crate uses internally the older curl_multi_wait API for classic poll(2)-based async clients.

It would be quite useful, if we could use the newer curl_multi_poll API instead (or let the user decide).

I have an application that does a lot of HTTP PUTs where the source that produces the data gets quite slow occasionally. When curl asks for data, but we don't have anything to send yet, my read_function returns Err(ReadError::Pause) and at some point in the future, I can resume the transfer with unpause_read(), however, there could be up to a second delay between the moment more data is available and the moment the transfer is unpaused. IIRC, curl_multi_poll/wakeup solve this problem precisely while having the extra benefit that curl_multi_wakeup can be invoked in any thread (nice!).

Are there any plans to expose these functions? If I attempted to do the work myself, what additional steps do I need to take in terms of documentation, unit tests, etc?

sagebind commented 3 years ago

Thanks for filing an issue! Firstly I'd like to point out that the goal of the curl crate is to expose APIs that match the original curl APIs as closely as possible while enforcing complete safety. So swapping what curl function is used under the hood without notice seems out of the question. It is up to the user to decide how exactly they want to use curl. Multi::wait is a pretty direct call to curl_multi_wait, so if you want to use curl_multi_poll then it would need to be exposed as a new method on the Multi struct.

If exposed, I would expect curl_multi_poll to be exposed as a Multi::poll method. Exposing curl_multi_wakeup is a lot tricker, since it has different thread safety requirements as the rest of the curl_multi_* methods. Based on the docs and source code, it seems that this can be called in a Send and Sync context, so maybe we would introduce a new MultiWakeup struct which can easily be acquired from a Multi struct that implements both Send and Sync and has a wakeup(&self) -> Result<(), MultiError> method.

The hard part is controlling lifetime of such a handle -- if the Mutli struct is dropped we'll want to make sure to invalidate all MultiWakeup handles, either at runtime or statically, since we don't want to pass a dangling pointer.

Finally, the remaining challenge is that this API was recently introduced as of version 7.68.0, but by default the curl crate will link to the system-installed version of curl which may not have symbols for these methods. We'd want to feature-gate these new APIs behind a crate feature or something.

darkprokoba commented 3 years ago

if you want to use curl_multi_poll then it would need to be exposed as a new method on the Multi struct.

Sure.

so maybe we would introduce a new MultiWakeup

Hadn't thought of that. Not sure my Rust skills are up to the task, but I'm going to take a crack at this when I get the chance...

We'd want to feature-gate these new APIs behind a crate feature or something.

Makes sense.

sagebind commented 3 years ago

Not sure my Rust skills are up to the task, but I'm going to take a crack at this when I get the chance...

Awesome, we love contributions! If you need any help don't hesitate to ask. Starting out with a draft PR is also a good way to solicit early feedback.

If you like, I do have a couple of ideas on how this might be done if you need inspiration:

pub struct MultiWaker {
    raw: *mut curl_sys::CURLM,
    dropped: Weak<()>, // cheap way to check if the linked Multi was dropped
}

impl MultiWaker {
    pub fn wakeup(&self) -> Result<(), MultiError> {
         // check `dropped` to make sure that the CURLM wasn't freed
         // forward to curl_multi_wakeup, must also be added to curl-sys
    }
}

// Only this struct is OK to share with multiple threads
unsafe impl Send for MultiWaker {}
unsafe impl Sync for MultiWaker {}

impl Clone for MultiWaker {
    // might be nice to impl Clone for easier sending to other threads
}

pub struct Multi {
    /* ... */
    // Add extra field so that all MultiWakers can track the dropped state of this
    dropped: Arc<()>,
}

impl Multi {
    /* ... */
    pub fn waker(&self) -> MultiWaker {
        // make a waker handle
    }
}

There's some things I don't like about this design, namely that the Multi struct would make an extra allocation even if the user never uses wakeup, and that wakeup has to return some sort of synthetic error if the Multi was dropped. Might be good to look into alternative solutions as well.

darkprokoba commented 2 years ago

hi @sagebind,

a question about the MultiWaker::wakeup's thread safety.

let's consider the implementation of this method:

impl MultiWaker {
    pub fn wakeup(&self) {
        // let's say this is invoked in thread A
        if self.dropped.upgrade().is_some() {
            unsafe {
                // at this point the thread A can be preempted
                // and another thread can be scheduled that destroys the multi
                // then thread A is scheduled back again and executes this on an invalid handle:
                cvt(curl_sys::curl_multi_wakeup(self.raw))
            }
        }
    }
}

In other words, the proposed synchronization scheme using an Arc in Multi and a Weak reference to it in MultiWaker, minimizes the window for the race, but does not eliminate it completely.

Should I be worried about this?

darkprokoba commented 2 years ago

in fact, it seems to me, that some form of mutex is unavoidable here... (std::sync::Mutex or parking_lot::Mutex?)

i.e.

pub Struct Multi {
...
    #[cfg(feature=poll_7_68_0)]
    is_alive: Arc<Mutex<bool>>, //initialized in the constructor to Arc::new(Mutex::new(true)),
}

impl Multi {
    unsafe fn close_impl(&self) -> Result<(), MultiError> {
        let mut is_alive = self.is_alive.lock();
        cvt(curl_sys::curl_multi_cleanup(self.raw))
        is_alive = false;
    }
}

impl MultiWaker {
    pub fn wakeup(&self) {
        let is_alive = self.is_alive.lock();
        if is_alive {
            cvt(curl_sys::curl_multi_wakeup(self.raw))
        } else {
            SyntheticError
        }
    }
}

what do you think?

sagebind commented 2 years ago

The Arc method I think should be safe so long as any one of the handles holding the multi handle calls curl_multi_cleanup on drop. In other words, if a Multi is dropped while one or more MultiWakers are still alive, then curl_multi_cleanup will not be called until all MultiWakers are also dropped.

Something perhaps like this:

struct MultiCleanup {
    raw: *mut CURLM,
}

impl Drop for MultiCleanup {
    fn drop(&mut self) {
        unsafe {
            cvt(curl_sys::curl_multi_cleanup(self.raw));
        }
    }
}

struct Multi {
    cleanup: Arc<MultiCleanup>,
    raw: *mut CURLM, // still keep a pointer here because we don't want to pay the cost
    // of dereferencing the Arc every time we make a curl_multi call.
}

struct MultiWaker {
    cleanup: Weak<MultiCleanup>,
    raw: *mut CURLM,
}

impl MultiWaker {
    pub fn wakeup(&self)  -> Result<(), MultiError> {
        if let Some(cleanup) = self.cleanup.upgrade() {
           // we know self.raw is valid as long as `cleanup` is valid since at least 1 handle exists
           // to MultiCleanup and cleanup hasn't been called yet
           cvt(curl_sys::curl_multi_wakeup(self.raw))

           // curl_multi_cleanup cannot be called until `cleanup` is dropped, at which point it will
           // be called by this thread if this is the last remaining handle
    } else {
        SyntheticError
    }
}
darkprokoba commented 2 years ago

I see now what you mean and why it could work :-) Two questions:

  1. the curl multi interface is not thread safe, so i've no idea if invoking curl_sys::curl_multi_cleanup from a random thread is ok?
  2. i'm not sure the native handle outliving the Multi struct is really what we want?
sagebind commented 2 years ago
  1. Good question, I was wondering myself if this might be an issue. My understanding based on https://curl.se/libcurl/c/threadsafe.html is that it should be safe, since when a random thread does call curl_multi_cleanup the multi handle will not be in use by any other thread but that one. So we're treating it as if it were Send, not necessarily Sync. That said, the Multi struct is !Send but I think that's because of how easy handles are attached to it and not because of the CURLM itself.
  2. The MultiWakers in my example only hold a Weak to the inner handle, so the handle would only outlive the Multi struct if one thread was dropping the Multi at the same time as another thread was calling wakeup, which would increase the refcount until the wakeup call was over.
alexcrichton commented 2 years ago

I left some comments in #413 before reading further here, but I think @sagebind's suggestions are all safe to use with curl. The Multi structure is probably send/sync already and just not marked so, but the main implication of that is that we're correctly using &self and &mut self in Rust.

darkprokoba commented 2 years ago

One more thing: Right now, if the feature is not enabled (which is the default), there are no new members of the Multi struct, no additional allocations, etc. Do you think I should try and preserve this behavior or is it ok if we added the cleanup: cleanup: Arc<MultiCleanup> property and related allocations regardless of whether the feature is enabled or not?

alexcrichton commented 2 years ago

I don't think that the performance of Multi is so critical we need to remove an allocation, there's probably only a small handful of Multi per-program so having an Arc should be fine.

darkprokoba commented 2 years ago

413 has now been merged.

Many thanks to @sagebind and @alexcrichton for all their help and guidance!