ArroyoSystems / arroyo

Distributed stream processing engine in Rust
https://arroyo.dev
Apache License 2.0

Support UDFs on raw_bytes #665

Closed by mwylde 1 week ago

mwylde commented 1 week ago

This PR allows users to write UDFs that read and produce binary data (via the raw_bytes format). This is useful for parsing custom or otherwise unsupported binary formats. For example, here's a UDF that transcodes CBOR to JSON:

/*
[dependencies]
serde_cbor = "0.11"
serde_json = "1"
serde = {version = "1", features = ["derive"]}
serde-transcode = "1"
*/

use arroyo_udf_plugin::udf;

#[udf]
fn cbor_to_json(data: &[u8]) -> Option<String> {
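    // Parse the input bytes as CBOR.
    let mut deserializer = serde_cbor::Deserializer::from_slice(data);
    // Vec<u8> already implements std::io::Write, so the JSON output can be
    // written straight into it without a BufWriter or a later unwrap().
    let mut buf = Vec::new();
    let mut serializer = serde_json::Serializer::new(&mut buf);
    // Stream CBOR values directly into JSON; return None if transcoding
    // fails (for example, on malformed CBOR input).
    serde_transcode::transcode(&mut deserializer, &mut serializer).ok()?;

    // serde_json emits UTF-8, but convert fallibly rather than panicking.
    String::from_utf8(buf).ok()
}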
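The same pattern works for a home-grown binary layout with no existing crate. The sketch below assumes a hypothetical fixed 6-byte record (a big-endian u16 sensor id followed by a big-endian i32 temperature in hundredths of a degree); the format, the function name decode_reading and the JSON field names are all made up for illustration. In an Arroyo UDF the function would carry the #[udf] attribute, as cbor_to_json does; it is omitted here so the sketch compiles on its own with only the standard library.

```rust
/// Hypothetical wire format (an assumption for illustration only):
///   bytes 0..2  sensor id, u16 big-endian
///   bytes 2..6  temperature in hundredths of a degree C, i32 big-endian
/// Returns the record as a JSON string, or None if the input is malformed.
fn decode_reading(data: &[u8]) -> Option<String> {
    // Reject anything that is not exactly one 6-byte record.
    if data.len() != 6 {
        return None;
    }
    // Slice -> fixed-size array conversions; cannot fail after the length check.
    let id = u16::from_be_bytes(data[0..2].try_into().ok()?);
    let centi = i32::from_be_bytes(data[2..6].try_into().ok()?);

    // Render hundredths as a decimal without going through floating point.
    let sign = if centi < 0 { "-" } else { "" };
    let abs = centi.unsigned_abs();
    Some(format!(
        r#"{{"sensor_id":{},"temp_c":{}{}.{:02}}}"#,
        id,
        sign,
        abs / 100,
        abs % 100
    ))
}
```

Returning Option<String> mirrors cbor_to_json: malformed input yields None instead of panicking inside the pipeline.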

This PR also includes some unrelated fixes to UDF nullability handling, as well as a breaking change: UDF string and byte arguments must now be reference types (&str and &[u8]) rather than owned types (String and Vec<u8>), which avoids unnecessary copies.
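To make the copy-avoidance argument concrete, here is a simplified sketch. It assumes a hypothetical variable-length binary column in which all row values sit back to back in one buffer and an offsets array marks where each row starts and ends (loosely modeled on Arrow's binary layout; BinaryColumn, checksum and the apply_* helpers are illustrative names, not Arroyo's internals). A UDF taking &[u8] can be handed a slice of that shared buffer directly, while an owned Vec<u8> parameter forces a fresh allocation and copy per row. The same reasoning applies to &str versus String.

```rust
/// Hypothetical, simplified binary column: row i spans
/// values[offsets[i]..offsets[i + 1]].
struct BinaryColumn {
    values: Vec<u8>,
    offsets: Vec<usize>,
}

impl BinaryColumn {
    /// Borrow row i in place; no copying.
    fn row(&self, i: usize) -> &[u8] {
        &self.values[self.offsets[i]..self.offsets[i + 1]]
    }

    fn num_rows(&self) -> usize {
        self.offsets.len() - 1
    }
}

/// New-style UDF signature: borrows its byte argument.
fn checksum(data: &[u8]) -> u64 {
    data.iter().map(|&b| b as u64).sum()
}

/// Old-style signature, for comparison: takes ownership of a Vec<u8>.
fn checksum_owned(data: Vec<u8>) -> u64 {
    checksum(&data)
}

/// Each call receives a slice of the shared buffer: no per-row allocation.
fn apply_borrowed(col: &BinaryColumn) -> Vec<u64> {
    (0..col.num_rows()).map(|i| checksum(col.row(i))).collect()
}

/// Each row must first be copied into a new Vec just to satisfy the signature.
fn apply_owned(col: &BinaryColumn) -> Vec<u64> {
    (0..col.num_rows())
        .map(|i| checksum_owned(col.row(i).to_vec()))
        .collect()
}
```

Both helpers compute the same results; the difference is only the to_vec() copy the owned signature requires, which is the overhead this change removes.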