awestlake87 / pyo3-asyncio

RuntimeError: cannot reuse already awaited coroutine #25

Closed sansyrox closed 3 years ago

sansyrox commented 3 years ago

🐛 Question

Hi, I am trying to create a web server implementation using pyo3_asyncio. However, whenever I make a second call to the same route, I get this error: RuntimeError: cannot reuse already awaited coroutine.

I am storing the routes as key-value pairs that map a string to a Py<PyAny>:

            let f = self.get_routes.get(&route);
            // pool.push_async(f);
            match f {
                Some(a) => { // a is of the type Py<PyAny>
                    pool.push_async(&a.clone());
                }

I am accessing the PyAny coroutine through the following implementation

        let t = thread::spawn(move || {
            loop {
                let message = receiver.lock().unwrap().recv().unwrap(); // message should be the optional containing the future
                match message {
                    Message::NewJob(j) => {
                        let k = j.as_ref(py);
                        let f = pyo3_asyncio::into_future(&k).unwrap();
                        pyo3_asyncio::async_std::run_until_complete(py, async move {
                            let x = f.await;
                            match (x) {
                                Err(x) => println!(" {}", x), // I am getting the error here
                                Ok(_) => (),
                            };
                            Ok(())
                        })
                        .unwrap();

                    }
                    Message::Terminate => {
                        println!("Worker has been told to terminate");
                    }
                }
            }

Is it possible to clone a coroutine or some other way to reuse the function that I am passing as a param in the form of Py object?

Thanks in advance 😄

awestlake87 commented 3 years ago

I think the problem is that you're calling run_until_complete where you should just be awaiting the future created by pyo3_asyncio::into_future(&k). run_until_complete calls Python's asyncio.EventLoop.run_until_complete, which is meant to be called at the start of your async program (and likely returns only when the program exits). I think what you intend to do here is wait until the task is complete before continuing on to the next message, in which case you should just await the future created by pyo3_asyncio::into_future.

The thing that makes this a little strange is that your receive loop is in a sync context instead of an async context, so something like async_std::task::block_on could bridge the gap for you. For a real quick fix, I'd recommend something like this:

let t = thread::spawn(move || {
    loop {
        let message = receiver.lock().unwrap().recv().unwrap(); // message should be the optional containing the future
        match message {
            Message::NewJob(j) => {
                let k = j.as_ref(py);
                let f = pyo3_asyncio::into_future(&k).unwrap();
                // use async_std::task::block_on instead of pyo3_asyncio::async_std::run_until_complete
                async_std::task::block_on(async move {
                    let x = f.await;
                    match (x) {
                        Err(x) => println!(" {}", x), // I am getting the error here
                        Ok(_) => (),
                    };
                    Ok(())
                });
            }
            Message::Terminate => {
                println!("Worker has been told to terminate");
            }
        }
    }
});

Let me know if that works for you!

sansyrox commented 3 years ago

Hey @awestlake87, thank you for your answer. But I am getting the following error on the Ok(()) return of the async block:

   ^^ cannot infer type for type parameter `E` declared on the enum `Result`

error: aborting due to previous err

If I write Ok::<(), Error>(()) instead of Ok(()), the code compiles, but I get a deadlock that I cannot track down.

awestlake87 commented 3 years ago

Sometimes you have to specify the error type of a fn or closure if the only way Rust can infer the type is through the ? operator. Since the ? operator allows conversions if a From<T> conversion exists, the actual error type can be ambiguous. (Sorry if my snippet had a compile error, I was away from my laptop when I wrote that).

As for the deadlock, I can't really say for sure what's going on, but I suspect it's something with your initialization. Would it be possible to share a snippet of the entrypoint?

sansyrox commented 3 years ago

Sometimes you have to specify the error type of a fn or closure if the only way Rust can infer the type is through the ? operator. Since the ? operator allows conversions if a From<T> conversion exists, the actual error type can be ambiguous.

But we are not using the ? operator anywhere.

(Sorry if my snippet had a compile error, I was away from my laptop when I wrote that).

No worries. I am really grateful for your assistance.

Yes, absolutely.

This is the python entry point

async def h():
    print("This is the message from coroutine")
    return "h"

print("Hello world")
s = roadrunner.Server()
s.add_route("/",h())
s.start()

This is the main initialisation of the server.


    pub fn new() -> Self {
        let url = format!("127.0.0.1:{}", 5000);
        let get_routes: HashMap<Route, Py<PyAny>> = HashMap::new();
        Self {
            port: 5000,
            number_of_threads: 1,
            router: Router::new(),
            threadpool: ThreadPool::new(1),
            listener: TcpListener::bind(url).unwrap(),
            get_routes, // not implemented in router as unable to match lifetimes
        }
    }

Below is how I am implementing the threadpool

    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);
        let (sender, receiver) = mpsc::channel();
        let receiver = Arc::new(Mutex::new(receiver));
        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool { workers, sender }
    }

This is the worker thread that I shared above:


    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Message>>>) -> Worker {
        let t = thread::spawn(move || {
            let py = unsafe { Python::assume_gil_acquired() };

            loop {
                py.run("print('Hello from python')", None, None);
                let message = receiver.lock().unwrap().recv().unwrap(); // message should be the optional containing the future
                match message {
                    Message::NewJob(j) => {
                        println!("Bruhhh");
                        let k = j.as_ref(py);
                        let f = pyo3_asyncio::into_future(&k).unwrap();
                        task::block_on(async move {
                            let x = f.await;
                            match x {
                                Err(x) => println!(" {}", x), // I am getting the error here
                                Ok(_) => (),
                            };
                            Ok(())
                        })
                        .unwrap();
                    }
                    Message::Terminate => {
                        println!("Worker has been told to terminate");
                    }
                }
            }
        });

I was unable to figure out which implementation you are talking about. So I pasted different initialisations here 😅

awestlake87 commented 3 years ago

Sometimes you have to specify the error type of a fn or closure if the only way Rust can infer the type is through the ? operator. Since the ? operator allows conversions if a From<T> conversion exists, the actual error type can be ambiguous.

But we are not using the ? operator anywhere.

Right, my bad. The error type is ambiguous on the async move { } block in this case because it never returns an Err for Rust to infer the type from. The reason this wasn't a problem before is that pyo3_asyncio::async_std::run_until_complete is not generic over an error type, it always expects a PyResult<()>.

Ok, one problem I'm seeing, although I'm not sure if it's causing the deadlock, is that you're sharing your mpsc::Receiver across multiple worker threads. mpsc stands for multiple-producer-single-consumer meaning that you shouldn't attempt to share the receive side (which is why it doesn't impl Clone). I think what you want is an mpmc (multi-producer-multi-consumer) channel like the ones you'd find in async_channel for async channels or crossbeam for synchronous channels. These channels have receivers that are Clone so you don't need to wrap the receiver in an Arc<Mutex<T>>. In your example, only one worker can receive a job at a time, which is not what you want.

Assuming that doesn't fix the deadlock, have there been any changes to roadrunner.Server since the last issue #20? Is the start fn still calling run_until_complete?

awestlake87 commented 3 years ago

Also, this is a really bad idea:

let py = unsafe { Python::assume_gil_acquired() };

This might lead to race conditions, segfaults, memory leaks, deadlocks, etc if not used properly. I'd definitely recommend using the safe API with the GIL like Python::with_gil unless there's a really good reason not to.

sansyrox commented 3 years ago

Right, my bad. The error type is ambiguous on the async move { } block in this case because it never returns an Err for Rust to infer the type from. The reason this wasn't a problem before is that pyo3_asyncio::async_std::run_until_complete is not generic over an error type, it always expects a PyResult<()>.

I removed the Ok(()) statement and the code compiles now. But the coroutine is not even awaited once now 😓. Maybe I can make a little hack where I call Python functions from Rust to add coroutines to Rust's HashMap. Are you aware of any other way apart from this?

Ok, one problem I'm seeing, although I'm not sure if it's causing the deadlock, is that you're sharing your mpsc::Receiver across multiple worker threads. mpsc stands for multiple-producer-single-consumer meaning that you shouldn't attempt to share the receive side (which is why it doesn't impl Clone). I think what you want is an mpmc (multi-producer-multi-consumer) channel like the ones you'd find in async_channel for async channels or crossbeam for synchronous channels. These channels have receivers that are Clone so you don't need to wrap the receiver in an Arc<Mutex<T>>. In your example, only one worker can receive a job at a time, which is not what you want.

Thank you for this information, this really makes sense. I will change the implementation in the code.

Assuming that doesn't fix the deadlock, have there been any changes to roadrunner.Server since the last issue #20? Is the start fn still calling run_until_complete?

No, this doesn't fix the deadlock. But yes, the Server implementation has changed a bit since the previous issue:

#[pyclass]
pub struct Server {
    port: usize,
    number_of_threads: usize,
    router: Router, //
    threadpool: ThreadPool,
    listener: TcpListener,
    get_routes: HashMap<Route, Py<PyAny>>,
}

// unsafe impl Send for Server {}

#[pymethods]
impl Server {
    #[new]
    pub fn new() -> Self {
        let url = format!("127.0.0.1:{}", 5000);
        let get_routes: HashMap<Route, Py<PyAny>> = HashMap::new();
        Self {
            port: 5000,
            number_of_threads: 1,
            router: Router::new(),
            threadpool: ThreadPool::new(1),
            listener: TcpListener::bind(url).unwrap(),
            get_routes, // not implemented in router as unable to match lifetimes
        }
    }

    pub fn start(&mut self) {
        let listener = &self.listener;
        let pool = &self.threadpool;
        for (k, v) in &self.get_routes {
            println!("Hello world but {} {}", k.get_route(), v);
        }

        for stream in listener.incoming() {
            let mut stream = stream.unwrap();
            let mut buffer = [0; 1024];
            stream.read(&mut buffer).unwrap();
            let route = Route::new(RouteType::Buffer(Box::new(buffer)));
            let status_line = "HTTP/1.1 200 OK";
            let contents = "Hello";
            let len = contents.len();
            let response = format!(
                "{}\r\nContent-Length: {}\r\n\r\n{}",
                status_line, len, contents
            );

            stream.write(response.as_bytes()).unwrap();
            stream.flush().unwrap();
            let f = self.get_routes.get(&route);
            // pool.push_async(f);
            match f {
                Some(a) => {
                    pool.push_async(&a.clone());
                }
                None => {}
            }
        }
    }

    pub fn add_route(&mut self, route: String, handler: Py<PyAny>) {
        self.get_routes
            .insert(Route::new(RouteType::Route(route)), handler);
    }
}

sansyrox commented 3 years ago

Also, this is a really bad idea: let py = unsafe { Python::assume_gil_acquired() }; This might lead to race conditions, segfaults, memory leaks, deadlocks, etc if not used properly. I'd definitely recommend using the safe API with the GIL like Python::with_gil unless there's a really good reason not to.

Yes yes, for sure. I was just monkey coding here. I wanted to find a way to release the GIL before the await call, so that multiple threads can be spawned.

awestlake87 commented 3 years ago

I think I've figured it out after seeing the Server impl. Your start function is driving the server instead of driving Python's event loop with pyo3_asyncio::async_std::run_until_complete. Since Python's event loop is not running, your Python handler h can never complete, so the future f is waiting indefinitely.

Python's event loop needs to run, and it really prefers running on the main thread, so for the simplest fix, I'd recommend changing your start fn to spawn the server code in a worker thread, and then block on pyo3_asyncio::async_std::run_until_complete. This probably involves wrapping your Server struct's state (like TcpListener, routes, etc.) in an Arc<Mutex<T>> since the server code references some of those fields.
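
As a rough, pure-Python illustration of that layout (an analogy only, not pyo3-asyncio's API; `worker` and `run_demo` are hypothetical names), the main thread can own and drive the event loop while a worker thread hands coroutines to it with asyncio.run_coroutine_threadsafe. If nothing drove the loop, the worker's blocking wait would never return, which is the hang described above:

```python
import asyncio
import threading


async def h():
    # Stand-in for the Python route handler from the issue.
    return "h"


def worker(loop, results):
    # Runs on a worker thread: schedule a fresh coroutine on the main-thread
    # loop and block until the loop has run it to completion.
    future = asyncio.run_coroutine_threadsafe(h(), loop)
    try:
        results.append(future.result(timeout=5))
    finally:
        # Stopping the loop from another thread must also be thread-safe.
        loop.call_soon_threadsafe(loop.stop)


def run_demo():
    loop = asyncio.new_event_loop()
    results = []
    thread = threading.Thread(target=worker, args=(loop, results))
    thread.start()
    loop.run_forever()  # the main thread drives the event loop
    thread.join()
    loop.close()
    return results
```

Calling run_demo() returns the handler's result only because loop.run_forever() is executing on the main thread while the worker waits.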

Yes yes, for sure. I was trying to monkey code here. I wanted to find a way to release the gil before the await call, so that multiple threads can be spawned.

I think your code can be patched to do this safely like this:

let t = thread::spawn(move || {
    loop {
        let message = receiver.lock().unwrap().recv().unwrap(); // message should be the optional containing the future
        match message {
            Message::NewJob(j) => {
                // Python::with_gil allows you to return anything from the GIL context, so 
                // you can return the future before the await
                let f = Python::with_gil(|py| {
                    let k = j.as_ref(py);
                    pyo3_asyncio::into_future(&k).unwrap()
                });
                // use async_std::task::block_on instead of pyo3_asyncio::async_std::run_until_complete
                async_std::task::block_on(async move {
                    let x = f.await;
                    match (x) {
                        Err(x) => println!(" {}", x), // I am getting the error here
                        Ok(_) => (),
                    };
                });
            }
            Message::Terminate => {
                println!("Worker has been told to terminate");
            }
        }
    }
});

I don't think removing that unsafe block is related to your issue, but it could definitely save you some trouble down the line. It can be tempting to patch around Rust's safety if you're more familiar with C/C++, but I'd argue that unsafe Rust is more dangerous than raw C/C++ because of the safety assurances that Rust's APIs assume you're taking advantage of. In other words, a lot of C/C++ APIs were engineered to prevent users from accidentally corrupting memory (often by cloning or managing their own data), whereas Rust APIs can take shortcuts since they often assume that the compiler will rule these errors out for you, which it cannot completely do with unsafe.

awestlake87 commented 3 years ago

Also, just a heads up about some things I noticed when looking over the code:

I don't really know your full use-case, so take these pointers with a grain of salt. I just thought I'd offer some of my thoughts when reading through these snippets :)

sansyrox commented 3 years ago

just thought I'd offer some of my thoughts when reading through these snippets :)

Hey @awestlake87, thank you for the tips. 😄

Writing a proper HTTP server from scratch can be deceptively difficult,

Oh, I 100% agree with you on this. But this implementation is just an MVP of a concept that I wanted to try out.

Your workers can be async/await as well if you use an async_channel and spawn them with async_std::task::spawn instead of std::thread::spawn. That way you don't have to wrap the future returned by pyo3_asyncio::into_future with a async_std::task::block_on.

I am fairly new to Rust (about 2 months of working experience), so I may be very wrong about the feasibility of the following. I am trying to implement a server that can work across different threads, where every thread has its own async loop or async runtime. If I get the MVP to work well, using prebuilt libraries is a 100% no-brainer. I am trying to implement this because I was unable to find anything similar in the Python ecosystem. I want an individual async runtime per thread instead of a single async runtime on the main thread, which is why I am trying to do this.

I would really appreciate your comments on the feasibility of the above, or if this is already possible through pyo3 only.

But, yes I do agree. Using something like hyper or tide is the best way to handle the niceties of a webserver implementation.

sansyrox commented 3 years ago

You should pass h instead of h() into your router. I know on issue #20 we talked about needing to call h before passing the result into pyo3_asyncio::into_future, but the fact that it's a handler function means that you'll want to call h for every request that matches that route. So I think your router should instead store h and call it with the request in your job consumer threads. Otherwise I think you'll see some error like coroutine was already awaited when the second request matches the route for h

Also, is it possible to call a function and convert it to a coroutine within Rust only? That way, it will work as a very good router.

awestlake87 commented 3 years ago

Also, is it possible to call a function and convert it to a coroutine within Rust only? That way, it will work as a very good router.

If instead you were to store h in your route with the following Python code:

s.add_route("/",h)

then the Py<PyAny> would be a reference to the h function itself. Now your start function will forward the reference to h through the channel when the route is matched.

If you change your worker loop to call h, then it should work (after making start block on run_until_complete again):

    loop {
        let message = receiver.lock().unwrap().recv().unwrap(); // message should be the optional containing the future
        match message {
            Message::NewJob(j) => {
                // Python::with_gil allows you to return anything from the GIL context, so 
                // you can return the future before the await
                let f = Python::with_gil(|py| {
                    // j is now a reference to `h`, so we should call `h` here to create the coroutine
                    let coro = j.as_ref(py).call0().unwrap();
                    pyo3_asyncio::into_future(&coro).unwrap()
                });
    ...
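
The difference between registering h() and h can be reproduced in plain Python, independent of pyo3-asyncio. This is only a sketch; the two `serve_twice_*` helpers are hypothetical names that stand in for two requests hitting the same route:

```python
import asyncio


async def h():
    return "h"


async def serve_twice_with_coroutine():
    coro = h()  # one coroutine object, as stored by add_route("/", h())
    first = await coro
    try:
        await coro  # the second request reuses the already finished object
    except RuntimeError as err:
        return first, str(err)
    return first, None


async def serve_twice_with_function():
    handler = h  # the function itself, as stored by add_route("/", h)
    # Each call creates a new coroutine object, so every request can be awaited.
    return [await handler(), await handler()]
```

Running the first helper with asyncio.run yields the handler's result plus the RuntimeError message from the issue title, while the second helper succeeds for both requests.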

I am fairly new(read 2 mos of working exp) to rust, so, I may be very wrong about the feasibility of the following implementation. I am trying to implement a server that has a possibility of working across different threads and every different thread has its own async loop or async runtime.

I'm not sure what advantages this would have. When it comes to performance, each runtime has its own dedicated pool of threads that balance the workload very effectively. Adding a second runtime means adding another pool of threads to balance a separate workload, but as far as I know, there's really no benefit to this. In my opinion, it's likely less efficient than a single runtime because all threads have a performance cost due to what's called "context switching". Usually each runtime checks the number of cores in your system to find the optimal number of threads to spawn in their worker pool, so adding more runtimes most likely means a suboptimal thread-count. In addition, you might have low thread utilization because your workload is split across many more threads.

There could be other benefits that I'm not aware of, but I'm almost certain using a single global runtime is better. pyo3-asyncio is a bit different in that it uses two global runtimes, but that's really just because nobody has implemented a unified runtime for both Rust and Python yet. The simplest solution was just to provide tools to signal between both runtimes.
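
For the Python side at least, a single event loop already interleaves many handlers without any extra threads. The sketch below is only an analogy (async-std's multi-threaded scheduler differs in detail, and `handler` and `serve_all` are hypothetical names), but it shows every request starting before any of them finishes on one loop:

```python
import asyncio


async def handler(request_id, log):
    log.append(f"start {request_id}")
    # Yield to the loop, as a real handler would while waiting on I/O.
    await asyncio.sleep(0)
    log.append(f"end {request_id}")
    return request_id


async def serve_all():
    log = []
    # One loop drives all three handlers concurrently.
    results = await asyncio.gather(*(handler(i, log) for i in range(3)))
    return results, log
```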

sansyrox commented 3 years ago

I'm not sure what advantages this would have. When it comes to performance, each runtime has its own dedicated pool of threads that balance the workload very effectively. Adding a second runtime means adding another pool of threads to balance a separate workload, but as far as I know, there's really no benefit to this. In my opinion, it's likely less efficient than a single runtime because all threads have a performance cost due to what's called "context switching". Usually each runtime checks the number of cores in your system to find the optimal number of threads to spawn in their worker pool, so adding more runtimes most likely means a suboptimal thread-count. In addition, you might have low thread utilization because your workload is split across many more threads.

Thank you, this makes sense. I had always assumed that async event loops are some sort of blockable and resumable "loops", but creating threads inside threads will definitely add no value.

Since Linux treats processes and threads similarly, I mixed them up in my own thoughts. 😭 If async runtimes use threads in their implementation, then it definitely makes sense to keep one runtime per process. Maybe in the future I will try to spawn a separate runtime per process and use some IPC technique to share the stream. The reason I wanted to use threads (now processes) is that I wanted a way to control how the server makes use of the different cores of a machine. Since Python's thread pool and multiprocessing APIs share a lot of design decisions, I got confused, and this created a lot of issues here. 😆

If instead you were to store h in your route with the following Python code:

s.add_route("/",h)

then the Py<PyAny> would be a reference to the h function itself. Now your start function will forward the reference to h through the channel when the route is matched.

If you change your worker loop to call h, then it should work (after making start block on run_until_complete again):

    loop {
        let message = receiver.lock().unwrap().recv().unwrap(); // message should be the optional containing the future
        match message {
            Message::NewJob(j) => {
                // Python::with_gil allows you to return anything from the GIL context, so
                // you can return the future before the await
                let f = Python::with_gil(|py| {
                    // j is now a reference to h, so we should call h here to create the coroutine
                    let coro = j.as_ref(py).call0().unwrap();
                    pyo3_asyncio::into_future(&coro).unwrap()
                });

Thank you for the above snippet. This is really helpful and thanks again for all the help. 😄

sansyrox commented 3 years ago

Hi @awestlake87 ,

I tried to change my server's implementation (as we discussed here) to the following:

#[pymethods]
impl Server {
    #[new]
    pub fn new() -> Self {
        let url = format!("127.0.0.1:{}", 5000);
        let get_routes: HashMap<Route, Py<PyAny>> = HashMap::new();
        Self {
            port: 5000,
            number_of_threads: 1,
            router: Router::new(),
            threadpool: Arc::new(Mutex::new(ThreadPool::new(1))),
            listener: Arc::new(Mutex::new(TcpListener::bind(url).unwrap())),
            get_routes, // not implemented in router as unable to match lifetimes
        }
    }

    pub fn start(&'static mut self, py: Python) {
        thread::spawn(move || {
            let listener_ = &self.listener.lock().unwrap();
            let pool = &self.threadpool.lock().unwrap();

            for stream in listener_.incoming() {
                let mut stream = stream.unwrap();
                let mut buffer = [0; 1024];
                stream.read(&mut buffer).unwrap();
                let route = Route::new(RouteType::Buffer(Box::new(buffer)));
                let status_line = "HTTP/1.1 200 OK";
                let contents = "Hello";
                let len = contents.len();
                let response = format!(
                    "{}\r\nContent-Length: {}\r\n\r\n{}",
                    status_line, len, contents
                );

                stream.write(response.as_bytes()).unwrap();
                stream.flush().unwrap();
                let f = self.get_routes.get(&route);
                match f {
                    Some(a) => {
                        pool.push_async(&a.clone());
                    }
                    None => {}
                }
            }
        });

        pyo3_asyncio::async_std::run_until_complete(py, async move { loop {} }).unwrap();
    }

But I am getting the following errors:

error[E0597]: `_ref` does not live long enough
  --> src/server.rs:26:1
   |
26 | #[pymethods]
   | ^^^^^^^^^^^-
   | |          |
   | |          `_ref` dropped here while still borrowed
   | borrowed value does not live long enough
   | argument requires that `_ref` is borrowed for `'static`
   |
   = note: this error originates in an attribute macro (in Nightly builds, run with -Z macro-backtrace for more info)

error[E0597]: `pool` does not live long enough
  --> src/server.rs:26:1
   |
26 | #[pymethods]
   | ^^^^^^^^^^^^
   | |
   | borrowed value does not live long enough
   | `pool` dropped here while still borrowed
   | argument requires that `pool` is borrowed for `'static`
   |
   = note: this error originates in a macro (in Nightly builds, run with -Z macro-backtrace for more info)

What does this really mean? The pool variable in my code is only used inside a closure. Otherwise, how do I make a change to the #[pymethods] macro?

Also, how do you enable syntax highlighting in your code snippets?

awestlake87 commented 3 years ago

I think the reason you gave &mut self the 'static lifetime is because std::thread::spawn is complaining when you try to use some state from your Server struct inside the closure. self here is borrowed, but std::thread::spawn needs to either own the objects it works with or guarantee that they cannot die while the closure is running.

The compiler may allow you to specify &'static mut self to suppress that error, but there are really only a few niche situations where that makes sense. In #[pymethods] you are borrowing an object owned by the Python interpreter, and the Python interpreter could potentially go out of scope while the thread is running since there is no guarantee that you will join the thread before the interpreter goes out of scope. This is why the 'static lifetime doesn't work here.

Since 'static references can't work, the only option left is to pass that data by value so the thread can take ownership of it. I anticipated this earlier on in the thread actually:

Python's event loop needs to run, and it really prefers running on the main thread, so for the simplest fix, I'd recommend changing your start fn to spawn the server code in a worker thread, and then block on pyo3_asyncio::async_std::run_until_complete. This probably involves wrapping your Server struct's state (like TcpListener, routes, etc.) in an Arc<Mutex<T>> since the server code references some of those fields.

By wrapping your Server struct's state in an Arc<Mutex<T>>, you can clone the Arc to get a reference that you can pass to the thread by value. An Arc guarantees that while someone is holding a reference to the data the data will not be destroyed, so this satisfies the 'static lifetime. The Mutex allows you to mutate the fields since Arc by itself only allows immutable references. If you move your TcpListener out of the Server state and create it within the thread, then you likely won't need the Mutex since your router doesn't need to be mutated.

sansyrox commented 3 years ago

By wrapping your Server struct's state in an Arc<Mutex<T>>, you can clone the Arc to get a reference that you can pass to the thread by value. An Arc guarantees that while someone is holding a reference to the data the data will not be destroyed, so this satisfies the 'static lifetime. The Mutex allows you to mutate the fields since Arc by itself only allows immutable references. If you move your TcpListener out of the Server state and create it within the thread, then you likely won't need the Mutex since your router doesn't need to be mutated.

Thank you for this suggestion @awestlake87 . This was very helpful.

Now, my server implementation looks like this:

impl Server {
    #[new]
    pub fn new() -> Self {
        let get_routes: HashMap<Route, Py<PyAny>> = HashMap::new();
        Self {
            port: 5000,
            number_of_threads: 1,
            router: Router::new(),
            threadpool: Arc::new(Mutex::new(ThreadPool::new(1))),
            get_routes: Arc::new(get_routes), // not implemented in router as unable to match lifetimes
        }
    }

    pub fn start(&mut self, py: Python) {
        let url = format!("127.0.0.1:{}", &self.port);
        let get_router = self.get_routes.clone();
        let pool = self.threadpool.clone();

        thread::spawn(move || {
            let listener_ = TcpListener::bind(url).unwrap();
            let pool = pool.lock().unwrap();

            for stream in listener_.incoming() {
                let mut stream = stream.unwrap();
                let mut buffer = [0; 1024];
                stream.read(&mut buffer).unwrap();
                let route = Route::new(RouteType::Buffer(Box::new(buffer)));
                let status_line = "HTTP/1.1 200 OK";
                let contents = "Hello";
                let len = contents.len();
                let response = format!(
                    "{}\r\nContent-Length: {}\r\n\r\n{}",
                    status_line, len, contents
                );

                stream.write(response.as_bytes()).unwrap();
                stream.flush().unwrap();
                let f = get_router.get(&route);
                match f {
                    Some(a) => {
                        pool.push_async(&a.clone());
                    }
                    None => {}
                }
            }
        });

        let py_loop = pyo3_asyncio::async_std::run_until_complete(py, async move { loop {} });
        match py_loop {
            Ok(_) => {}
            Err(_) => {
                process::exit(1);
            }
        };
    }

That is, I am running a Python event loop forever. But the async functions are still not working.

I tried two things, but the function doesn't work with either of them.

One approach is this:

        let t = thread::spawn(move || {
            let py = unsafe { Python::assume_gil_acquired() };

            loop {
                let message = receiver.recv().unwrap(); // message should be the optional containing the future
                match message {
                    Message::NewJob(j) => {
                        let coro = j.as_ref(py).call0().unwrap();
                        let f = pyo3_asyncio::into_future(&coro).unwrap();

                        pyo3_asyncio::async_std::run_until_complete(py, async move {
                            let x = f.await;
                            match x {
                                Err(x) => println!(" {}", x), // I am getting the error here
                                Ok(_) => (),
                            };
                            Ok(())
                        })
                        .unwrap();
                    }
                    Message::Terminate => {
                        println!("Worker has been told to terminate");
                    }
                }
            }
        });

The second approach is the same code with run_until_complete replaced by async_std::task::block_on. I even tried replacing the unsafe Python handle with Python::with_gil. But the coroutine is never awaited, and I am not getting any error message. How do I debug this?

sansyrox commented 3 years ago

Sorry for the above comment. I have fixed it: I was putting the handler in the wrong router.

This is the snippet I used:

    pub fn add_route(&mut self, route_type: &str, route: String, handler: Py<PyAny>) {
        println!("{} {} ", route_type, route);
        let route = Route::new(RouteType::Route(route));
        let mut getr = self.get_routes.lock().unwrap();
        getr.insert(route, handler);
    }

I used plain old println for this. Let me know if you know of any other way 😅

How do I debug this?

Thanks again.

awestlake87 commented 3 years ago

I used plain old println for this. Let me know if you know of any other way

Sometimes println! can be the easiest way to debug these things. If you want to keep the log there for debugging purposes, a cleaner way might be to use the env_logger crate and replace the println! with log::debug!. That way you can pass a RUST_LOG environment variable to filter log messages later on or strip them out entirely at compile time for release builds.

Sorry, for the above comment. I have fixed it. I was putting it in the wrong router.

Ok, so everything's working like it should now?

sansyrox commented 3 years ago

Ok, so everything's working like it should now?

Thankfully, yes!

I just have one more question before you close this issue: do you know whether it is allowed to spawn a runtime per process? Would I need to switch back to Tokio, or will async-std work well? If I have to, will the Python event loop also be spawned per process?

Thank you again for all the help! 😄

awestlake87 commented 3 years ago

I just have one more question before you close this issue, do you have an idea about whether it is allowed to spawn a runtime per process? Would I need to switch back to Tokio or async-std will work well? If I have to, will the python event loop also spawn per process?

It's allowed. Either tokio or async-std will work fine in this instance, and each process will have its own Python event loop.

That being said, my advice on this is just to leave this sort of thing to a load balancer. AFAIK, most modern web servers use a single process inside a container and an orchestrator like Kubernetes scales up / scales down these containers as needed. A reverse proxy like Traefik can distribute the requests across these containers for you. This way, you don't have to worry about shared memory, IPC, or subprocesses.

sansyrox commented 3 years ago

Hey @awestlake87 ! 👋

I just released the first version of the web server I was writing (https://sansyrox.github.io/robyn). Thank you for the help! 😁