nicholassm / disruptor-rs

Low latency inter-thread communication library in Rust inspired by the LMAX Disruptor.
MIT License
Difficult to use in Processor #9

Closed lluckydog closed 3 months ago

lluckydog commented 4 months ago

Hello, I'm impressed by the high performance of this library, but I'm a little confused about how to use the processor. It seems the processor only handles each task as it arrives. Is it possible to keep some state in the processor, rather than just handling the task currently being processed?

nicholassm commented 4 months ago

Yes, absolutely (if I understand your question correctly). You can move any data into the closure to keep state between events. Does that make sense?

Kind regards Nicholas

lluckydog commented 4 months ago

Yes, this is exactly what I mean. As I am new to Rust, I am a little confused about how to use the Processor. Data can be stored in the processor by moving it into the closure with move, and it is then kept across events on the processing thread. Is that right?

lluckydog commented 4 months ago

Is it possible to run some initialization code inside the processor thread itself, before the Processor starts handling events? Some data, such as Rc and RefCell, cannot be passed across threads in the current framework. If you like this idea, could you give me some advice? My original code without the disruptor looks like this:

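      let runner_thread = thread::spawn(move || {
            // Created inside the thread, so the Rc never crosses a thread boundary.
            let data_rc = Rc::new(RefCell::new(data));
            // recv() returns a Result; the loop ends when all senders are dropped.
            while let Ok(event) = rx.recv() {
                  // handle the event using data_rc
            }
      });
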
nicholassm commented 4 months ago

You cannot run initialization code beforehand as you show in your example. But there are a lot of data structures that are Send and can be used in the closure invoked when new events are available. Why specifically do you need to wrap your data in an Rc and a RefCell?

As an example, if your data looks like this struct:

struct Data {
    counter: u64,
    message: String,
}

A value of that type can be moved into the closure and mutated there.
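
A minimal sketch of that idea using only the standard library, with no disruptor types involved. The make_handler and run_on_thread functions and the string events are hypothetical, chosen for illustration. The point is that a move closure owns Data, can mutate it on every call because it is FnMut, and can be sent to another thread because Data is Send.

```rust
use std::thread;

struct Data {
    counter: u64,
    message: String,
}

// Hypothetical helper: builds an event handler that owns its state.
// `move` transfers ownership of `data` into the closure, and `FnMut`
// lets the closure mutate that state on each call.
fn make_handler() -> impl FnMut(&str) -> u64 + Send {
    let mut data = Data { counter: 0, message: String::new() };
    move |event: &str| {
        data.counter += 1;
        data.message.push_str(event);
        data.counter
    }
}

// Runs the handler on another thread and returns the final counter.
// This compiles because `Data` (and so the closure) is `Send`.
fn run_on_thread(events: Vec<&'static str>) -> u64 {
    let mut handler = make_handler();
    thread::spawn(move || {
        let mut last = 0;
        for event in events {
            last = handler(event);
        }
        last
    })
    .join()
    .unwrap()
}

fn main() {
    let processed = run_on_thread(vec!["a", "b", "c"]);
    println!("processed {processed} events");
}
```

The state lives as long as the closure does, so nothing needs to be shared or reference counted.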

lluckydog commented 4 months ago

I have defined a complex struct with several kinds of data and many methods, and it uses Rc and RefCell. Using the struct without Rc and RefCell would be inconvenient. Could you give me some advice?

nicholassm commented 4 months ago

Well, it defeats the low-latency purpose a bit, but you can use Arc instead of Rc and Mutex instead of RefCell. If the state guarded by the Mutex is not latency critical, it should be fine. Try it out and measure the performance. Alternatively, try Crossbeam, which is also a highly performant library.
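
A rough sketch of that substitution, assuming a simplified Data with a single counter field. The count_events function is hypothetical and uses only the standard library, so it says nothing about how the state would be wired into the disruptor itself.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

struct Data {
    counter: u64,
}

// Hypothetical example: shares `Data` between a worker thread and the caller.
// `Arc` replaces `Rc` (atomic reference count) and `Mutex` replaces `RefCell`
// (locking instead of runtime borrow checks), which makes the handle `Send`.
fn count_events(n: u64) -> u64 {
    let shared = Arc::new(Mutex::new(Data { counter: 0 }));
    let worker_handle = Arc::clone(&shared);

    let worker = thread::spawn(move || {
        for _ in 0..n {
            // `lock()` plays the role of `borrow_mut()`.
            worker_handle.lock().unwrap().counter += 1;
        }
    });
    worker.join().unwrap();

    // Read the result back on the calling thread.
    let total = shared.lock().unwrap().counter;
    total
}

fn main() {
    println!("counted {} events", count_events(5));
}
```

Every access now takes a lock, which is the latency cost mentioned above, so it is worth measuring on the real workload.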

lluckydog commented 4 months ago

Ok, thank you!

nicholassm commented 3 months ago

Hi @lluckydog,

I've just released version 2.1.0 where you can pass initial state that is neither Send nor Sync such as Rc<RefCell<Data>> to the Disruptor when creating it.

As an example:

use std::{cell::RefCell, rc::Rc};
use disruptor::*;

struct Event {
    price: f64
}

#[derive(Default)]
struct State {
    data: Rc<RefCell<i32>>
}

fn main() {
    let factory = || { Event { price: 0.0 }};
    let initial_state = || { State::default() };

    // Closure for processing events *with* state.
    let processor = |s: &mut State, e: &Event, _: Sequence, _: bool| {
        // Mutate your custom state:
        *s.data.borrow_mut() += 1;
    };

    let size = 64;
    let mut producer = disruptor::build_single_producer(size, factory, BusySpin)
        .handle_events_and_state_with(processor, initial_state)
        .build();

    for i in 0..10 {
        producer.publish(|e| {
            e.price = i as f64;
        });
    }
}
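
For readers wondering why passing a closure for the initial state helps, here is a sketch of the general pattern using only the standard library. It is not the library's implementation: run_consumer and the mpsc channel are hypothetical stand-ins for the event-processing thread. The assumption illustrated is that the factory closure is Send even though the State it produces is not, so the state can be built on the thread that uses it.

```rust
use std::cell::RefCell;
use std::rc::Rc;
use std::sync::mpsc;
use std::thread;

#[derive(Default)]
struct State {
    data: Rc<RefCell<i32>>,
}

// Hypothetical stand-in for a consumer thread: it receives a *factory* for
// the state rather than the state itself. The factory is `Send`; the `State`
// it produces is not, but it never leaves the thread that created it.
fn run_consumer<F>(initial_state: F, events: Vec<f64>) -> i32
where
    F: FnOnce() -> State + Send + 'static,
{
    let (tx, rx) = mpsc::channel::<f64>();

    let consumer = thread::spawn(move || {
        // Built on the consumer thread, so `Rc<RefCell<_>>` is fine here.
        let state = initial_state();
        while let Ok(_price) = rx.recv() {
            *state.data.borrow_mut() += 1;
        }
        // Return a plain `i32`, which is `Send`, instead of the `Rc`.
        let seen = *state.data.borrow();
        seen
    });

    for price in events {
        tx.send(price).unwrap();
    }
    drop(tx); // Closing the channel ends the consumer loop.

    consumer.join().unwrap()
}

fn main() {
    let seen = run_consumer(State::default, vec![0.0, 1.0, 2.0]);
    println!("consumer saw {seen} events");
}
```

Trying to move an already constructed State into thread::spawn would be rejected by the compiler, because Rc is not Send.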

Let me know if it works for you and if you have any questions! :-)

Kind regards, Nicholas

lluckydog commented 3 months ago

Dear Nicholas,

Thank you for releasing version 2.1.0 and sharing this example. It looks like the new functionality, which allows passing an initial state that is neither Send nor Sync to the Disruptor, fits my needs perfectly. I appreciate the detailed example you provided as well.

I'll test it out and let you know if I have any questions.

Kind regards, dandy

nicholassm commented 3 months ago

Hi @lluckydog,

I'll close this ticket now, but please feel free to open a new one if you have any questions. Thanks.

Kind regards, Nicholas