influxdata / influxdb

Scalable datastore for metrics, events, and real-time analytics
https://influxdata.com
Apache License 2.0

Add intelligent rollups #6910

Closed pauldix closed 8 years ago

pauldix commented 8 years ago

There is a need to specify rollup rules for everything, or almost everything in the database. These should also be tied in with the query language so that queries can use whatever downsample makes sense based on the time range and level of precision required. The largest rollup interval will be 1h.

Useful information about requirements is in a comment below

API

The two sides of the API are the queries to create and show the downsampling rules, and the extra syntax to specify that a query should automatically scale to lower precision data if needed.

Working with Rollups

For the downsampling rules, you'd specify a matcher, the functions (and optionally the new field names), and the downsampling periods (each tied to a retention policy). Here's an idea for what that might look like.

CREATE ROLLUP "lagged_data" ON "mydb"
measurements("field_sensors")
fields(*)
functions(mean, count)
periods(5m, "5m_rollup_retention_policy", 1h, "1h_rollup_retention_policy")
recompute(20m)

CREATE ROLLUP "foo" ON "mydb"
measurements("foo", "bar")
fields(*)
functions(mean, count, max, percentile(90) as perc_90, percentile(99) as perc_99)
periods(5m, "5m_rollup_retention_policy", 1h, "1h_rollup_retention_policy")

CREATE ROLLUP "everything" ON "mydb"
measurements(*)
fields(*)
functions(mean, count, max)
periods(5m, "5m_rollup_retention_policy", 1h, "1h_rollup_retention_policy")

SHOW ROLLUPS
DROP ROLLUP "foo"
BACKFILL "foo" FROM "2015-01-01" TO "2016-06-23" THROTTLE 10s

The measurements and fields functions should be able to take *, strings, or regexes. The number of arguments in periods will always be a multiple of 2, with the first argument of each pair being a duration and the second being the retention policy that the rollup gets written into. The recompute function tells the system to keep the current interval around and do a final compute of the rollup after that amount of time has passed.
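To make the pairing rule for periods concrete, here is a minimal Python sketch of how the argument list might be validated. The function names and the restriction to s/m/h units are assumptions made for illustration; the 1h cap comes from the description at the top of this issue.

```python
import re

# Seconds per unit. Only s/m/h are handled in this sketch (an assumption),
# since the proposal caps rollup periods at 1h.
_UNIT_SECONDS = {"s": 1, "m": 60, "h": 3600}
MAX_PERIOD_SECONDS = 3600  # "The largest rollup interval will be 1h."


def parse_duration(text):
    """Convert a duration literal such as '5m' or '1h' to seconds."""
    match = re.fullmatch(r"(\d+)([smh])", text)
    if match is None:
        raise ValueError(f"unsupported duration: {text!r}")
    return int(match.group(1)) * _UNIT_SECONDS[match.group(2)]


def parse_periods(args):
    """Turn [duration, rp, duration, rp, ...] into (seconds, rp) pairs."""
    if len(args) % 2 != 0:
        raise ValueError("periods() needs (duration, retention policy) pairs")
    pairs = []
    for i in range(0, len(args), 2):
        seconds = parse_duration(args[i])
        if seconds > MAX_PERIOD_SECONDS:
            raise ValueError(f"rollup period {args[i]!r} exceeds 1h")
        pairs.append((seconds, args[i + 1]))
    return pairs
```

For the examples above, parse_periods(["5m", "5m_rollup_retention_policy", "1h", "1h_rollup_retention_policy"]) would yield the pairs (300, "5m_rollup_retention_policy") and (3600, "1h_rollup_retention_policy").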

Backfill will backfill a specific rollup from a start time to an end time. The query should return immediately and should run in the background on the server. The optional THROTTLE will pause between backfilling measurements to avoid overloading.

The TO is optional. If not specified, backfill would go all the way up to now. Time should be in RFC3339 format.

Querying Rollups

And the query syntax for specifying autoscaling to different rollups based on time range:

SELECT mean(count) FROM "foo"
WHERE time > now() - 30d
SCALE(mean, mean)

Scale would take an argument that specifies which field name from the select clause to scale and the rollup value to use if it needs it. The query should either compute using the raw values or compute using whichever rollup is appropriate for that time range. When to scale could get tricky so we may want to have other arguments to the scale function to specify it more granularly.

The query should return a warning in the query results if the scale logic specified that it should use a rollup, but an appropriate one couldn't be found and it had to compute off the raw results.

If the start time of the query is before the end of the raw values retention policy (that is, older than the oldest raw data still retained) and no rollup exists, an error should be returned saying that the query can't be properly scaled.
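The raw/rollup/warning/error behaviour described above could be sketched roughly as follows in Python. The selection policy is an assumption, since the proposal leaves "when to scale" open: this sketch prefers the coarsest rollup whose period is no larger than the query's GROUP BY interval and whose retention policy still covers the query's start time. The names choose_source and ScaleError are hypothetical.

```python
from datetime import timedelta


class ScaleError(Exception):
    """Neither a rollup nor the raw data covers the query range."""


def choose_source(query_age, group_by, raw_retention, rollups):
    """Pick where a SCALE query should read from.

    query_age:     how far back the query starts (now - start time)
    group_by:      the query's GROUP BY time() interval
    raw_retention: duration of the raw-values retention policy
    rollups:       list of (period, retention_policy_name, retention_duration)
    Returns (source_name, warning_or_None).
    """
    # Assumed policy: a rollup is wanted whenever one is fine-grained enough.
    wants_rollup = any(period <= group_by for period, _, _ in rollups)
    usable = [r for r in rollups if r[0] <= group_by and r[2] >= query_age]
    if usable:
        _, name, _ = max(usable, key=lambda r: r[0])  # coarsest usable rollup
        return name, None
    if query_age > raw_retention:
        raise ScaleError("query starts before the raw data and no rollup covers it")
    warning = "no suitable rollup found; computed from raw values" if wants_rollup else None
    return "raw", warning
```

With a 5m rollup kept for 90 days and a 1h rollup kept for a year, a 30 day query grouped by 6h would read from the 1h rollup, while a 1 day query grouped by 10s would read raw values without a warning.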

Implementation

These rollups should be done all the time on data getting written in that is in the current interval. This will have to work for all series in the database so it'll probably have to be kept hot in memory and updated on every write. This will obviously reduce performance since there's additional overhead, but I think it's better than the alternative of trying to query all data every 5m or whatever the rollup period is.

When a write comes in it should check against the rollup rules in order and use the first one that matches the measurement name. The rollups should take all tag values along with them. The naming convention for rollup values should be <field_name>_<rollup function or AS name>

So if you had a field named used_bytes on measurement hd and you were rolling up all fields with max, you'd have fields called used_bytes_max. Ultimately, the idea is that you'd have each different precision of rollup stored in a separate retention policy. So the measurement name and tags would remain the same.
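As an illustration of the first-match rule and the naming convention, here is a small Python sketch. The RollupRule class and helper names are hypothetical; only the matching order and the <field_name>_<function or AS name> pattern come from the text above.

```python
import re
from dataclasses import dataclass


@dataclass
class RollupRule:
    name: str
    measurements: list  # each entry is "*", an exact name, or a compiled regex
    suffixes: list      # function names or AS names, e.g. ["mean", "perc_90"]


def matches(matcher, measurement):
    """True if a single matcher (*, string, or regex) accepts the measurement."""
    if matcher == "*":
        return True
    if isinstance(matcher, re.Pattern):
        return matcher.search(measurement) is not None
    return matcher == measurement


def first_matching_rule(rules, measurement):
    """Rules are checked in order; the first one that matches wins."""
    for rule in rules:
        if any(matches(m, measurement) for m in rule.measurements):
            return rule
    return None


def rollup_field_names(rule, field):
    """Apply the <field_name>_<function or AS name> convention."""
    return [f"{field}_{suffix}" for suffix in rule.suffixes]
```

With the "foo" rule listed before the "everything" rule, a write to measurement "bar" would be handled by "foo", and a write to "hd" would fall through to "everything", producing fields such as used_bytes_max.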

The field names would be different, but that would be handled automatically for the user by the SCALE option. They could always query the lower precision data directly by asking for that field name from the specific retention policy.

If the user does a restart of the database, they'll need to do a backfill query if they want that period's rollups to be accurate.


pauldix commented 8 years ago

Just thought of another thing. For rollups like the one that downsamples everything, it should be intelligent about applying it to different field types. For instance, only count would make sense on string fields.

torkelo commented 8 years ago

Looks good! Like the plan to have InfluxDB handle the rollup and field selection using the Scale query keyword

jsternberg commented 8 years ago

I'm not sure I'm a fan of the syntax, but I am a fan of the idea. I'd like to see if it's possible to flesh this out in terms of existing features first (if possible) rather than adding more syntax and another subsystem.

It seems like there are a few core components that are part of implementing the meat of this:

  1. Running queries to perform the aggregation into a different retention policy.
  2. Automatically switching which underlying retention policy is used based on the time interval selected.

The first seems like it is very similar to continuous queries and I would like to consider improving continuous queries so they can provide this function maybe with some convenience syntax so there aren't as many moving parts. This one:

CREATE ROLLUP "lagged_data" ON "mydb"
measurements("field_sensors")
fields(*)
functions(mean, count)
periods(5m, "5m_rollup_retention_policy", 1h, "1h_rollup_retention_policy")
recompute(20m)

Can be captured by this:

CREATE CONTINUOUS QUERY "lagged_data" ON "mydb"
BEGIN
    SELECT mean(*), count(*) INTO "mydb"."5m_rollup_retention_policy".:MEASUREMENT FROM "field_sensors" GROUP BY time(5m)
    SELECT mean(*), count(*) INTO "mydb"."1h_rollup_retention_policy".:MEASUREMENT FROM "field_sensors" GROUP BY time(1h)
END

A bit of new syntax in there. We have wildcards within functions which was requested in #5750. We include two select statements inside of the continuous query instead of one (although maybe I should add a semi-colon).

The part this is missing is a way to automatically select the correct retention policy. I think we should potentially add that to the retention policies themselves. Something like:

CREATE RETENTION POLICY ALIAS "myrp" ON "mydb"
    USE "5m_rollup_retention_policy" WITH PRECISION 5m
    USE "1h_rollup_retention_policy" WITH PRECISION 1h

Then, if the user uses this retention policy alias, we automatically check what group by time they use and shard durations to select a proper retention policy for the query during the rewrite phase. A little crudely thought out so far, but thoughts?

pauldix commented 8 years ago

@jsternberg The current implementation of CQs won't work at this scale since they just run periodically. Only something that is constantly running on all currently hot data would work (I think).

If we change this behavior it will change how CQs work because of the syntax we have in there about running multiple times during the interval and potentially running many times after. I think it's too late to propose making that drastic of a change to CQs before 1.0 and I don't think making the change afterwards would change their fundamental behavior.

The modified CQ syntax you propose is much more verbose so it feels painful to specify for multiple rollup periods. I think it's common for people to have 2-3 levels of precision other than the raw data.

The other thing is that the precision should be tied to the actual rollup rule, not the retention policy. A retention policy can hold data at any level of precision. We don't limit that so it seems odd to tie a precision to one explicitly. The only thing the retention policy specifies is when data is dropped. Having it defined with the rule ensures that we have everything we need to do the scale mapping later. Whereas if you have a two-step process to define the rollup and the scale, you introduce a greater chance for user error.

I hear you about not modifying CQs so we avoid creating a new subsystem and syntax, but I'd prefer introducing something that is easier to work with and makes sense for this particular use rather than trying to shoehorn it into CQs later. If anything I'd evaluate the new syntax as a replacement for CQs.

Although we won't be able to drop CQ support for well over a year since it's in the 1.0 release line.

ryantxu commented 8 years ago

Perhaps this is a different issue, but seems related to making queries "just work" at a wide range of time scales -- in particular in grafana.

Right now, if you set a query with 'mean(value)' and the $interval gets below the sample rate the graphs disappear. Although strictly correct, it is a PITA and requires different query configs for different scales. Maybe the scale(mean,mean) could be smart and return the original points rather than nulls.

The largest rollup interval will be 1h

Why not 1d? Or even 1w? These don't involve the calendar and are useful at multi-year scale.

pauldix commented 8 years ago

@ryantxu you're exactly right that making queries "just work" (particularly in Grafana) is one of the primary things this approach is trying to address.

I specified 1h as the max because it's the max interval we can have without worrying about time zones. 1d or even 1w can be computed on the fly (I hope) given the speed of the db. And upcoming features give users the ability to change their time zone for larger group by intervals dynamically at query time.

Forcing 1h maxes would make these downsamples still work with that kind of on-the-fly time zone shifting.

jsternberg commented 8 years ago

@pauldix ok, that clarifies a lot for me. So this will be a separate subsystem from continuous queries and will eventually act as a replacement once we hit 2.0. Can you add a summary section to the top of the ticket with bullet points to help parse out all of the relevant information? There were a few other features you just mentioned that I didn't realize were part of this proposal.

The current implementation of CQs won't work at this scale since they just run periodically. Only something that is constantly running on all currently hot data would work (I think).

Does this mean we are going to trigger a rollup to occur when data is written to an interval rather than wait for the CQ to run? How are we planning for this to function?

hbs commented 8 years ago

Automatic rollups are a call for trouble, they are the best way to trigger write amplification. The early experience of blueflood is to be studied carefully (https://developer.rackspace.com/blog/blueflood-announcement/).

Out of order data will kill any automatic rollup setup.

Also rollups are usually not what people expect. If I want the avg of a given metric over a 5 minute interval, I mean the 5 minutes up to NOW, not 5 minute intervals falling on round boundaries; otherwise the latest returned value is rarely a 5 minute avg.

Why not make sure data access is performant so moving window computations can be done on the fly?

pauldix commented 8 years ago

@jsternberg I was thinking we'd only aggregate for the current intervals based on the bucket of time. Anything written in on a lag or for historical data would have to use backfill to calculate those correctly. Once a window of time is done + the lag interval for flushing, it should output the aggregate point.

The scale should pick up from this cached set of values for the current window when it outputs query results.
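A minimal Python sketch of this window-plus-lag behaviour, assuming integer timestamps in seconds and a mean/count rollup only. The class name WindowedRollup is hypothetical, and durability, tags, and multiple fields are ignored.

```python
class WindowedRollup:
    """Keeps running aggregates for open windows and emits them after a lag."""

    def __init__(self, period, lag):
        self.period = period  # window width in seconds, e.g. 300 for 5m
        self.lag = lag        # extra wait before flushing, e.g. 1200 for 20m
        self.open = {}        # window start -> [count, total]

    def write(self, timestamp, value):
        """Fold one incoming point into the window it falls in."""
        start = timestamp - timestamp % self.period
        agg = self.open.setdefault(start, [0, 0.0])
        agg[0] += 1
        agg[1] += value

    def flush(self, now):
        """Emit (window_start, mean, count) for windows whose end + lag has passed."""
        done = sorted(s for s in self.open if s + self.period + self.lag <= now)
        out = []
        for start in done:
            count, total = self.open.pop(start)
            out.append((start, total / count, count))
        return out
```

A SCALE query covering the current window could read the still-open entries from the same structure instead of waiting for the flush.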

pauldix commented 8 years ago

@hbs This design isn't for out of order or very lagged data. That's what CQs or calculating on the fly from raw data is for. As for write amplification, the design isn't to output a point into the downsample for every single point coming in, only when the window has passed. So some extra writes will be incurred, but not many for every single rollup.

As for computing quickly on raw data, we already do that. This is for people that need to decimate their data to save on disk space for areas of time where they don't need high precision. This also isn't for moving window comparisons, which you can already do today on raw data inside Influx.

Since you make a habit of pointing people on Twitter talking about InfluxDB to your database instead, maybe you can share what you've decided to implement?

hbs commented 8 years ago

Sure, we do not do automatic rollups for all the problems I've mentioned.

As for saving disk space the most common operation is tiered storage where we create files from 'hot' data, retaining the ability to perform computations on those files. From there you can either choose to perform downsampling in batch, re-inject that as 'hot' data and discard the finest grade data, or simply work on the 'cold' data on file instead of making a downsampled version hot.

dennisjac commented 8 years ago

Since I've already implemented a very similar solution using CQs and a small Grafana patch in grafana/grafana#4262 I'm really happy to see this addressed directly in InfluxDB.

I'm not sure I understand how these rollups relate to regular retention policies though. When I specify for example periods(5m, "5m_rollup_retention_policy", 1h, "1h_rollup_retention_policy") but I send data in 10s intervals does that mean that only the aggregated data is stored for the 5m and 1h periods and the 10s values are never stored?

In my (naive) implementation I store the raw 10s value in the default retention policy and then use wildcard CQs with back referencing measurement names to feed the aggregated data into two additional retention policies for the 5m and 1h values. This way the aggregated values also run into the "bucket problem" (i.e. they are generated with a 5m or 1h delay) but the live data which I am interested in 98% of the time is always up to date as no aggregation is used/required.

beckettsean commented 8 years ago

If we change this behavior it will change how CQs work because of the syntax we have in there about running multiple times during the interval and potentially running many times after.

I don't follow this. To me this feels like a special case implementation of CQs, not a wholesale rewrite of how they work. The RESAMPLE EVERY...FOR... syntax isn't particularly useful for the auto-rollup use case, but why not restrict rollup queries to a subset of the CQ syntax? I'm sure I'm missing something but I'm not sure what.

The modified CQ syntax you propose is much more verbose so it feels painful to specify for multiple rollup periods. I think it's common for people to have 2-3 levels of precision other than the raw data.

These are not queries that are entered daily by a person, so efficiency of input is a minor concern to me. They are done as part of system setup and periodic maintenance, perhaps once a quarter or so. I think verbosity is a good thing when it leads to clarity. We've chosen SQL as the inspiration for InfluxQL, so I think keeping with the CQ syntax is preferable to the more functional-style syntax proposed. The proposed syntax is a marked departure from our existing systems and doesn't echo other syntax with which I'm familiar. It is more concise and possibly more extensible, but it is also more opaque and more bespoke. If ease of use continues to be a goal then we should not abandon SQL-ish syntax lightly. Millions more know SQL than any given development language or pattern.

If the user does a restart of the database, they'll need to do a backfill query if they want that period's rollups to be accurate.

This basically guarantees the database will violate user expectations after a restart. The system will not be query-able by configured dashboards until rollups are manually reapplied. Trading predictability for convenience is not what a database should do, in my opinion, particularly not one tied to real-time analytics and monitoring use cases. Predictability should be the number one behavior.

Does this mean that rollups aren't persisted to disk? Or that rollups only persist to disk periodically? If so, why not augment CQs with an in-RAM live representation that's flushed periodically, like querying points in the WAL now? It maintains the current behavior for CQs and prioritizes them for improvement, rather than abandoning them for a new, untested, syntactically unique system. We've altered so much in the database in the 0.x line, I am hesitant to replace whole subsystems and would prefer to extend them.

Nit: DROP ROLLUP "foo" should be DROP ROLLUP "foo" ON mydb.

pauldix commented 8 years ago

@beckettsean just a quick comment on why we'd just do it in memory and have the user explicitly backfill if they do a restart.

This is a feature for aggregated data. Basically, summaries. So they're inaccurate by default to begin with. The goal of this feature is to be able to do it in a performant fashion, which is more important than achieving absolute correctness.

To force correctness at all times, we'd have to read all the data on startup for whatever the last interval was (up to an hour) which could kill startup time (and we'd be dropping samples during the time we're recalculating the previous samples).

With the backfill method, we give the operator the chance to do whatever they deem necessary for their use case. So if they don't care about a single summary sample being off, they don't need to backfill and then won't take a performance hit.

At least that was my thinking about it, but I'm totally open for argument :)

beckettsean commented 8 years ago

@pauldix I think my confusion is that I don't quite understand if this proposal is to replace CQs or is intended to live alongside CQs. If it's purely an in-RAM solution that lives alongside CQs, doesn't this replicate a lot of what Kapacitor does? Perhaps we should look at making Kapacitor respond to queries or output dashboard data to Grafana, et al. instead of implementing a very similar stream processing functionality into InfluxDB.

daviesalex commented 8 years ago

Some comments as a user (not a developer, so please take my views with more salt than the others in this thread):

@beckettsean While I understand totally your POV, I do disagree that "efficiency of input is a minor concern" (at least, speaking for me!). While I understand how you feel that way, in my environment we have a constantly changing set of metrics coming in, and it's changing daily - and likely will continue to (the power of InfluxDB is that I can hand it out to hundreds of different folks that need a time series DB, and they can use it without talking to me). We need ways to trivially achieve this downsampling for InfluxDB to be useful for this type of use.

To address the comments from @hbs that point out that storage tiering is one way to achieve a similar thing: this solves the cost problem, but that's no use. Even with phenomenal hardware it's simply too slow to draw a graph over a month in Grafana using data every 10s or 1; the amount of CPU time InfluxDB requires and the number of IO pages that must be retrieved from that storage is very high. We did testing with top end hardware (all the data on PCIe flash, etc.) and real world queries (such as from telegraf) were practically ungraphable over six months with query-time bucketing; queries were taking many, many minutes to render. For reasons mentioned above, creating a CQ for every field is a prohibitive PITA, and not being able to automatically choose the resulting field at query time with Grafana is a total show stopper for us (users can technically edit the data source for each query, but in our case that alone would pretty much guarantee that our users will use something else to solve this problem).

Speaking again selfishly as a user, we appreciate that we could achieve this with Kapacitor. We could also do this ourselves by putting something in front of InfluxDB that writes each point a few times. We really don't want to have to deal with another piece of software here, but if we did, Kapacitor would be huge overkill for this. This feels like a very normal need for users of InfluxDB and as more and more users have more and more data (i.e. are more likely to try to plot it over months/years), this is going to become more and more of a feature for them.

It seems that the concerns here mostly rely on whether this replaces or is an addition to CQs (and a desire to avoid confusing/duplicating). To my non-expert mind, if CQs could add the following features, we could reuse much of the existing work:

As @pauldix says, a known limitation of this feature would be that if your server was down during a time you would have to manually re-downsample (a method needs to be provided to do that). Same applies if you send out-of-order data. This is designed for massive amounts of data that are sent at the time they are generated (a common case, IMHO).

Does this feel like something we can solve within InfluxDB rather than shoving something in front of it on the way in (or taking it in, selecting it out, downsampling it, and writing it back to the same InfluxDB in downsampled form)? The idea of putting Kapacitor in the middle (Grafana -> Kapacitor -> InfluxDB) isn't enough because one-off query performance will still be terrible; the data has to be downsampled outside of the query path.

phemmer commented 8 years ago

If the user does a restart of the database, they'll need to do a backfill query if they want that period's rollups to be accurate.

As a user, this also highly bothers me. I don't want to have to perform manual tasks to ensure we don't incur data loss every time we restart the process (whether due to crash, service restart, server restart, anything).

These rollups should be done all the time on data getting written in that is in the current interval. This will have to work for all series in the database so it'll probably have to be kept hot in memory and updated on every write. This will obviously reduce performance since there's additional overhead, but I think it's better than the alternative of trying to query all data every 5m or whatever the rollup period is.

In regards to the above, in your example you have:

periods(5m, "5m_rollup_retention_policy", 1h, "1h_rollup_retention_policy")
recompute(20m)

For it to work as you describe, you'd have to keep 20 minutes of data around, not 5. If the point of keeping this data in memory is to avoid the overhead of having to read it from disk, this would imply that a disk read is going to be expensive, which then implies lots of data. Meaning that keeping 20 minutes of data in memory is going to suck up a ton of memory. In my own humble opinion, influxdb should leave this up to the OS & the user. The OS does filesystem page caching to help with stuff like this. Let it do its job. If we leave this up to the OS, and the OS determines that the system doesn't have enough memory to cache the 20min worth of data, then it'll drop the cache, and the rollup will be slow. If we tried to keep the cache within influxdb, influxdb isn't going to be aware of memory pressure, and it's going to keep this data cached no matter what, which could bring down the system due to OOM conditions. And if we instead mmap() this data in, then it helps the kernel prioritize the page cache, and prevents the data from going to swap. In any case, I think a slow rollup is preferable to processes getting killed or crashing. And if the rollup is slow due to cache getting dropped, then it's the user's responsibility to either make the rollup more frequent so that less cache is needed, or increase the memory on the system.

And if we're not going to keep the data in process memory, then this makes it easier for us to offset the rollup's time window. We can now do a rollup from previous_rollup_end_time to previous_rollup_end_time + 5m, where an offset would cause this to operate on now()-25m to now()-20m. If influxdb is restarted, previous_rollup_end_time doesn't change, and we read the data from disk, or the page cache if still present, after perhaps a short delay to let remote agents flush their data.
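The offset scheme could look something like the following Python sketch. It assumes previous_rollup_end_time is persisted somewhere durable and that timestamps are in seconds; the function name due_windows is hypothetical.

```python
def due_windows(previous_rollup_end_time, now, period, offset):
    """Return the (start, end) windows that are ready to be rolled up.

    A window is ready once `offset` has elapsed after its end, so with
    period=5m and offset=20m the newest ready window is
    [now - 25m, now - 20m). After a restart, several windows may be due
    at once, and they are simply processed in order from disk.
    """
    windows = []
    start = previous_rollup_end_time
    while start + period + offset <= now:
        windows.append((start, start + period))
        start += period
    return windows
```

Each returned window would be read from disk (or the page cache), aggregated, written to the rollup retention policy, and only then would previous_rollup_end_time be advanced.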

The query should return a warning in the query results if the scale logic specified that it should use a rollup, but an appropriate one couldn't be found and it had to compute off the raw results.

With an offset rollup, we would also ditch this warning. If we only do the rollup once (at an offset time), the query would use the rollup, and any "raw data" newer than previous_rollup_end_time.

dennisjac commented 8 years ago

I may be wrong about this but if the functions that can be used are restricted to e.g. mean, min, max, count and maybe some others then the values can be calculated on the fly and you don't have to keep the last 20 minutes of data around but only one or two values per period (like e.g. the sum of values received so far and the number of them in the "mean" case) which sounds like it would make things much easier to deal with?
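A sketch of that constant-size state in Python, restricted to count, sum, min, max, and the mean derived from them. The class name is hypothetical. Exact percentiles, as used in one of the CREATE ROLLUP examples, cannot be maintained this way without keeping the values or an approximation structure.

```python
import math
from dataclasses import dataclass


@dataclass
class RunningAggregate:
    """Per-series, per-window state: four numbers regardless of point count."""
    count: int = 0
    total: float = 0.0
    minimum: float = math.inf
    maximum: float = -math.inf

    def add(self, value):
        """Fold one new value into the running state."""
        self.count += 1
        self.total += value
        self.minimum = min(self.minimum, value)
        self.maximum = max(self.maximum, value)

    @property
    def mean(self):
        """Mean of the values seen so far, or None if the window is empty."""
        return self.total / self.count if self.count else None
```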

Is this functionality really supposed to be super generic or would it be sufficient to just have it be able to aggregate the data for longer term data retention which is probably what 98% of people are looking for?

lpalm commented 8 years ago

Some notes on exponential hierarchical bucketing as food for thought:

Assume we roll up series over multiple scales, where each rollup interval is exactly twice as wide as the previous one. So if raw series S0 is rolled up into S1 at 1ms buckets, S2 will be rolled up at 2ms, S3 at 4ms, S4 at 8ms, etc. Each bucket contains a count of points, the min, max and sum of all values (mean can be computed easily from sum/count).

[Image: screen shot 2016-07-15 at 15 48 14]

The number of rollup levels for any series with this functionality enabled can be automatically computed as a fraction of the time delta between the newest and oldest point in that series. In other words, create the next higher rollup level if it would have at least K points. This will involve reading at most 2K points from the level below. The retention policy's duration for each level can scale proportionally to bucket size to ensure a fixed maximum number of buckets per aggregation level.

Insertion Performance

If inserting in order, we only need to keep the latest bucket from each rollup level in memory. Each data point will be added to exactly one bucket at the lowest aggregation level (1ms in this example). We create a new 1ms bucket for the first point inserted for that bucket. Once each bucket is "completed" at the end of its time range, it has a 50% chance of creating a new bucket in the hierarchy and a 50% chance of being added to the next bucket in the hierarchy that's halfway full, completing it. If we're just adding the first 1ms bucket to a new 2ms bucket, processing stops there. If we're adding the second 1ms bucket to an existing 2ms bucket, the 2ms bucket is completed. Since an update/insert at the next aggregation level up is 50% less likely than the one below, this is also amortized constant (1 + 1/2 + 1/4 + 1/8 + ... <= 2 inserts), even if we have a very large number of aggregation levels.
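The cascade can be sketched in a few lines of Python. This assumes in-order, gap-free completion of the 1ms (S1) buckets, so that consecutive completed buckets are always the two halves of the same parent; the class and method names are hypothetical.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Bucket:
    count: int
    minimum: float
    maximum: float
    total: float


def merge(a, b):
    """Combine two adjacent buckets into their parent bucket."""
    return Bucket(a.count + b.count, min(a.minimum, b.minimum),
                  max(a.maximum, b.maximum), a.total + b.total)


class RollupHierarchy:
    def __init__(self):
        self.half_full = {}  # level -> first half waiting for its sibling
        self.completed = []  # (level, Bucket), in the order they were written

    def complete(self, bucket, level=1):
        """Record a finished bucket and cascade upward while parents fill up."""
        while True:
            self.completed.append((level, bucket))
            waiting = self.half_full.pop(level + 1, None)
            if waiting is None:
                # First half of the parent: park it and stop.
                self.half_full[level + 1] = bucket
                return
            # Second half: the parent is now complete, so keep going up.
            bucket = merge(waiting, bucket)
            level += 1
```

Completing 8 S1 buckets produces 8 + 4 + 2 + 1 = 15 bucket writes in total, which stays under the 2 writes per S1 bucket bound from the geometric series.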

Here's an illustration. Red buckets are being updated in the current time step. Green are cold as they have already been written. White do not yet exist.

[Image: screen shot 2016-07-15 at 15 22 03]

Past a few levels, (say S20 with bucket sizes of ~9 minutes), we no longer need to keep those buckets hot in memory since changes to them would be quite infrequent if inserting in order.

Storage Requirements

The same 1+1/2+1/4+1/8+... ~= 2 logic applies for storage space required, as S(N+1) will have about half as many points as SN.

Query Performance - Multiple Points

By using the "scale" query, the highest rollup level that provides the minimum amount of points is chosen for the query time interval. If we need 500 points to plot a graph, we'll fetch at most 999 buckets, regardless of whether we're querying a 2 second window or a 10 year window. The rollup series to query for a time interval of T millis and K required points is SN where N = floor(log2(T/K)) + 1. E.g. for a 10 hour window and at least 500 points desired, we'll query S17, with buckets 2^16ms ~= 65.5 seconds wide, and we'll return 550 points.
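The level selection is easy to check numerically. A small Python sketch of the formula follows; the function names are hypothetical, and clamping to S0 (raw data) for very short windows is an added assumption.

```python
import math


def rollup_level(window_ms, min_points):
    """N = floor(log2(T / K)) + 1, clamped to 0 (raw series S0)."""
    return max(0, math.floor(math.log2(window_ms / min_points)) + 1)


def bucket_width_ms(level):
    """S1 buckets are 1ms wide and each level doubles: 2^(N-1) ms."""
    return 2 ** (level - 1)


ten_hours_ms = 10 * 60 * 60 * 1000
level = rollup_level(ten_hours_ms, 500)       # level 17
width = bucket_width_ms(level)                # 65536 ms, about 65.5 s
points = math.ceil(ten_hours_ms / width)      # 550 buckets
```

Because the chosen bucket width is the largest power of two not exceeding T/K, the number of buckets returned always lies between K and 2K.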

Query Performance - Aggregates

We should also be able to compute precise aggregates (min/max/sum/count/mean) over arbitrarily large intervals by querying at multiple scales. At most 2 buckets per scale level need to be looked at. So to compute the exact count of points over a ~1 month period with 1ms precision, we'd need to go as far up as S32 (2^31ms ~= 24.9 days), meaning at most 64 buckets need to be looked at:

[Image: screen shot 2016-07-15 at 15 47 04]

So aggregates over a time period of size T can be computed in O(log(T)).
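One way to see the "at most 2 buckets per level" bound is to split the query range into aligned power-of-two blocks greedily. The Python sketch below works on integer millisecond offsets and returns which bucket to read at which level; the function name is hypothetical, and level N is taken to mean buckets 2^(N-1) ms wide, as in the notes above.

```python
def dyadic_cover(start, end):
    """Split [start, end) into aligned power-of-two blocks.

    Returns a list of (level, block_start), where a level-N block is
    2^(N-1) ms wide and block_start is a multiple of that width.
    """
    blocks = []
    while start < end:
        if start:
            size = start & -start            # largest width `start` is aligned to
        else:
            size = 1 << end.bit_length()     # 0 is aligned to every width
        while size > end - start:
            size >>= 1                       # shrink until the block fits
        blocks.append((size.bit_length(), start))
        start += size
    return blocks
```

For example, dyadic_cover(3, 11) gives [(1, 3), (3, 4), (2, 8), (1, 10)]: one 1ms bucket, one 4ms bucket, one 2ms bucket, and one more 1ms bucket. An exact count, sum, min, or max over the range is then the combination of those four stored buckets.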

Kapacitor Pseudo-Code

Here's one way to generate similar rollups with existing functionality. We'd need a large tickscript to compute each basic aggregation for each rollup scale.

// 'X' denotes one of sum, min, max, count. For count we'd have to sum the counts, not just take the count.
var bucket1msX = batch.query('select X(value) from rawStream').period(1ms).every(1ms).influxDBOut().database(...).measurement('bucket1msX') // S1
var bucket2msX = batch.query('select X(value) from bucket1msX').period(2ms).every(2ms).influxDBOut().database(...).measurement('bucket2msX') // S2
var bucket4msX = batch.query('select X(value) from bucket2msX').period(4ms).every(4ms).influxDBOut().database(...).measurement('bucket4msX') // S3
// ... and so on.

The main downside of this approach is that it wouldn't work well for out of order points. However, rolling up from raw data after the fact could be pretty efficient if the raw data is processed in order.

jsternberg commented 8 years ago

Looking back at this, I'm still a bit confused about the purpose of this feature and I think it would clarify a lot if we simplified this down to purpose and requirements before talking about potential solutions (such as in-memory or not and hierarchical buckets). What problem are we having that cannot be solved right now, why do we need to solve that problem, and then a critical examination of the existing solutions and why they don't fulfill the need.

The big issue that I've seen so far is being able to downsample data for a long term retention policy without having to rewrite a query and I don't see that addressed with this RFC. This may not be the RFC for that feature, but some clarification would definitely help me.

phemmer commented 8 years ago

I think the first 2 sentences of the description sum up why this proposal exists.

The big issue that I've seen so far is being able to downsample data for a long term retention policy without having to rewrite a query and I don't see that addressed with this RFC

It is:

Scale would take an argument that specifies which field name from the select clause to scale and the rollup value to use if it needs it. The query should either compute using the raw values or compute using whichever rollup is appropriate for that period of time range

jsternberg commented 8 years ago

That more describes semantics rather than the intention behind the RFC. The reason why I think intention is important is because then we can brainstorm about what solution would be best to achieve the desired intention.

I may be mistaken on the point of this RFC though. My initial reading was that this would be for aggregating queries into other retention policies and for handling the problem I mentioned, where people need to switch the retention policy when switching which precision they are looking at (which is unfamiliar for former graphite users). There was also talk about potentially deprecating continuous queries in favor of rollups, but I'm still not clear on the purpose behind the rollup.

To my best understanding, rollups, as designed here, are intended to be for optimization so someone can create a hot cache of specific aggregations so those only have to be calculated once rather than multiple times. If the system can't find that cache, it would go to the file system instead and perform the aggregation. Instead of backfilling, we could just mark shards as dirty and recalculate rollups on demand rather than forcing administrators to do it. If this is the case, then speed is very important and this isn't for long term aggregation.

lswith commented 8 years ago

I think we should simply create a new issue which addresses the problem of long term aggregation. Currently there doesn't seem to be a clear issue explaining the problem in detail. From my understanding, it would be something along the lines of:

This has a number of problems:

pauldix commented 8 years ago

I don't think there's a need to create a new issue yet. I'll make an attempt to describe the high level requirements here and link to this comment in the top. I'll also try to separate syntax from implementation. I'll avoid the latter for now other than to say, the in-memory thing I had suggested was only a performance optimization idea.

First, the creation of this has nothing to do with CQs, other than to try to achieve a goal that I had originally intended when I was designing the first version of CQs. I'm not proposing we replace them or deprecate them. We can figure that out later, but I think there are current users that get value out of having CQs.

The only reason I brought CQs up is because I originally introduced them as a concept to try to solve the high level requirements I'm about to describe. Based on what I've heard from user feedback, they have the following issues:

Requirements

  1. Users should be able to quickly query aggregates on many series at the same time in the month or year range even if their raw sampling interval is 1s
  2. Users should be able to specify a time range and optionally a minimum number of desired samples and have the database return a precision level appropriate
  3. Users should be able to have older raw samples automatically removed without losing lower precision rolled up samples for longer term trends
  4. Users should be able to query lower precision samples and have the current sample show partial results (i.e. if they're looking at 1d samples, there should be a result for the current day)
  5. If automatically rolling up data, it should prioritize performance. It should not put extraordinary strain against disk. For example, various schemes could end up creating as much as 10x more writes against disk than the raw samples coming in. I'm up for debate, but I'd favor a memory + disk combo that would require some recalculating of samples after a restart or outage.
  6. Operators should be able to specify rules for rolling up and decimating data that are not overly verbose and don't require updates when new series or measurements are written into the database. They should be able to set once and forget, regardless of what their users are writing

Note that there's no requirement to support the full range of query functions and functionality that InfluxQL provides. This is for the case of being able to query for longer term trends quickly (mostly for visualization purposes) and to have raw data evicted periodically while keeping longer term data.

Syntax

Even with all the debate I think the syntax would largely be unchanged. If we take an approach like @lpalm suggested, then the creation of the rollup rules would end up being much simpler because we would have already chosen what lower precision data to keep.

The only other thing I would add would be an argument to SCALE that would let the user specify the minimum or maximum number of data points that should get returned by a query. The implementation for that will have to be figured out later, but it should work when querying raw samples or when returning lower precision samples.

Thoughts?

jsternberg commented 8 years ago

I think this is a good writeup. One thing that I would like to mention though in regards to this:

They simply don't scale. If you have more than a few thousand series you're downsampling, they fall over.

Is this still true? A big problem with older CQs was we would create a new query for every single interval. We've now started batching those and I think they have become much faster. If this is the case and CQs can meet the performance requirements now, we may want to think about building on that section of the code.

I would also personally like most of this to be automatic. I think #7009 has made creating CQs much easier since now you can do this:

CREATE CONTINUOUS QUERY aggregates ON <db> BEGIN
    SELECT mean(*), max(*), min(*) INTO <db>."5m".:MEASUREMENT FROM /.*/ GROUP BY time(5m), *
END

That will aggregate all measurements with mean, max, and min for every measurement in a database. I think there are still some parts we may want to consider simplifying for this use case, but this is much better than it previously was. An example of output:

> select * from mydb."5m".cpu
name: cpu
---------
time                    host            max_value       mean_value      min_value
1470844200000000000     server01        2               2               2

In terms of implementation, I would like to experiment with the idea of making the retention policy switch automatically based on what continuous queries exist. So when we are doing query planning, we would see the continuous query existed and switch to using that if it fit the correct group by interval and matched the interval in question. I think we can make much of the syntax in the original proposal irrelevant since we already have all of the information we need, we just don't have the underlying ability to do anything with that information yet.
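A rough Python sketch of that planning step follows, under stated assumptions: `pick_retention_policy`, the `REAGGREGATABLE` set, and the `(retention_policy, interval, functions)` tuple shape are all hypothetical and do not reflect the actual InfluxDB planner. It only reuses a coarser multiple of a CQ interval for functions that can be safely re-aggregated; a mean of means is wrong unless counts are also stored, so `mean` requires an exact interval match here.

```python
# Functions whose stored results can be combined again into a coarser bucket
# (min of mins, max of maxes, sum of sums, sum of counts).
REAGGREGATABLE = {"min", "max", "sum", "count"}


def pick_retention_policy(group_by_s, funcs, cqs):
    """Return the retention policy to read from, or None to fall back to raw data.

    group_by_s: the query's GROUP BY time() interval in seconds
    funcs:      set of aggregate function names used by the query
    cqs:        list of (retention_policy, interval_s, functions) tuples,
                one per existing continuous query (hypothetical shape)
    """
    candidates = []
    for rp, interval_s, cq_funcs in cqs:
        if not funcs <= cq_funcs:
            continue  # this CQ does not store every aggregate the query needs
        exact = interval_s == group_by_s
        coarser_ok = group_by_s % interval_s == 0 and funcs <= REAGGREGATABLE
        if exact or coarser_ok:
            candidates.append((interval_s, rp))
    # Prefer the coarsest usable rollup, since it has the fewest points to read.
    return max(candidates)[1] if candidates else None


cqs = [
    ("5m", 300, {"mean", "max", "min"}),
    ("1h", 3600, {"mean", "max", "min"}),
]
```

With these two CQs, a `max` grouped by 10m could be served from the "5m" policy, while a `mean` grouped by 10m would fall back to raw data. The sketch ignores whether the query's time range is actually covered by the rollup, which a real planner would also have to check.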

I think this would require a few improvements to CQs though (which I think would be a good thing for everybody using CQs and for people wanting to use this feature). We would have to automatically update shards when a write occurred to an older shard. Any shards that were hot we would not automatically update. During the time between when the CQ runs and when the older shard was written to, the query engine would have to automatically use the raw data rather than the continuous query aggregate. We would also have to ensure that a continuous query was executed against a shard before the retention service deletes the shard.

jwilder commented 8 years ago

Personally, I would like to see this happen automatically if possible as opposed to having more syntax or series to rollup. I'm guessing most rollups are going to be basic mean, max, first, last, etc. of all the fields. Maybe we could calculate and store these automatically and efficiently and use them at query time when possible?

One way we might be able to do this is to augment the TSM index with some of this summary data (min, max, count, sum, etc.) for each block while writing TSM files. At query time, if we know we need to calculate the min for a series and the block time range is fully covered by the time range of the query, we could return the min from the index. For any blocks that are partially in the range, we'd have to decode/calculate the min, but that should be limited to the boundary time ranges. Some of these operations can also be done on the compressed data too. This would skip the more expensive decoding and aggregate function calculations for large ranges of time, potentially making it fast enough.
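Here is a small Python sketch of that lookup, assuming a simplified in-memory `Block` with a precomputed summary. The class and the `query_min` function are hypothetical stand-ins; the real TSM index and block format are not modelled. Only blocks that straddle the edge of the query range are "decoded".

```python
class Block:
    """Stand-in for a TSM block: raw points plus a summary written at flush time."""

    def __init__(self, times, values):
        self.times = times      # sorted timestamps
        self.values = values    # one value per timestamp
        # Summary data that would live in the index next to the block entry.
        self.min_time = times[0]
        self.max_time = times[-1]
        self.min_value = min(values)


def query_min(blocks, start, end):
    """Return (minimum over start <= t <= end, number of blocks decoded)."""
    best = None
    decoded = 0
    for b in blocks:
        if b.max_time < start or b.min_time > end:
            continue  # block is entirely outside the query range
        if start <= b.min_time and b.max_time <= end:
            # Block is fully covered by the query: answer from the summary.
            candidate = b.min_value
        else:
            # Boundary block: decode and scan only the points in range.
            decoded += 1
            in_range = [v for t, v in zip(b.times, b.values) if start <= t <= end]
            if not in_range:
                continue
            candidate = min(in_range)
        best = candidate if best is None else min(best, candidate)
    return best, decoded


blocks = [
    Block([0, 1, 2], [5.0, 4.0, 9.0]),
    Block([3, 4, 5], [7.0, 1.0, 8.0]),
    Block([6, 7, 8], [3.0, 6.0, 2.0]),
]
```

For a query over times 1 through 7, only the first and last blocks need decoding; the middle block is answered from its summary. However long the range, at most two boundary blocks per series are decoded in this layout.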

This idea is more along the lines of @lpalm's suggestion, but without as many levels and incorporating it into the storage directly.

jsternberg commented 8 years ago

@pauldix can you expand on this requirement?

  1. Users should be able to specify a time range and optionally a minimum number of desired samples and have the database return a precision level appropriate

I'm a bit confused about what that means. An example would be really helpful.

pauldix commented 8 years ago

@jsternberg the use case there is when the user wants to draw a graph and they know they have a certain number of pixels. So they just want to specify a time range and the number of points to return.
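One way to read that use case, sketched in Python: given the time range and a minimum point count, pick the coarsest stored rollup interval that still returns enough points, and fall back to raw data otherwise. The function name `choose_interval` and the `None`-means-raw convention are assumptions for illustration; the 5m and 1h intervals come from the proposal at the top of this issue.

```python
def choose_interval(range_s, min_points, intervals_s):
    """Pick the coarsest rollup interval that yields at least `min_points` samples.

    range_s:     length of the queried time range in seconds
    min_points:  minimum number of samples the caller wants (e.g. pixel width)
    intervals_s: available rollup intervals in seconds
    Returns the chosen interval, or None to signal "use raw samples".
    """
    usable = [i for i in intervals_s if range_s // i >= min_points]
    return max(usable) if usable else None


ROLLUPS = [300, 3600]          # 5m and 1h rollups
THIRTY_DAYS = 30 * 24 * 3600   # 2,592,000 seconds
```

For an 800-pixel graph over 30 days, the 1h rollup would give only 720 points, so the 5m rollup is chosen; for a 500-pixel graph the 1h rollup is enough. A one-hour range falls through to raw samples.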

jsternberg commented 8 years ago

I'm closing this in favor of #7198 which we will use for identifying the underlying problem and then beginning to work on an RFC for how to implement it.