nicholassm / disruptor-rs

Low latency inter-thread communication library in Rust inspired by the LMAX Disruptor.
MIT License
633 stars · 18 forks

How to detect if the queue has/has not changed? #18

Open MoonMachine1 opened 1 week ago

MoonMachine1 commented 1 week ago

Great library here!

I have a use case where I need to monitor the queue/ringbuffer to see if values have changed.

Basically, I have a stream that publishes every 1s or so, and processing is done within 5ms, so I want to poll the queue/ringbuffer to see if the values have changed and alert downstream systems.

I see that the sequence is stored on the producer; checking whether that value has changed would also work, but it is not pub.

Curious to see what the best strategy for polling for changes in an outside process would be or if there is a supported strategy that I overlooked.

nicholassm commented 1 week ago

Hi @MoonMachine1,

You can get the sequence for the event being published as one of the arguments to the closure for event handling:

let processor = |e: &Event, sequence: Sequence, end_of_batch: bool| {
    // Your processing logic here. I.e. business logic.
};

let monitor = |e: &Event, sequence: Sequence, end_of_batch: bool| {
    // Your monitoring logic here.
};

let size = 64;
let mut producer = disruptor::build_single_producer(size, factory, BusySpin)
    .handle_events_with(processor)
    .and_then()
        .handle_events_with(monitor)
    .build();

The only way to "read" events is to register an event processor, as shown above. There's no way for another thread (outside the Disruptor) to see what happens on the ring buffer. Note that the monitor closure will only read an event after the processor closure is done with it. This way you can also measure the time it takes to process each event (assuming the producer publishes a timestamp of the publication time). That should achieve what you're after, as all event processors read all published events.
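To make the timestamp idea concrete, here is a minimal std-only sketch (the `Event` shape and `processing_latency` helper are hypothetical, and `std::time::Instant` stands in for whatever clock the producer uses): the producer stamps the event at publish time, and the monitor closure's body computes the elapsed time once the processor closure has finished with that event.

```rust
use std::time::{Duration, Instant};

// Hypothetical event shape: the producer stamps `published_at` when publishing.
struct Event {
    published_at: Instant,
}

// What the body of the `monitor` closure above could do: since the monitor only
// sees an event after the processor is done with it, the elapsed time covers
// both the handoff and the processing.
fn processing_latency(e: &Event) -> Duration {
    e.published_at.elapsed()
}

fn main() {
    let e = Event { published_at: Instant::now() };
    let latency = processing_latency(&e);
    println!("processed after {:?}", latency);
    assert!(latency < Duration::from_secs(1));
}
```

In the real pipeline the stamp would be a field set inside the `publish` closure, so the measurement includes the time an event waits in the ring buffer.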

Did that answer your questions? Or did I misunderstand?

Kind regards, Nicholas

MoonMachine1 commented 5 days ago

Thank you @nicholassm!

That is a reasonable solution. I moved my monitoring logic out of the main handler into the secondary handler and it did improve throughput.

Maybe a feature request: is it possible to assign different WaitStrategy values to different handlers? I put several monitor threads on the same core, which I am guessing has a similar side effect due to contention.

MoonMachine1 commented 5 days ago

Also, a side note for benchmarking: I just ported a tokio::spawn-based version of my pipeline to disruptor and improved end-to-end completion time by ~300-600 ms (which is great).

One thing I have noticed is that when I publish to a separate processor (from inside the processor closure), there is a ~1 μs delay before the first event is picked up, and then an additional ~10 μs of latency on each subsequent event, i.e. event pick-up by the second processor (timed on arrival, with only a tracing::info log): 1st ~1 μs, 2nd ~10 μs, 3rd ~20 μs.

I tested with batch_publish and publish and there is about the same amount of latency. Each thread is pinned to a different core.

I am new to Rust, so the problem may be in my code rather than the library.

Example

struct EventData {
    some_string: String,
    some_u64: u64,
}
...
    pub fn publish_events(&mut self, events: Vec<EventData>, id: u64, ts: QuantaInstant) {
        self.queue.batch_publish(events.len(), |iter| {
            // Pair each ring buffer slot with its source event instead of indexing manually.
            for (event, data) in iter.zip(events.iter()) {
                event.data = data.clone();
                event.ts = ts;
                event.id = id;
            }
        });
    }

    pub fn publish_event(&mut self, data: EventData, id: u64, ts: QuantaInstant) {
        self.queue.publish(|obj: &mut PumpFunSwapEventPayload| {
            obj.data = data;
            obj.ts = ts;
            obj.id = id;
        });
    }
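One thing worth checking in the example above: since `EventData` contains `String`s, cloning each element inside the publish closure allocates on the hot path. If the `Vec` is not needed after publishing, moving the data in avoids those allocations. A std-only sketch of the move-instead-of-clone pattern (the `EventData` and `Slot` types here are simplified stand-ins, independent of the disruptor API):

```rust
// Simplified stand-ins for the event payload and the ring buffer slot above.
#[derive(Default)]
struct EventData {
    some_string: String,
    some_u64: u64,
}

#[derive(Default)]
struct Slot {
    data: EventData,
    id: u64,
}

fn main() {
    let events = vec![
        EventData { some_string: "a".into(), some_u64: 1 },
        EventData { some_string: "b".into(), some_u64: 2 },
    ];
    let mut slots = vec![Slot::default(), Slot::default()];

    // `into_iter()` moves each EventData into its slot: the String buffers are
    // reused rather than reallocated, unlike `iter()` + `clone()`.
    for (slot, data) in slots.iter_mut().zip(events.into_iter()) {
        slot.data = data;
        slot.id = 7;
    }

    assert_eq!(slots[0].data.some_string, "a");
    assert_eq!(slots[1].data.some_u64, 2);
}
```

Whether this explains the ~10 μs per-event growth is a separate question, but removing allocations from the publish closure is a cheap experiment to rule it out.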