rust-lang / futures-rs

Zero-cost asynchronous programming in Rust
https://rust-lang.github.io/futures-rs/
Apache License 2.0

How do I use forget() now? #121

Closed: ustulation closed this issue 8 years ago

ustulation commented 8 years ago

Hey guys, I was using forget(), which has been removed in master. I'll go through the commit history to find out how it is intended to be used now, but thought I'd ask here too in case there is a quick answer:

Previously:

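fn run() {
    // Background thread that completes pending oneshots as results arrive.
    std::thread::spawn(move || {
        // `recv()` returns a single `Result`, so loop until the channel closes.
        while let Ok(it) = mpsc_rx.recv() {
            let tx = queue.remove(it.uuid);
            tx.complete(SomeType);
        }
    }); // `spawn` returns a `JoinHandle`, so there is nothing to unwrap
}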

fn ffi(cb: extern "C" fn(*const u8, u32)) {
    let (tx, rx) = futures::oneshot();
    queue.insert(some_uuid, tx);
    rx.map(move |result| {
        let (data, size) = transform(result);
        cb(data, size);
    }).forget();
}

How am I supposed to correctly transform this ^^^ using master? (It works with the version on crates.io.)

Thanks !

alexcrichton commented 8 years ago

Thanks for the report! The forget function was removed in favor of a clearer picture about where futures are actually executed. I think here what you'll want to do is to have a CpuPool in the background, and you'll want to spawn futures into it.

The problem with forget before was that it'd just run code on whatever thread completed it, which, while fine for some use cases, can be quite dangerous for others. The recent executor changes made it much clearer where futures are being executed.
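To make the "runs on whatever thread completed it" point concrete, here is a minimal std-only sketch. It does not use the futures crate; `CallbackSlot` and `where_does_it_run` are hypothetical names invented for illustration, standing in for a oneshot whose continuation fires inline when the value is completed.

```rust
use std::sync::{Arc, Mutex};
use std::thread::{self, ThreadId};

/// Hypothetical one-shot slot (not a futures-rs type): a callback is
/// registered up front and invoked by whoever calls `complete`.
struct CallbackSlot<T> {
    callback: Mutex<Option<Box<dyn FnOnce(T) + Send>>>,
}

impl<T> CallbackSlot<T> {
    fn new(cb: impl FnOnce(T) + Send + 'static) -> Arc<Self> {
        Arc::new(CallbackSlot {
            callback: Mutex::new(Some(Box::new(cb))),
        })
    }

    /// Runs the registered callback inline, on the completing thread.
    fn complete(&self, value: T) {
        let cb = self.callback.lock().unwrap().take();
        if let Some(cb) = cb {
            cb(value);
        }
    }
}

/// Returns (thread that registered the callback, thread that ran it).
fn where_does_it_run() -> (ThreadId, ThreadId) {
    let registering = thread::current().id();
    let ran_on: Arc<Mutex<Option<ThreadId>>> = Arc::new(Mutex::new(None));
    let ran_on_in_cb = Arc::clone(&ran_on);

    let slot = CallbackSlot::new(move |_value: u32| {
        // Record which thread the continuation actually executes on.
        *ran_on_in_cb.lock().unwrap() = Some(thread::current().id());
    });

    // A different thread completes the value, so the callback runs there.
    let completer = thread::spawn(move || slot.complete(7));
    completer.join().unwrap();

    let ran = (*ran_on.lock().unwrap()).expect("callback should have run");
    (registering, ran)
}
```

Calling `where_does_it_run()` yields two different thread ids: the code that registered the continuation has no say over where it executes, which is the hazard described above.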

alexcrichton commented 8 years ago

Let me know if that doesn't help though!

ustulation commented 8 years ago

Thanks for the prompt reply! I have only just started working with this crate, so my lack of understanding might be frustrating. Is CpuPool a kind of thread pool? Anyway, I thought a simple example is worth a million explanatory words, so I'd better state my problem using one instead of potentially confusing myself with my own explanations.

Here is a simple blocking call flow:

// Caller has to spawn n threads to make n simultaneous calls to f0()

fn f0() -> Type0 { // blocking call
    let type_1 = f1(); // blocking call
    let type_2 = f2(); // blocking call
    let type_3 = f3(); // blocking call

    let type_0 = transform(type_1, type_2, type_3);

    type_0
}

fn f1() -> Type1 {
    let network_result_type = network_call(); // blocking call
    let type_1 = transform(network_result_type);
    type_1
}

fn f2() -> Type2 {
    let network_result_type = network_call(); // blocking call
    let type_2 = transform(network_result_type);
    type_2
}

fn f3() -> Type3 {
    let network_result_type = network_call(); // blocking call
    let type_3 = transform(network_result_type);
    type_3
}

I want to convert this to an async pattern, and I don't want a thread pool or, worse, a thread per call. So the natural progression was to use a single-threaded event loop and the callback pattern. The above changes to:

// Caller can call f0() any number of times simultaneously without a thread per call.

fn f0<F: FnOnce(Type0)>(cb: F) { // async call
    f1(move |type_1| {
        f2(move |type_2| {
            f3(move |type_3| {
                cb(transform(type_1, type_2, type_3));
            });
        });
    });
}

fn f1<F: FnOnce(Type1)>(cb: F) {
    network_call(move |network_result_type| { // async call
        cb(transform(network_result_type));
    });
}

fn f2<F: FnOnce(Type2)>(cb: F) {
    network_call(move |network_result_type| { // async call
        cb(transform(network_result_type));
    });
}

fn f3<F: FnOnce(Type3)>(cb: F) {
    network_call(move |network_result_type| { // async call
        cb(transform(network_result_type));
    });
}

Of course, all callbacks will be called on the thread running the event loop. So now we have 2 threads, the caller's and the event loop's, and with those we can have unlimited async invocations of f0() by the caller. But as you can see, this callback nesting and indentation is what I wanted to avoid, which is why I thought of using futures-rs.
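For readers who want to see the two-thread arrangement spelled out, the following is a rough std-only sketch of a single-threaded event loop that runs queued callbacks. `spawn_event_loop`, `network_call` and `callbacks_on_loop_thread` are hypothetical stand-ins, and the "network result" is a hard-coded value rather than real I/O.

```rust
use std::sync::mpsc::{self, Sender};
use std::thread::{self, JoinHandle};

/// A unit of work for the loop thread.
type Job = Box<dyn FnOnce() + Send + 'static>;

/// Hypothetical minimal event loop: one thread draining a queue of jobs.
fn spawn_event_loop() -> (Sender<Job>, JoinHandle<()>) {
    let (tx, rx) = mpsc::channel::<Job>();
    let handle = thread::spawn(move || {
        // The loop ends once every `Sender` has been dropped.
        for job in rx {
            job();
        }
    });
    (tx, handle)
}

/// Stand-in for an async network call: queues the callback on the loop
/// and returns immediately. The value 21 is a made-up "network result".
fn network_call(loop_tx: &Sender<Job>, cb: impl FnOnce(u32) + Send + 'static) {
    loop_tx
        .send(Box::new(move || cb(21)))
        .expect("event loop has stopped");
}

/// The caller's thread only enqueues work; the callback runs on the loop.
fn callbacks_on_loop_thread() -> u32 {
    let (loop_tx, loop_thread) = spawn_event_loop();
    let (result_tx, result_rx) = mpsc::channel();

    network_call(&loop_tx, move |raw| {
        // Transform the raw result and hand it back to the caller.
        result_tx.send(raw * 2).unwrap();
    });

    drop(loop_tx); // no more work: lets the loop thread exit
    loop_thread.join().unwrap();
    result_rx.recv().unwrap()
}
```

The caller never blocks inside `network_call`; it only waits at the end to collect the result, so any number of calls can be in flight with just the two threads.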

Using futures:

fn f0<F: FnOnce(Type0)>(cb: F) { // async call
    let future_type_1 = f1(); // async call
    let future_type_2 = f2(); // async call
    let future_type_3 = f3(); // async call
    let joined_futures = future_type_1
        .join(future_type_2)
        .join(future_type_3);

    joined_futures.map(|((type_1, type_2), type_3)| {
        cb(transform(type_1, type_2, type_3));
    }).forget();
}

fn f1() -> Box<Future<Item=Type1, Error=futures::Canceled>> {
    let network_result_future = network_call(); // async call - registers a futures::Complete with event loop returning a futures::Oneshot
    network_result_future.and_then(move |network_result_type| {
        let type_1 = transform(network_result_type);
        futures::finished(type_1)
    }).boxed()
}

fn f2() -> Box<Future<Item=Type2, Error=futures::Canceled>> {
    let network_result_future = network_call(); // async call
    network_result_future.and_then(move |network_result_type| {
        let type_2 = transform(network_result_type);
        futures::finished(type_2)
    }).boxed()
}

fn f3() -> Box<Future<Item=Type3, Error=futures::Canceled>> {
    let network_result_future = network_call(); // async call
    network_result_future.and_then(move |network_result_type| {
        let type_3 = transform(network_result_type);
        futures::finished(type_3)
    }).boxed()
}

Though I wouldn't say the code is particularly pretty now, I do not have the danger of callback hell, and there is no inversion of control flow as there is in a callback-based mechanism. There are still 2 threads: one in which the caller calls f0 and the other in which the futures actually execute. I get your concern that if all the futures were somehow already ready, forget() could fire on the caller's own thread (the one that called f0()). (This is not very important for my use case, but that's beside the point.)

So, without forget(), what is the transformation of the above code? I don't want a thread pool with futures performing blocking waits. That would be ugly and an underutilisation of the library, because depending on the network I/O wait, one could exhaust the thread pool with futures' blocking waits.

alexcrichton commented 8 years ago

Oh, so in the example above, if you didn't want to bake in the concept of where the future runs, you'd want to punt the future up, e.g. return a future from f0. That way the caller could figure out the most appropriate way to run the future. For example, it could run it on the event loop or perhaps in a thread pool.
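A rough illustration of "punting the future up", written against today's `std::future` rather than the futures 0.1 API discussed in this thread. `block_on`, `ThreadWaker`, `run_here` and `run_on_worker` are hypothetical helpers, and `f0` returns an already-ready value purely to keep the sketch short.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Returns the computation instead of deciding where it runs.
/// (Assumption: a ready value stands in for the joined network futures.)
fn f0() -> impl Future<Output = u32> + Send {
    std::future::ready(1 + 2 + 3)
}

/// Wakes the blocked thread by unparking it.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Hypothetical minimal executor: drives one future on the current thread.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut: Pin<Box<F>> = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(value) => return value,
            // Sleep until woken; spurious wakeups just cause a re-poll.
            Poll::Pending => thread::park(),
        }
    }
}

/// Caller choice 1: run the future on the calling thread.
fn run_here() -> u32 {
    block_on(f0())
}

/// Caller choice 2: run the same future on a separate worker thread.
fn run_on_worker() -> u32 {
    thread::spawn(|| block_on(f0())).join().unwrap()
}
```

Both `run_here()` and `run_on_worker()` drive the same `f0()`; the only thing that changes is the caller's decision about which thread executes it.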

ustulation commented 8 years ago

The problem, though, is that the first function, f0(), is called from across the FFI. So I can see only 2 simple solutions: either it's a blocking call (which I wanted to refactor away) or I take an FFI callback in f0() as shown. Returning a future is thus not an option (well, I could return an opaque pointer, but that wouldn't bring any benefit).

However, I have got back the functionality of forget() in a very roundabout way.

struct MyExecutor;
impl Executor for MyExecutor {
    fn execute(&self, r: Run) {
        r.run();
    }
}

// then later in code instead of `final_future.forget()` what i need to do now is:
task::spawn(final_future).execute(Arc::new(MyExecutor));

This behaves exactly like forget() once again (and if I can do this anyway, why remove forget()? It was a convenience). But that is assuming that these are not also soon-to-be-removed ways of doing things.

What I am actually looking for is a notification-based solution (like callbacks) instead of polling. forget() was one such solution. The others that you have mentioned all seem to be polling based (delegating a blocking Future::wait() to a thread, etc.). Is there any notification-based solution you have in mind? What I did above with the executor does not look pretty, and I am not even sure I am meant to use it that way.

alexcrichton commented 8 years ago

Ah yeah it's true that you can rebuild forget() with what you did above. We in general found it dangerous enough (running code in "random locations") to not provide by default, but we may need to discuss more!

You can transform polling to notifications with the forget you wrote + oneshot channels I think, would that work? (I may not be precisely understanding what you're thinking)

ustulation commented 8 years ago

Yes, with forget() it would work. When it was removed, I went hunting for another mechanism to do it, and I was unsure whether my way of doing it above was valid at all given that forget() had been removed. But since you sanction rebuilding forget() the way I did above, I think all is good.

Thanks a ton @alexcrichton !