ArroyoSystems / arroyo

Distributed stream processing engine in Rust
https://arroyo.dev
Apache License 2.0

Async UDFs for datafusion #591

Closed by mwylde 7 months ago

mwylde commented 7 months ago

This PR adds support for async UDFs in Arroyo 0.10. An async UDF looks like this:

/*
[dependencies]
reqwest = { version = "0.11.23", features = ["json"] }
serde_json = "1"
*/

use std::sync::OnceLock;

use arroyo_udf_plugin::udf;
use reqwest::Client;

#[udf(allowed_in_flight=100000, timeout="180s")]
pub async fn get_city(ip: String) -> Option<String> {
    // Build the HTTP client once and reuse it across invocations.
    static CLIENT: OnceLock<Client> = OnceLock::new();
    let client = CLIENT.get_or_init(Client::new);

    // Any request or JSON-decoding failure short-circuits to None.
    let body: serde_json::Value = client
        .get(format!("http://localhost:6006/{ip}"))
        .send()
        .await
        .ok()?
        .json()
        .await
        .ok()?;

    // Extract the English city name, if present.
    body.pointer("/names/en")
        .and_then(|t| t.as_str())
        .map(|t| t.to_string())
}
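The example above keeps its HTTP client in a function-local static so that every invocation shares one client instead of building a new one per call. As a minimal, standard-library-only sketch of that pattern (FakeClient, shared_client, and INIT_COUNT are hypothetical names used for illustration, not part of Arroyo or reqwest), the following shows that the initializer runs once even when several threads race to call it:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::OnceLock;

// Hypothetical stand-in for an expensive resource such as an HTTP client.
struct FakeClient {
    id: usize,
}

// Counts how many times the initializer closure actually runs.
static INIT_COUNT: AtomicUsize = AtomicUsize::new(0);

fn shared_client() -> &'static FakeClient {
    // Function-local static, as in the get_city example.
    static CLIENT: OnceLock<FakeClient> = OnceLock::new();
    CLIENT.get_or_init(|| {
        let id = INIT_COUNT.fetch_add(1, Ordering::SeqCst) + 1;
        FakeClient { id }
    })
}

fn main() {
    // Several threads race to obtain the client.
    let handles: Vec<_> = (0..8)
        .map(|_| std::thread::spawn(|| shared_client().id))
        .collect();

    // Every thread observes the same instance.
    for handle in handles {
        assert_eq!(handle.join().unwrap(), 1);
    }

    // The initializer ran exactly once.
    assert_eq!(INIT_COUNT.load(Ordering::SeqCst), 1);
}
```

This matters most for async UDFs with a high allowed_in_flight value, where many calls may be pending at once and per-call client construction would be wasteful.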

Changes to async UDFs

There are a few differences from the previous incarnation of async UDFs:

Other UDF changes

In addition, there are some changes to how UDFs as a whole work. The biggest is that UDFs must now be annotated with a #[udf] macro that does the necessary codegen. This gives users some extra flexibility, as they can now include helper functions in their UDF definitions. Moving the codegen into a macro also makes UDFs much easier to use in tests and provides a path to defining UDFs as crates, outside of the Arroyo web UI.
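As a rough sketch of what a definition with a helper function might look like: only the annotated function would be the UDF entry point, while the helper is ordinary Rust living alongside it. The names clamp and clamp_percent are hypothetical, and the import and attribute are left in comments so that the sketch compiles without the arroyo_udf_plugin crate.

```rust
// use arroyo_udf_plugin::udf; // needed when compiled as an Arroyo UDF

/// Hypothetical helper: plain Rust in the same definition, not itself a UDF.
fn clamp(value: i64, lo: i64, hi: i64) -> i64 {
    value.max(lo).min(hi)
}

// #[udf] // in an actual UDF definition, this attribute marks the entry point
pub fn clamp_percent(value: i64) -> i64 {
    clamp(value, 0, 100)
}

fn main() {
    // Because the UDF body is an ordinary function, it can be exercised directly.
    assert_eq!(clamp_percent(150), 100);
    assert_eq!(clamp_percent(-5), 0);
    assert_eq!(clamp_percent(42), 42);
}
```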

Async UDFs needed a larger runtime to support their execution. To avoid generating a huge amount of complex code, I opted to implement that runtime as a library, arroyo_udf_plugin. I also moved the sync UDF helper code in there to reduce the complexity and amount of codegen. However, this does mean that the UDF crate now has a dependency on our code.

After this PR is merged, I plan to publish that library as a public crate with version 0.1.0, which will provide a (hopefully) stable interface that users can write UDFs against directly. Until that is published, UDF compilation will require running the cluster with the new USE_LOCAL_UDF_LIB=true option, which instead uses the version of arroyo-udf-plugin that is part of the source tree.

I've also moved all of the UDF interactions into a new arroyo-udf-host crate, which provides a safe interface to the FFI so that the rest of the Arroyo codebase doesn't need to know the exact details of the UDF interface.
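To illustrate the general idea of a safe interface over an FFI boundary, here is a small standard-library-only sketch. It is not the arroyo-udf-host API: UdfHandle and plugin_strlen are hypothetical names, and the "plugin" function is defined in the same file purely so the example is self-contained.

```rust
use std::ffi::{c_char, CStr, CString};

// Hypothetical C-ABI entry point, standing in for a symbol that a compiled
// UDF library might export. It returns the byte length of a C string.
unsafe extern "C" fn plugin_strlen(input: *const c_char) -> usize {
    if input.is_null() {
        return 0;
    }
    // The caller must pass a valid, NUL-terminated string.
    let s = unsafe { CStr::from_ptr(input) };
    s.to_bytes().len()
}

/// Hypothetical host-side handle: the only place that touches the raw ABI.
pub struct UdfHandle {
    entry: unsafe extern "C" fn(*const c_char) -> usize,
}

impl UdfHandle {
    pub fn new() -> Self {
        Self { entry: plugin_strlen }
    }

    /// Safe call: builds a valid C string, so callers never see raw pointers.
    /// Returns None if the input contains an interior NUL byte.
    pub fn call(&self, input: &str) -> Option<usize> {
        let c_input = CString::new(input).ok()?;
        // SAFETY: `c_input` is a valid NUL-terminated string that outlives the call.
        Some(unsafe { (self.entry)(c_input.as_ptr()) })
    }
}

fn main() {
    let handle = UdfHandle::new();
    assert_eq!(handle.call("arroyo"), Some(6));
    assert_eq!(handle.call("bad\0input"), None);
}
```

Confining the unsafe block to one wrapper type means the calling code deals only with &str and Option, which is the kind of separation the paragraph above describes.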

UDF testing

Using the new #[udf] macro, I've also added support for testing UDFs. Testing is now supported both in unit tests (i.e., in arroyo-udf-host) and in smoke tests. All of the existing UDF smoke tests from 0.9 have been re-added using this new framework.