hortonworks / streamline

StreamLine - Streaming Analytics
Apache License 2.0

Apply weighted average to latency metrics #1226

Closed HeartSaVioR closed 6 years ago

HeartSaVioR commented 6 years ago

@arunmahadevan pointed me to how Storm aggregates latencies: https://github.com/hortonworks/storm/blob/HDF-3.1-maint/storm-core/src/clj/org/apache/storm/stats.clj#L384

(defn product-or-0
  [& args]
  (apply apply-or-0 * args))

(defn- agg-bolt-lat-and-count
  "Aggregates number executed, process latency, and execute latency across all
  streams."
  [idk->exec-avg idk->proc-avg idk->num-executed]
  (letfn [(weight-avg [[id avg]]
            (let [num-e (get idk->num-executed id)]
              (product-or-0 avg num-e)))]
    {:executeLatencyTotal (sum (map weight-avg idk->exec-avg))
     :processLatencyTotal (sum (map weight-avg idk->proc-avg))
     :executed (sum (vals idk->num-executed))}))

(defn- agg-spout-lat-and-count
  "Aggregates number acked and complete latencies across all streams."
  [sid->comp-avg sid->num-acked]
  (letfn [(weight-avg [[id avg]]
            (product-or-0 avg (get sid->num-acked id)))]
    {:completeLatencyTotal (sum (map weight-avg sid->comp-avg))
     :acked (sum (vals sid->num-acked))}))

The same logic has been converted to Java on the master branch, in StatsUtil.java: https://github.com/apache/storm/blob/master/storm-client/src/jvm/org/apache/storm/stats/StatsUtil.java

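We could do something similar: weight each stream's average latency by its tuple count instead of averaging the averages, so that our latencies are closer to what Storm UI shows.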
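To make the idea concrete, here is a minimal Java sketch of the weighted average, not the actual StreamLine or Storm code. The class and method names (`WeightedLatency`, `weightedTotal`, `totalCount`, `weightedAverage`) and the map shapes are hypothetical. I am also assuming that `product-or-0` yields 0 when either operand is missing, and that the totals are later divided by the total count to get the final latency.

```java
import java.util.Map;

public class WeightedLatency {

    /**
     * Sums (average latency * tuple count) over all streams.
     * A stream with a missing average or count contributes 0,
     * which is the assumed behavior of product-or-0.
     */
    static double weightedTotal(Map<String, Double> avgByStream,
                                Map<String, Long> countByStream) {
        double total = 0.0;
        for (Map.Entry<String, Double> entry : avgByStream.entrySet()) {
            Double avg = entry.getValue();
            Long count = countByStream.get(entry.getKey());
            if (avg != null && count != null) {
                total += avg * count;
            }
        }
        return total;
    }

    /** Sums the tuple counts (executed or acked) over all streams. */
    static long totalCount(Map<String, Long> countByStream) {
        long sum = 0L;
        for (Long count : countByStream.values()) {
            if (count != null) {
                sum += count;
            }
        }
        return sum;
    }

    /** Weighted average latency; returns 0 when nothing was counted. */
    static double weightedAverage(Map<String, Double> avgByStream,
                                  Map<String, Long> countByStream) {
        long count = totalCount(countByStream);
        return count == 0L ? 0.0 : weightedTotal(avgByStream, countByStream) / count;
    }
}
```

For example, with one stream averaging 10 ms over 100 tuples and another averaging 100 ms over 1 tuple, the weighted average is (10 * 100 + 100 * 1) / 101, roughly 10.9 ms, whereas a plain mean of the two averages would report 55 ms.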

HeartSaVioR commented 6 years ago

#1228