teohhanhui / callbag-rs

Rust implementation of the callbag spec for reactive/iterable programming
Apache License 2.0

callbag-rs

Basic callbag factories and operators to get started with.

Highlights:

Imagine a hybrid between an Observable and an (Async)Iterable: that's what callbags are all about. It's all done with a few simple callbacks, following the callbag spec.
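To make "a few simple callbacks" a bit more concrete, here is a minimal sketch of the handshake the callbag spec describes (start, data, end), written with plain `std` only. It is not this crate's API: `Request`, `Message`, `Talkback`, `Sink`, `pullable_source` and `collect_first` are hypothetical names made up for this illustration, and the spec's numeric message types are modelled as enums for readability.

```rust
use std::cell::{Cell, RefCell};
use std::rc::Rc;

/// What a sink may ask of a source through the talkback (hypothetical type).
enum Request {
    Pull,
    Stop,
}

/// Callback handed to the sink during the handshake.
type Talkback = Rc<dyn Fn(Request)>;

/// Messages a source delivers to a sink (hypothetical type).
enum Message<T> {
    Start(Talkback), // handshake, carries the talkback
    Data(T),         // one value
    End,             // no more values
}

/// A sink is just a callback that receives messages.
type Sink<T> = Rc<dyn Fn(Message<T>)>;

/// A pullable source: greets the sink, then answers each `Pull` with one item.
fn pullable_source<I>(iter: I, sink: Sink<I::Item>)
where
    I: Iterator + 'static,
    I::Item: 'static,
{
    let iter = RefCell::new(iter);
    let done = Cell::new(false);
    let sink_for_talkback = Rc::clone(&sink);
    let talkback: Talkback = Rc::new(move |request: Request| match request {
        Request::Pull => {
            if done.get() {
                return;
            }
            // Release the borrow before calling the sink, which may pull again.
            let next = iter.borrow_mut().next();
            match next {
                Some(value) => sink_for_talkback(Message::Data(value)),
                None => {
                    done.set(true);
                    sink_for_talkback(Message::End);
                }
            }
        }
        Request::Stop => done.set(true),
    });
    sink(Message::Start(talkback));
}

/// A sink that pulls at most `n` items, one by one, and collects them.
fn collect_first<I>(iter: I, n: usize) -> Vec<I::Item>
where
    I: Iterator + 'static,
    I::Item: 'static,
{
    let received: Rc<RefCell<Vec<I::Item>>> = Rc::new(RefCell::new(Vec::new()));
    let talkback_slot: Rc<RefCell<Option<Talkback>>> = Rc::new(RefCell::new(None));
    let received_in_sink = Rc::clone(&received);
    let slot_in_sink = Rc::clone(&talkback_slot);

    let sink: Sink<I::Item> = Rc::new(move |message: Message<I::Item>| match message {
        Message::Start(talkback) => {
            if n == 0 {
                talkback(Request::Stop);
            } else {
                *slot_in_sink.borrow_mut() = Some(Rc::clone(&talkback));
                talkback(Request::Pull);
            }
        }
        Message::Data(value) => {
            received_in_sink.borrow_mut().push(value);
            let enough = received_in_sink.borrow().len() >= n;
            let talkback = slot_in_sink.borrow().clone();
            if let Some(talkback) = talkback {
                if enough {
                    // Forget the talkback so sink and source do not keep each other alive.
                    slot_in_sink.borrow_mut().take();
                    talkback(Request::Stop);
                } else {
                    talkback(Request::Pull);
                }
            }
        }
        Message::End => {
            slot_in_sink.borrow_mut().take();
        }
    });

    pullable_source(iter, sink);
    received.take()
}

fn main() {
    let values = collect_first(40..=99usize, 5);
    println!("{values:?}"); // prints [40, 41, 42, 43, 44]
}
```

A listenable (push-style) source would use the same messages, but would send `Data` on its own schedule instead of waiting for `Pull`. That symmetry is why one set of operators can serve both styles.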

License: MIT OR Apache-2.0

Examples

Reactive programming examples

Pick the first 5 odd numbers from a clock that ticks every second, then start observing them:

use async_nursery::Nursery;
use crossbeam_queue::SegQueue;
use std::{sync::Arc, time::Duration};

use callbag::{filter, for_each, interval, map, pipe, take};

let (nursery, nursery_out) = Nursery::new(async_executors::AsyncStd);

let actual = Arc::new(SegQueue::new());

pipe!(
    interval(Duration::from_millis(1_000), nursery.clone()),
    map(|x| x + 1),
    filter(|x| x % 2 == 1),
    take(5),
    for_each({
        let actual = Arc::clone(&actual);
        move |x| {
            println!("{x}");
            actual.push(x);
        }
    }),
);

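// Drop our own nursery handle, then block until the nursery's output
// completes, i.e. until the pipeline above has run to the end.
drop(nursery);
async_std::task::block_on(nursery_out);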

assert_eq!(
    &{
        let mut v = vec![];
        while let Some(x) = actual.pop() {
            v.push(x);
        }
        v
    }[..],
    [1, 3, 5, 7, 9]
);
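For readers who want to see the push-style flow without this crate or an async runtime, the following is a rough `std`-only analogue that uses a thread and a channel. It is a sketch of the same data flow, not how `interval` is implemented here. `first_odd_ticks` is a hypothetical helper, and it assumes the clock counts ticks from 0, which is what the expected values above imply.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Pushes ticks 0, 1, 2, ... from a background thread, then keeps the first
/// `count` odd values of `tick + 1` (hypothetical helper, std only).
fn first_odd_ticks(period: Duration, count: usize) -> Vec<u64> {
    let (tx, rx) = mpsc::channel();

    // Producer: the "clock". It stops once the receiver has been dropped.
    thread::spawn(move || {
        for tick in 0u64.. {
            thread::sleep(period);
            if tx.send(tick).is_err() {
                break;
            }
        }
    });

    // Consumer: same operator chain as in the example above.
    rx.into_iter()
        .map(|x| x + 1)
        .filter(|x| x % 2 == 1)
        .take(count)
        .collect()
}

fn main() {
    // A short period keeps the demo fast; the example above uses one second.
    let values = first_odd_ticks(Duration::from_millis(10), 5);
    println!("{values:?}"); // prints [1, 3, 5, 7, 9]
}
```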

Iterable programming examples

From a range of numbers, pick 5 of them and divide them by 4, then start pulling those one by one:

use crossbeam_queue::SegQueue;
use std::sync::Arc;

use callbag::{for_each, from_iter, map, pipe, take};

// A minimal iterator over `from..=to` (both ends inclusive).
#[derive(Clone)]
struct Range {
    i: usize,
    to: usize,
}

impl Range {
    fn new(from: usize, to: usize) -> Self {
        Range { i: from, to }
    }
}

impl Iterator for Range {
    type Item = usize;

    fn next(&mut self) -> Option<Self::Item> {
        let i = self.i;
        if i <= self.to {
            self.i += 1;
            Some(i)
        } else {
            None
        }
    }
}

let actual = Arc::new(SegQueue::new());

pipe!(
    from_iter(Range::new(40, 99)),
    take(5),
    map(|x| x as f64 / 4.0),
    for_each({
        let actual = Arc::clone(&actual);
        move |x| {
            println!("{x}");
            actual.push(x);
        }
    }),
);

assert_eq!(
    &{
        let mut v = vec![];
        while let Some(x) = actual.pop() {
            v.push(x);
        }
        v
    }[..],
    [10.0, 10.25, 10.5, 10.75, 11.0]
);

Ok::<(), Box<dyn std::error::Error>>(())
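As a cross-check of the expected values, and to show what "pulling one by one" means, here is the same pipeline sketched with standard library iterators only. `pull_quarters` is a hypothetical helper and the counter exists purely for illustration: it records how many items were actually pulled from the range.

```rust
use std::cell::Cell;

/// Takes `count` numbers from `from..=to`, divides each by 4, and reports how
/// many items were pulled from the underlying range (hypothetical helper).
fn pull_quarters(from: usize, to: usize, count: usize) -> (Vec<f64>, usize) {
    let pulled = Cell::new(0usize);
    let values = (from..=to)
        .inspect(|_| pulled.set(pulled.get() + 1)) // count every pull
        .take(count)
        .map(|x| x as f64 / 4.0)
        .collect();
    (values, pulled.get())
}

fn main() {
    let (values, pulled) = pull_quarters(40, 99, 5);
    println!("{values:?}"); // prints [10.0, 10.25, 10.5, 10.75, 11.0]
    println!("{pulled}"); // prints 5: the other 55 numbers are never produced
}
```

Only as many items are produced as the consumer asks for, which is the same pull discipline the callbag pipeline above follows.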

API

The list below shows what's included.

Source factories

Sink factories

Transformation operators

Filtering operators

Combination operators

Utilities

Terminology

License

Licensed under either of the Apache License, Version 2.0 or the MIT license, at your option.

Contribution

Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in the work by you, as defined in the Apache-2.0 license, shall be dual licensed as above, without any additional terms or conditions.

Acknowledgements

Thanks to André Staltz (@staltz) for creating the callbag spec.

This library is a port of https://github.com/staltz/callbag-basics. Some inspiration was taken from https://github.com/f5io/callbag.rs.

Many thanks to the awesome folks on the Rust Users Forum for their help, especially: