Abraxas-365 / langchain-rust

🦜️🔗LangChain for Rust, the easiest way to write LLM-based programs in Rust
MIT License

async operations are not really async #142

Open jondot opened 4 months ago

jondot commented 4 months ago

Hi, I've been working on a langchain variant in Rust myself, and I am reading this project to compare notes. I can offer a tip, something I've bumped into in the past: when running CPU-bound operations (loading text from PDFs, splitting text, embedding, and more), you need to offload to one of two places:

  1. Tokio's built-in blocking thread pool, via an async task (e.g. `tokio::task::spawn_blocking`)
  2. Your own dedicated pool for specific resources (a pool for embedders, a pool for splitters, etc.), offloading to that dedicated pool (still driven from Tokio, of course)

For one-time loading during the boot process of the app or a similar workflow, I recommend (1). For multi-request processing, such as initializing a full-blown chain and then putting it behind a web service serving multiple inference requests (with the same chain), I recommend (2).

It is a non-trivial refactor, as you might discover some pieces of code are not `Sync` or `Send` and you'll have to drill down to make them so. However, there's no getting around this kind of refactor: as it stands, CPU-bound operations will completely block Tokio's async operations and make using Tokio pointless.

prabirshrestha commented 4 months ago

I have thought about this, but I'm not sure there is a good way to solve it. For example, if I have these running in a worker which is already in a thread pool or a new thread, I wouldn't want it to create yet another thread. This should be controlled by the user.

Wondering if it instead makes sense to have some sort of `Parallelism` option similar to what the jwalk crate does: https://docs.rs/jwalk/latest/jwalk/enum.Parallelism.html

pub enum Parallelism {
    Serial,
    RayonDefaultPool {
        busy_timeout: Duration,
    },
    RayonExistingPool {
        pool: Arc<ThreadPool>,
        busy_timeout: Option<Duration>,
    },
    RayonNewPool(usize),
}

We could pick the best one by default so users don't have to worry much. They can always use Serial and add it to their own scheduler. Another option is to use a custom scheduler such as https://caolan.github.io/tamawiki/tokio/executor/trait.Executor.html.

jondot commented 4 months ago

Hmmm, interesting. I'm not sure; what I think is certain is that an async runtime meeting a blocking CPU-bound operation kills the runtime.

prabirshrestha commented 4 months ago

I think, at a minimum, what needs to happen is converting `Read` to `AsyncRead`, i.e. using async I/O instead of sync I/O. For processing that is heavy and can't be solved by moving to async I/O, it would need the executors.

Do note that some libraries, such as csv, don't support async I/O, so we would need to use an alternate library: https://github.com/BurntSushi/rust-csv/issues/171

jondot commented 4 months ago

Yea, that is kinda my point: you don't have to use an async variant of a library if it does not exist; you can use Tokio to "async defer" it to a pool.

Here's how I do it in my library, taking a costly CPU-bound operation and making it compatible with Tokio by moving it into a rayon pool, backed by Tokio sync primitives:

    async fn embed(&self, inputs: &[&str]) -> Result<Vec<Vec<f32>>> {
        let (send, recv) = tokio::sync::oneshot::channel();

        rayon::scope(|s| {
            s.spawn(|_| {
                let _ = send.send(self.sync_embed(inputs));
            });
        });

        recv.await?
    }

Here, I leave it to the user to implement their own sync_embed function per provider, so that they don't deal with the inner workings of async. The Embedding trait implements the boilerplate of taking something that's sync and blocking and turning it into an async, Tokio-friendly operation.

mqudsi commented 3 months ago

@jondot doesn’t rayon::scope() block until all spawned tasks have completed?