sklose / disrustor

A port of the LMAX Disruptor to Rust
MIT License
102 stars 7 forks source link

RefCell required to move into handler fn #2

Closed louisponet closed 1 year ago

louisponet commented 1 year ago

Hi,

Thanks for this awesome library!

I'm trying to implement some single producer multi consumer logic where the consumers need to be able to reference some persistent data/data storage. Given that handle_events accepts only Fn closures, how would one go about achieving this?

Cheers

sklose commented 1 year ago

Hi Louis,

I am not entirely sure what you mean by persistent data. The disrupter gives you an ephemeral ring buffer and a way to coordinate multiple threads accessing ranges in that ring either mutably or immutably. A common pattern with the disruptor is to use journal files for persistence/recoverability of data. Martin Fowler has published a great article on this https://martinfowler.com/articles/lmax.html. You generally try to keep your business logic free from any kind of I/O and use separate consumer threads on the ring buffer for that. Batching allows you to implement I/O very efficiently so it doesn't slow down your business logic. Hope this answers your question.

Here is some pseudo code on how I have used the disruptor in the past.

#[derive(Default)]
enum Command {
    PlaceOrder {
        symbol: &'static str,
        buy: bool,
        qty: f64,
        limit_price: f64,
        cl_ord_id: &'static str,
    },
    CancelOrder {
        cl_ord_id: &'static str,
    },
    MarketData {
        symbol: &'static str,
        last_price: f64
    }
}

#[derive(Default)]
struct Trade {
    cl_ord_id: &'static str,
    buy: bool,
    qty: f64,
    price: f64,
}

#[derive(Default)]
struct Item {
    cmd: Command,
    output: Option<Trade>,
}

let journal = Journal::new(); // implemented somewhere else
let networking Networking::new(); // implemented somewhere else
let matching_engine = MatchingEngine::new(); // implemented somewhere else
let replicator = Replicator::new(); // implemented somewhere else

let (executor, producer) = DisrustorBuilder::with_ring_buffer(4096)
    .with_wait_strategy::<BlockingWaitStrategy>()
    .with_single_producer()
    .with_barrier(|b| {
        // journal + replication can happen in parallel
        b.handle_events(move |data, _, eob| {
            // write all inputs to disk, this can be used to
            // recover the state via replay
            journal.append(data);
            if eob {
                journal.flush();
            }
        });
        b.handle_events(move |data, _, eob| {
            // replicate all inputs to a secondary
            // instance for high availability
            replicator.replicate(data);
        });
    })
    .with_barrier(|b| {
        b.handle_events_mut(move |data, _, _| {
            // let the matching engine handle the command
            data.output = matching_engine.tick(data.cmd);
        });
    })
    .with_barrier(|b| {
        b.handle_events(move |data, _, _| {
            // publish matched trades 
            if let Some(trade) = data.output {
                networking.publish(trade);
            }
        });
    })
    .build();

let handle = executor.spawn();

let cmds = vec![
    Command::PlaceOrder { symbol: "MSFT", qty: 1, limit: 220, buy: true, cl_ord_id: "1234" },
    Command::MarketData { symbol: "MSFT", last_price: 219 },
];

producer.write(cmds, |d, _, cmd| {
    d.cmd = cmd;
    d.result = None;
});

producer.drain();
handle.join();
louisponet commented 1 year ago

Hi Sebastian,

This is spot on my usecase! I'm actually looking to implement a trading system haha.

I see, I think my mistake was to try to put the "persistent data" in the with_barrier closure to then be used inside different handle_events closures. I'm very new to Rust so I guess I didn't quite understand how nested closures rules work. I will try to do it like you suggested here, thank you!

Cheers

louisponet commented 1 year ago

So I'm trying a simplified version of your example:

#[derive(Debug)]
struct Msg {
    pub t: Instant,
    pub i: u32
}
const MAX: i64 = 200i64;

impl Default for Msg {
    fn default() -> Self { Self {t: Instant::now(), i: 0}}
}

impl From<Msg> for i64 {
    fn from(msg: Msg) -> i64 {i64::from(msg.i)}
}

impl Copy for Msg { }

impl Clone for Msg {
    fn clone(&self) -> Msg {
        *self
    }
}
fn main () {
    let mut journal = Vec::new(); // implemented somewhere else
    let mut journal2 = Vec::new(); // implemented somewhere else

    let (executor, producer) = DisrustorBuilder::with_ring_buffer(4096)
        .with_wait_strategy::<BlockingWaitStrategy>()
        .with_single_producer()
        .with_barrier(|b| {
            // journal + replication can happen in parallel
            b.handle_events(move |&data: &Msg, _, eob| {
                // write all inputs to disk, this can be used to
                // recover the state via replay
                //
                journal.push(data.i);
            });
            b.handle_events(move |data, _, eob| {
                // replicate all inputs to a secondary
                // instance for high availability
                journal2.push(data.i);
            });
        })
        .build();

    let handle = executor.spawn();

    let cmds = vec![
        Msg { t: Instant::now(), i: 0},
        Msg { t: Instant::now(), i: 1},
    ];

    producer.write(cmds, |d, _, cmd| {
        d.t = cmd.t;
        d.i = cmd.i;
    });

    producer.drain();
    handle.join();
}

And I get:

error[E0596]: cannot borrow `journal` as mutable, as it is a captured variable in a `Fn` closure
   --> src/main.rs:113:17
    |
113 |                 journal.push(data.i);
    |                 ^^^^^^^^^^^^^^^^^^^^ cannot borrow as mutable

error[E0596]: cannot borrow `journal2` as mutable, as it is a captured variable in a `Fn` closure
   --> src/main.rs:118:17
    |
118 |                 journal2.push(data.i);
    |                 ^^^^^^^^^^^^^^^^^^^^^ cannot borrow as mutable

What should be the types going into these closures? Should they be wrapped in something?

sklose commented 1 year ago

This is an interesting find. I can't think of a reason why the compiler should reject that code. It seems to work if you wrap it in a RefCell. It's slightly less efficient, but it shouldn't have a huge performance impact. Please let me know if you find a solution that doesn't require a RefCell.

use disrustor::{*, internal::BlockingWaitStrategy};
use std::{time::Instant, cell::RefCell};

#[derive(Debug)]
struct Msg {
    pub t: Instant,
    pub i: u32
}
const MAX: i64 = 200i64;

impl Default for Msg {
    fn default() -> Self { Self {t: Instant::now(), i: 0}}
}

impl From<Msg> for i64 {
    fn from(msg: Msg) -> i64 {i64::from(msg.i)}
}

impl Copy for Msg { }

impl Clone for Msg {
    fn clone(&self) -> Msg {
        *self
    }
}
fn main () {
    let journal = RefCell::new(Vec::<u32>::new()); // implemented somewhere else
    let journal2 = RefCell::new(Vec::<u32>::new()); // implemented somewhere else

    let (executor, producer) = DisrustorBuilder::with_ring_buffer(4096)
        .with_wait_strategy::<BlockingWaitStrategy>()
        .with_single_producer()
        .with_barrier(|b| {
            // journal + replication can happen in parallel
            b.handle_events(move |data: &Msg, _, eob| {
                // write all inputs to disk, this can be used to
                // recover the state via replay
                journal.borrow_mut().push(data.i);
            });
            b.handle_events(move |data, _, eob| {
                // replicate all inputs to a secondary
                // instance for high availability
                journal2.borrow_mut().push(data.i);
            });
        })
        .build();

    let handle = executor.spawn();

    let cmds = vec![
        Msg { t: Instant::now(), i: 0},
        Msg { t: Instant::now(), i: 1},
    ];

    producer.write(cmds, |d, _, cmd| {
        d.t = cmd.t;
        d.i = cmd.i;
    });

    producer.drain();
    handle.join();
}
louisponet commented 1 year ago

Yes, I also found it to work with RefCell. I guess it's because the compiler isn't able to figure out that the only place where journal is used is in fact inside that function.

If I try:

fn main () {
    let mut journal = std::cell::RefCell::new(Vec::new()); // implemented somewhere else
    let mut journal2 = Vec::new(); // implemented somewhere else

    let f1 = move |&data: &Msg, _: Sequence, eob: bool| -> () {
        // write all inputs to disk, this can be used to
        // recover the state via replay
        journal.get_mut().push(data.i);

    }

    let (executor, producer) = DisrustorBuilder::with_ring_buffer(4096)
        .with_wait_strategy::<BlockingWaitStrategy>()
        .with_single_producer()
        .with_barrier(|b| {
            // journal + replication can happen in parallel
            b.handle_events(f1);
            b.handle_events(move |&data: &Msg, _, eob| {
                // replicate all inputs to a secondary
                // instance for high availability
                journal2.push(data.i);
            });
        })
        .build();

    let handle = executor.spawn();

    let cmds = vec![
        Msg { t: Instant::now(), i: 0},
        Msg { t: Instant::now(), i: 1},
    ];

    producer.write(cmds, |d, _, cmd| {
        d.t = cmd.t;
        d.i = cmd.i;
    });

    producer.drain();
    handle.join();
}

I run into:

error[E0525]: expected a closure that implements the `Fn` trait, but this closure only implements `FnMut`
   --> src/main.rs:104:14
    |
104 |     let f1 = move |&data: &Msg, _: Sequence, eob: bool| -> () {
    |              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ this closure implements `FnMut`, not `Fn`
...
107 |         journal.get_mut().push(data.i);
    |         ------- closure is `FnMut` because it mutates the variable `journal` here
...
116 |             b.handle_events(f1);
    |               ------------- -- the requirement to implement `Fn` derives from here
    |               |
    |               required by a bound introduced by this call
    |
note: required by a bound in `BarrierScope::<'a, S, D, T>::handle_events`
   --> /home/ponet/.cargo/registry/src/github.com-1ecc6299db9ec823/disrustor-0.3.0/src/dsl.rs:146:12
    |
146 |         F: Fn(&T, Sequence, bool) + Send + 'static,
    |            ^^^^^^^^^^^^^^^^^^^^^^ required by this bound in `BarrierScope::<'a, S, D, T>::handle_events`

I don't know if there's a way to "once and for all" capture something into a closure.

louisponet commented 1 year ago

Would it be possible to use FnMut instead of Fn in the library? I guess then this would not pop up. Again I might be completely misunderstanding things though, so I don't know if it's ok to just replace things like that...

sklose commented 1 year ago

I just gave that a try. Unfortunately, it does not resolve the compiler errors. I just get a different error then. Seems the only workaround, for now, is RefCell. When I get some time, I might add a trait for event handlers. We can probably get around this problem with an API that accepts an event handler trait implementation instead of a closure.

louisponet commented 1 year ago

Hmm I managed to get it to accept this:

fn main () {
    let mut journal  = 0; // implemented somewhere else
    let mut journal2 = 0; // implemented somewhere else
    // let journal2 = std::cell::RefCell::new(Vec::<u32>::new()); // implemented somewhere else

    let (executor, producer) = DisrustorBuilder::with_ring_buffer(4096)
        .with_wait_strategy::<SpinLoopWaitStrategy>()
        .with_single_producer()
        .with_barrier(|b| {
            // journal + replication can happen in parallel
            b.handle_events(move |data: &Msg, _, eob| {
                // write all inputs to disk, this can be used to
                // recover the state via replay
                journal += data.i;
                println!("journal {}", journal);
            });
            b.handle_events(move |data: &Msg, _, eob| {
                // replicate all inputs to a secondary
                // instance for high availability
                journal2 += 1;
                println!("journal2 {}", journal2);
            });
        })
        .build();

    let handle = executor.spawn();

    for i in 1..=MAX {
        std::thread::sleep_ms(1);
        producer.write([i], |d, _, _| {
            d.t = Instant::now();
            d.i = i as u32;
        });
    }

    producer.drain();
    handle.join();
}

by changing Fn to FnMut and some other things here and there to satisfy the compiler. I think it would be nicer to have an API though. I will try to work that out and if I succeed I'll make a PR.