cda-group / arcon

State-first Streaming Applications in Rust
https://cda-group.github.io/arcon/
Apache License 2.0

metrics refactoring #231

Closed Max-Meldrum closed 3 years ago

Max-Meldrum commented 3 years ago

Move metrics registration out of NodeMetrics/SourceMetrics

This should happen in the construction of SourceNode/Node (in new(...)).

The structs should only hold data structures used for metrics (e.g., Meter).
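A minimal, std-only sketch of that split, under stated assumptions: `Registry` is a hypothetical stand-in for the `register_*!` macros, `Meter` is a simplified counter rather than arcon's actual type, and the `inbound_throughput` field and the watermark description string are illustrative names only. The point is that `NodeMetrics` carries data and nothing else, while `Node::new` performs the registration once.

```rust
use std::collections::HashMap;

/// Simplified stand-in for the Meter type: it only counts events.
#[derive(Debug, Default)]
pub struct Meter {
    count: u64,
}

impl Meter {
    pub fn mark_n(&mut self, n: u64) {
        self.count += n;
    }

    pub fn count(&self) -> u64 {
        self.count
    }
}

/// Holds only the data structures used for metrics; no registration logic.
#[derive(Debug, Default)]
pub struct NodeMetrics {
    /// Hypothetical field name, used for illustration.
    pub inbound_throughput: Meter,
}

/// Hypothetical registry standing in for the `register_*!` macros.
#[derive(Debug, Default)]
pub struct Registry {
    descriptions: HashMap<(&'static str, String), &'static str>,
}

impl Registry {
    pub fn register(&mut self, key: &'static str, node: &str, description: &'static str) {
        self.descriptions.insert((key, node.to_string()), description);
    }

    pub fn is_registered(&self, key: &'static str, node: &str) -> bool {
        self.descriptions.contains_key(&(key, node.to_string()))
    }
}

pub struct Node {
    pub descriptor: String,
    pub metrics: NodeMetrics,
}

impl Node {
    /// Registration happens here, once, at construction time.
    pub fn new(descriptor: String, registry: &mut Registry) -> Self {
        registry.register(
            "batch_execution_time",
            &descriptor,
            "execution time per events batch",
        );
        // Description text below is an assumption, not taken from arcon.
        registry.register("watermark_counter", &descriptor, "watermarks seen by the node");
        Node {
            descriptor,
            metrics: NodeMetrics::default(),
        }
    }
}
```

With this shape, constructing `Node::new("map_node".to_string(), &mut registry)` leaves both keys registered under the `map_node` label, and `NodeMetrics` can be built with `Default` without touching any registry.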

Change how we use metrics macro

Rather than building a new key string with format! on every call, we can pass a static key and attach the node name as a label through the macro's key/labels syntax.

// old
register_histogram!(format!("{}_{}", node_name, "batch_execution_time"));
// new
register_histogram!(
    "batch_execution_time",
    Unit::Microseconds,
    "execution time per events batch",
    "node" => self.descriptor.clone(),
);

// old
histogram!(
  format!("{}_{}", &self.descriptor, "batch_execution_time"),
  elapsed.as_micros() as f64
);
// new
histogram!(
    "batch_execution_time",
    elapsed.as_micros() as f64,
    "node" => self.descriptor.clone()
);

// new counter
increment_counter!("watermark_counter", "node" => self.descriptor.clone());
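To illustrate why a static key plus labels is preferable to formatted keys, here is a small std-only sketch. `Recorder` is a hypothetical in-memory stand-in, not the recorder behind the real macros. It shows that one key can be queried per node via its label and also aggregated across all nodes, which is awkward when the node name is baked into the key string.

```rust
use std::collections::BTreeMap;

/// Label set attached to a metric, e.g. [("node", "map_node")].
pub type Labels = Vec<(&'static str, String)>;

/// Hypothetical in-memory recorder keyed by (static key, labels).
#[derive(Default)]
pub struct Recorder {
    counters: BTreeMap<(&'static str, Labels), u64>,
}

impl Recorder {
    /// Increment the counter identified by `key` and `labels`.
    pub fn increment_counter(&mut self, key: &'static str, labels: Labels) {
        *self.counters.entry((key, labels)).or_insert(0) += 1;
    }

    /// Value of one (key, labels) series, or 0 if it was never incremented.
    pub fn get(&self, key: &'static str, labels: &Labels) -> u64 {
        self.counters
            .get(&(key, labels.clone()))
            .copied()
            .unwrap_or(0)
    }

    /// Sum over every label set recorded under `key`.
    pub fn total(&self, key: &'static str) -> u64 {
        self.counters
            .iter()
            .filter(|((k, _), _)| *k == key)
            .map(|(_, v)| *v)
            .sum()
    }
}
```

Incrementing `"watermark_counter"` twice with `node = a` and once with `node = b` gives per-node values of 2 and 1, while `total("watermark_counter")` sums them to 3 without any string parsing.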

Source throughput per batch

Currently, we mark and report the Meter metric once per polled record. Change the process function to return ArconResult<usize>, where the usize is a counter indicating how many records were polled, so the Meter can be marked once per batch.

match self.process() {
    #[cfg(not(feature = "metrics"))]
    Ok(_) => (),
    #[cfg(feature = "metrics")]
    Ok(polled_records) => {
        // Mark the meter once per batch instead of once per record.
        self.metrics.incoming_message_rate.mark_n(polled_records);
        gauge!(
            "incoming_message_rate",
            self.metrics.incoming_message_rate.one_min_rate(),
            "source" => self.descriptor().clone()
        );
    }
    Err(error) => {
        // Fatal error, must shut down.
        // TODO: coordinate shutdown of the application.
        error!(self.logger, "{}", error);
    }
}
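The per-batch idea can be tried in isolation with the std-only sketch below. Everything in it is a simplified assumption: `ArconError`, `ArconResult`, `Meter`, and `Source` are stand-ins defined here rather than arcon's real types, and `mark_calls` exists only to make the number of meter updates observable.

```rust
/// Stand-in error and result types; arcon's real definitions may differ.
#[derive(Debug)]
pub struct ArconError(pub String);
pub type ArconResult<T> = Result<T, ArconError>;

/// Simplified meter that only accumulates a count.
#[derive(Debug, Default)]
pub struct Meter {
    count: u64,
}

impl Meter {
    pub fn mark_n(&mut self, n: u64) {
        self.count += n;
    }

    pub fn count(&self) -> u64 {
        self.count
    }
}

/// Hypothetical source that polls up to `batch_size` records per call.
pub struct Source {
    pending: Vec<u64>,
    batch_size: usize,
    pub incoming_message_rate: Meter,
    /// Number of meter updates, tracked only for this illustration.
    pub mark_calls: u32,
}

impl Source {
    pub fn new(pending: Vec<u64>, batch_size: usize) -> Self {
        Source {
            pending,
            batch_size,
            incoming_message_rate: Meter::default(),
            mark_calls: 0,
        }
    }

    /// Polls one batch and returns how many records were polled.
    fn process(&mut self) -> ArconResult<usize> {
        let n = self.batch_size.min(self.pending.len());
        // A real source would emit each record downstream here.
        self.pending.drain(..n).for_each(|_record| {});
        Ok(n)
    }

    /// Runs one poll cycle and marks the meter once for the whole batch.
    pub fn poll_once(&mut self) {
        match self.process() {
            Ok(polled_records) => {
                self.incoming_message_rate.mark_n(polled_records as u64);
                self.mark_calls += 1;
            }
            Err(error) => eprintln!("{}", error.0),
        }
    }
}
```

With five pending records and a batch size of three, two calls to `poll_once` record all five records using two meter updates instead of five.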

Update HardwareMetricGroup

The function names contain the term "gauge".

I think we can remove both of the functions if we are moving the registration to Node::new() and using the new macro approach.