pyr closed this issue 8 years ago
Alternatively, we could use monoids and finalizers. Most of the things we have at the moment are easily expressed as monoids. For example, `sum` is a monoid that starts with 0 and adds each incoming item to the accumulated value. `count` would also start with 0, and its append operation would simply increment the count by 1. `min` and `max` work out of the box as well.
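To make this concrete, here is a minimal sketch of these aggregates as identity-plus-combine pairs. All names here are illustrative, not part of the codebase:

```clojure
;; Illustrative sketch: each aggregate is an identity value plus an
;; associative combine step. Names are hypothetical, not from cyanite.
(def aggregates
  {:sum   {:identity 0   :combine +}
   :count {:identity 0   :combine (fn [acc _] (inc acc))}
   :min   {:identity nil :combine (fn [acc v] (if acc (min acc v) v))}
   :max   {:identity nil :combine (fn [acc v] (if acc (max acc v) v))}})

(defn fold-aggregate
  "Fold a sequence of samples through one of the aggregates above."
  [k samples]
  (let [{:keys [identity combine]} (aggregates k)]
    (reduce combine identity samples)))

(fold-aggregate :sum [1 2 3])   ;; => 6
(fold-aggregate :min [3 1 2])   ;; => 1
```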
Where it gets a bit hairier is `mean`, but we can still go with a rolling mean: we calculate the rolling `sum` and the rolling `count`, and here we'd need a finalizer, which would divide sum by count. Note that a mean of means would not be a correct mean, which is why we keep sum and count rather than intermediate means. This way it's possible to calculate everything in constant space: you always keep a single item in memory per aggregate, which frees you from the need to create rules for cascading ingestion. The rest of the metrics should of course work.
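A hedged sketch of the rolling mean with a finalizer (function names are made up for illustration):

```clojure
;; Sketch: keep only sum and count in constant space,
;; derive the mean only when finalizing.
(defn augment-mean [{:keys [sum count]} v]
  {:sum (+ sum v) :count (inc count)})

(defn finalize-mean [{:keys [sum count]}]
  (when (pos? count)
    (double (/ sum count))))

(->> [2 4 6]
     (reduce augment-mean {:sum 0 :count 0})
     finalize-mean)
;; => 4.0
```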
For more sophisticated cases, if we ever want to calculate quantiles, there's the P² algorithm: http://www.cs.wustl.edu/~jain/papers/ftp/psqr.pdf Also, there's another interesting paper on the subject: http://www.vldb.org/pvldb/vol8/p702-tangwongsan.pdf
I think it's an expected trade-off to lose precision for higher resolutions, and it is already the case for RRD, Whisper and friends. So having a mean of means is a compromise we can afford. But I agree on moving to constant space; for mean, `augment-snap` was already built following this logic.
I hadn't realized it, but the current implementation is exactly what I described, which is great! `augment-snap` is using exactly that logic, you're absolutely right here. The only difference is that all the items for the snapshot are still kept in memory until the snapshot is flushed, and `augment-snap` is only called when the old snapshot is yielded by the grouping function. We could improve it even further and augment on each incoming point, keeping only the first window item's timestamp and the intermediate state.
Yep, that's what I'm in the process of doing; it makes no sense to keep the window when we can augment in constant time. I'm also considering a different change: a separate snapshot thread, which would walk the list of buckets held in memory at every resolution interval and produce a single coherent payload of snaps to send over to Cassandra. This would make the workload on Cassandra much simpler to deal with.
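A minimal sketch of what such a snapshot thread could look like (the interval, the bucket atom, and the flush function are all assumptions, not the actual implementation):

```clojure
;; Sketch: a daemon thread that wakes at each resolution interval,
;; atomically drains the in-memory buckets, and hands one coherent
;; payload to the writer. `buckets` is an atom of {slot monoid};
;; `flush!` stands in for the Cassandra write.
(defn start-snapshot-thread!
  [buckets interval-ms flush!]
  (doto (Thread.
         (fn []
           (loop []
             (Thread/sleep interval-ms)
             ;; take the current buckets and reset them in one step
             (let [[old _] (reset-vals! buckets {})]
               (when (seq old)
                 (flush! old)))
             (recur))))
    (.setDaemon true)
    (.start)))
```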
This is what I'm headed for to keep values constant-time:

```clojure
(defrecord MetricMonoid [count minv maxv sum]
  MetricWriter
  (augment [this val]
    (assert (not (nil? val)))
    (MetricMonoid. (inc count)
                   (if minv (min minv val) val)
                   (if maxv (max maxv val) val)
                   (+ sum val)))
  (snapshot [this]
    (MetricSnapshot. (double (/ sum count)) minv maxv sum)))
```
Wow <3 looks amazing!
The remaining question will be (and I would love your ideas on the subject, @ifesdjeen): how do we guarantee safe access to the monoids?
This ties in to the guarantees that the disruptor can provide. For instance, if we knew that disruptor callbacks were always called on a single thread and did not overlap, we could store the monoids in volatiles and `vswap!` to apply augment. The alternative is to use an atom, which means going through the expensive CAS mechanism for every monoid augment call.
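To make the trade-off concrete, here is a sketch of the two options (assuming a single writer thread for the volatile variant):

```clojure
;; Option 1: volatile -- safe only if all augments for a given monoid
;; happen on one thread (e.g. a single disruptor consumer thread).
(def monoid-v (volatile! {:count 0 :sum 0}))
(vswap! monoid-v (fn [m] (-> m (update :count inc) (update :sum + 42))))

;; Option 2: atom -- safe under concurrent writers, but every augment
;; pays for a CAS retry loop.
(def monoid-a (atom {:count 0 :sum 0}))
(swap! monoid-a (fn [m] (-> m (update :count inc) (update :sum + 42))))
```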
Regardless of the choice there, we would end up with the following structure:
- `Metric` containing only a resolution chain: `(defrecord Metric [resolutions])`
- `:resolutions` would be a list of `(defrecord MetricResolutions [precision period slots])` where `:slots` would be a hash-map of timeslot to metric monoid.

Propagating a new metric to the engine would mean looking up the exec-plan for the metric name, fetching or creating it in the global nbhm, then, for each resolution, fetching or creating the appropriate time slot to augment it with the new metric value.
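A sketch of that ingestion path, under the assumption that `:slots` is held in an atom and that `make-monoid` and `augment` stand in for the real constructors:

```clojure
;; Sketch of the ingestion path. Record and helper names are
;; assumptions for illustration only.
(defrecord MetricResolutions [precision period slots])
(defrecord Metric [resolutions])

(defn time-slot
  "Align a timestamp to the start of its slot for a given precision."
  [precision ts]
  (- ts (mod ts precision)))

(defn ingest-point
  "For each resolution, fetch-or-create the slot's monoid and augment
   it with the new value, in a single atomic update."
  [metric ts value make-monoid augment]
  (doseq [{:keys [precision slots]} (:resolutions metric)]
    (let [slot (time-slot precision ts)]
      ;; fnil supplies a fresh monoid when the slot is still empty
      (swap! slots update slot (fnil augment (make-monoid)) value))))
```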
If it doesn't prove to be too memory hungry we could devise an initial plan where:

- The global metric map is an `org.cliffc.high_scale_lib.NonBlockingHashMap` instance.
- Each resolution's `:slots` map is also an `org.cliffc.high_scale_lib.NonBlockingHashMap` instance.
- If we can make sure that access to metric monoids is thread-isolated, we could move on to using volatiles for the monoids.
Re: disruptor callbacks, since we're using it for the fast computation, I think the best is to avoid context switches on the `:ingestq` and have just one thread (I'm not sure if that's possible with the Disruptor DSL, but we can always opt out and use the `RingBuffer` directly there). If we decide that we need more parallelism, we can use multiple ring buffers, each of which will guarantee its own local order, and partition across them. This way we can use volatiles, since each ring buffer only has access to its own data segment.
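Partitioning across ring buffers could be as simple as hashing the metric path, so every update for a given metric always lands on the same buffer (and thread). This is a sketch; the actual `RingBuffer` publish call is elided and replaced with a plain function:

```clojure
;; Sketch: route each metric to a fixed buffer by hashing its path.
;; `buffers` is a vector of publish functions standing in for the
;; real per-buffer publish calls.
(defn buffer-index [n-buffers metric-path]
  (mod (hash metric-path) n-buffers))

(defn publish! [buffers metric-path event]
  (let [idx (buffer-index (count buffers) metric-path)]
    ((nth buffers idx) event)))
```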
Other than that - your description and idea sounds very very good.
OK, thanks. To be very clear, what situation are we in right now? Does it just boil down to creating a disruptor with a threadpool of size 1? https://github.com/pyr/cyanite/blob/master/src/io/cyanite/engine/queue.clj#L37
I like the idea of sharding on several ringbuffers, by the way, down the line. I'll start with a simple mechanism that uses non-blocking hashmaps and atoms, and we'll see how this performs; it will be easy to optimize hotspots from there.
Yes, a threadpool of 1... or writing a custom consumer that spins in its own thread without using the Disruptor DSL.
> I like the idea of sharding on several ringbuffers by the way, down the line.
Perfect!
And to close off the initial discussion, this speaks against spilling over from resolution to resolution, and rather in favor of creating a monoid per metric/resolution tuple, as @ifesdjeen mentioned in the initial reply. I'll open a PR that builds on this discussion (hopefully shortly).
Instead, larger resolutions should rely on snapshots of smaller resolutions. This means that we need to ensure that a resolution chain is always composed of multiples of the smaller resolutions. When writing we can then use a single `ingest` function instead of `ingest-at-resolution`, which will get the list of snapshots; it will be up to `ingest` to propagate a smaller-resolution snapshot to a larger one. To keep the code as readable as possible, volatiles should be tried out.