asynchronics / asynchronix

High-performance asynchronous computation framework for system simulation
Apache License 2.0
178 stars 10 forks source link

Question about performance of looping inside self-scheduling functions #52

Closed fernandomaura97 closed 4 days ago

fernandomaura97 commented 4 days ago

Hi, I found this project recently and was looking to translate a C++ discrete event simulator into memory-safe rust via this library, and I eventually ran into some performance issues when adding functionality to my model. Self-scheduling functions are widely used in the original C++ project, so I expected them to work similarly in here in terms of performance.

The structure of my project is the following, replicating a M/M/1/K queueing system where I eventually added aggregation of MpduPacket into a single Ampdu block that can hold 64 packets.

                     ┌────────────────────────────────────────────────────┐
                     │                                                   │                  
                      │                    Packet Flow   
                     │   ┌──────────────┐                ┌──────────────┐ │
                           PoissonSource├───────────────►│ QueueModule  ├──► AmpduPacket
                     │   │              │    output_port │              │ │    output_port
                     │   └──────────────┘    (mpduPacket)└──────────────┘ │
                     │                                                    │
                     └────────────────────────────────────────────────────┘

The way the PoissonSource generates packets is by generating a first packet with exponentially distributed length and selecting the time of next generation according to a random distribution.

impl PoissonSource {
    /// ....
    fn send_packet<'a>(
        &'a mut self,
        _: (),
        context: &'a Context<Self>,
    ) -> impl Future<Output = ()> + Send + 'a {
        async move {
            let mut packet = MpduPacket::new();

            let mut time_interarrival =
                Duration::from_secs_f64(exponential(1.0 / self.arrival_rate));
            time_interarrival = max(time_interarrival, Duration::from_nanos(10));

            let len_random = exponential(self.mean_length_packets as f64) as usize;
            packet.length_packet = cmp::max(1, len_random);

            self.num_packets_sent += 1;
            packet.packet_id = self.num_packets_sent;
            self.output_port.send(packet.clone()).await;

            context
                .scheduler
                .schedule_event(time_interarrival, Self::send_packet, ())
                .unwrap();
        }
    }
}

impl Model for PoissonSource {}

The queue has an input function that schedules service and adds packets to the VecDeque<MpduPacket>. It also has a self-scheduling function to deque and schedule packets with time proportional to their length. This version has no aggregation and just sends the MpduPacket to the next block, which works great and fast (1s of real time = 100s in simtime):


#[derive(Clone)]
pub struct QueueModule {
    pub output_port: Output<AmpduPacket>,
    pub queue: VecDeque<MpduPacket>,
    pub queue_maxsize: usize,
    pub service_timer: Duration,
    pub aux_ampdu_serviced: AmpduPacket,
    pub packet_being_served: bool,
    pub blocked_packet_counter: usize,
    pub arrived_packet_counter: usize,
    pub queue_length_counter: usize,
    pub arrival_rate: f64,
    pub service_rate: f64,
    pub rate_departures_bps: f64,
    pub t0_time: Instant,
    pub csv_metrics: CsvType,
    pub coords_queue: Coords,
    pub p_tx: f64,
}

impl QueueModule {

    pub async fn input(&mut self, mut packet: MpduPacket, context: &Context<Self>) {
        self.arrived_packet_counter += 1;
        self.queue_length_counter += self.queue.len();

        if self.queue.len() < self.queue_maxsize {
            packet.queue_in_instant = context.scheduler.time();
            self.queue.push_back(packet);                           // enqueue packet

            if self.queue.len() == 1 && !self.packet_being_served {
                self.deque_schedule_service((), context).await;     // schedule deque, and service
            }
        } else {
            self.blocked_packet_counter += 1;   // full queue
        }
    }

    fn deque_schedule_service<'a>(
        &'a mut self,
        _: (),
        context: &'a Context<Self>,
    ) -> impl Future<Output = ()> + Send + 'a {
        async move {
            if self.packet_being_served == true {              // serve the packet waiting
                self.output_port.send(self.aux_packet_serviced).await;
                self.aux_packet_serviced = MpduPacket::new();
                self.packet_being_served = false;
            }

            if let Some(packet) = self.queue.pop_front() {     // deque packet for next iter
                let now: tai_time::TaiTime<0> = context.scheduler.time();

                let mut serviced_packet = packet.clone();
                serviced_packet.queue_out_instant = now;
                serviced_packet.T_q = now.duration_since(serviced_packet.queue_in_instant);

                let time_of_service_secs = Duration::from_secs_f64(
                    serviced_packet.length_packet as f64 / self.rate_departures_bps,
                );

                serviced_packet.expected_T_s = time_of_service_secs;

                self.packet_being_served = true;
                self.aux_packet_serviced = serviced_packet.clone();

                context
                    .scheduler
                    .schedule_event(time_of_service_secs, Self::deque_schedule_service, ())
                    .unwrap();
            }
        }
    }
}

Where I found some problems is when I wanted to add functionality to the QueueModule, in particular traversing the queue to put MpduPackets together into an AmpduPacket holding a max of 64 MpduPackets inside a Vec. This has a severe performance hit on the system (1s of real time = 1s in simtime), making it unfeasable to use this library for some scenarios I was doing in my C++ DES.

I tried separating the service into a separate function, and different ways of traversing the queue in order to improve performance without success on improving performance. I couldn't find a good reason for this, although I acknowledge I'm a begginer at async and might be blocking the program for some periods of time (f.e. when using println!) The problematic change is when I add the following functionality into the deque_schedule_service function, and make it deliver the AmpduPacket dequed in the last iteration:

pub struct AmpduPacket {
    pub mpdu_packets: Vec<MpduPacket>, // Container for MPDU packets
    pub total_length: usize,           // Total length of aggregated packets
    pub sta_id: i32,                   // ID for the source STA
    pub size: i32,
    pub coordinates: Coords,
}
impl QueueModule {

  fn deque_schedule_service<'a>(
      &'a mut self,
      _: (),
      context: &'a Context<Self>,
  ) -> impl Future<Output = ()> + Send + 'a {
      async move {
          if self.packet_being_served == true {
              self.output_port.send(self.aux_ampdu_serviced.clone()).await;
              self.aux_ampdu_serviced.reset();
              self.packet_being_served = false;
          }

          if let Some(first_packet) = self.queue.pop_front() {
              let mut first_packet_mut = first_packet.clone();
              let now: tai_time::TaiTime<0> = context.scheduler.time();
              self.aux_ampdu_serviced.sta_id = first_packet_mut.sta_dest_id;
              self.aux_ampdu_serviced.coordinates = first_packet_mut.sta_coords.clone();

              first_packet_mut.queue_out_instant = now;

              self.aux_ampdu_serviced.mpdu_packets.push(first_packet_mut);
              self.aux_ampdu_serviced.total_length += first_packet_mut.length_packet;
              self.aux_ampdu_serviced.size += 1;

              let mut packets_to_remove = Vec::new();
              let mut resulting_delays = frametransmission_delay(
                  first_packet_mut.length_packet as f64,
                  MAX_AMPDU_SIZE,
                  self.coords_queue,
                  first_packet_mut.sta_coords,
                  self.p_tx,
              );
              let mut service_duration = Duration::from_secs_f64(resulting_delays.service_delay);

              for (index, packet) in self.queue.iter().enumerate() {
                  if packet.sta_dest_id != self.aux_ampdu_serviced.sta_id {
                      continue;
                  }

                  resulting_delays = frametransmission_delay(
                      self.aux_ampdu_serviced.total_length as f64,
                      MAX_AMPDU_SIZE as i32,
                      self.coords_queue,
                      self.aux_ampdu_serviced.coordinates,
                      self.p_tx,
                  );

                  if resulting_delays.service_delay >= DEFAULT_TMAX_AGG
                      || self.aux_ampdu_serviced.size >= MAX_AMPDU_SIZE as i32
                  {
                      break;
                  }

                  packets_to_remove.push(index);
                  service_duration = Duration::from_secs_f64(resulting_delays.service_delay);
              }

              for &index in packets_to_remove.iter().rev() {
                  if let Some(mut packet) = self.queue.remove(index) {
                      packet.queue_out_instant = now;
                      self.aux_ampdu_serviced.mpdu_packets.push(packet);
                      self.aux_ampdu_serviced.total_length += packet.length_packet;
                      self.aux_ampdu_serviced.size += 1;
                  }
              }

              for packet in self.aux_ampdu_serviced.mpdu_packets.iter_mut() {
                  packet.expected_T_s = service_duration;
              }

              self.packet_being_served = true;
              context
                  .scheduler
                  .schedule_event(service_duration, Self::deque_schedule_service, ())
                  .unwrap();
          }
      }
   }
}

My main concerns at the moment are:

  1. Is there a more efficient way to implement looping inside a self-scheduling function without blocking the async runtime?
  2. Is there some specific pattern I should be aware of when implementing self-scheduling functions?
sbarral commented 4 days ago

There's a lot to unpack here and queeing theory is not my specialty so I may need a bit more time to fully understand where the bottleneck is.

However, I do have one immediate question that may help pinpoint the problem: if the simulated time remains the same, are there more scheduled calls in the second version of deque_schedule_service()? If not, then I think this is rather a problem of the implementation of the loops rather than anything to do with the scheduler. If yes, are you able to estimate (rough order of magnitude only) the number of scheduler calls per 1s simulated in both variants?

Here are some answers to your questions and some extra random pieces of information that may help:

jauhien commented 4 days ago

I see unnecessary clone calls (cloning should be avoided if you could just move the value), in particular the following seems suspicious for me:

self.output_port.send(self.aux_ampdu_serviced.clone()).await;
self.aux_ampdu_serviced.reset();

It seems that you are unnecessary cloning the structure containing a vector of other structures on every call.

Also you are allocating memory on every call: let mut packets_to_remove = Vec::new(); (and then deallocating it). For filtering I would use retain method (and closure with side effects to collect data).

This assignment service_duration = Duration::from_secs_f64(resulting_delays.service_delay); seems to be suspicious, shouldn't it have +=?

Anyway, it seems to me that the performance issue is not related to self-scheduling, to be sure I would execute the appropriate code outside of the simulator and check how much time it takes.

fernandomaura97 commented 4 days ago

Thanks for the early response! I tried tracing the code in different ways, and now by comparing both C++ and rust versions by adding simulation metrics, it seems I did an oopsie by misconfiguring the source, so it sent packets x1000 times more frequently than it should when compared to c++, definitely explaining the "performance decrease". Sorry about the code dump, as it is a bit convoluted and I deleted debug prints for a bit of brevity.

Right now it seems C++ and the asynchronix implementation have similar performance for same scenarios, so not complaining anymore :) Thank you so much for the feedback, I will take some points into account and re-check my code, I'm looking forward to build a nice DES for networking for my future work :D

sbarral commented 4 days ago

No problem, you are welcome :) Yes, it's worth looking as well into the potential issues highlighted by @jauhien (the = vs += does look suspicious to me too). Good luck with your project!