Open sbengo opened 7 years ago
Hi again,
@nathanielc, @desa, we ran the same test with Kapacitor 1.3.3 (git: master eacb3730de19d7c70b6481f53100c20850d30b8b) and it seems to show the same behaviour explained in the comment above.
Thanks, Regards
@sbengo Thanks for the report. What does the data that you're writing look like? How many tags, fields, etc.? Also the length of each of the tag keys and values would be useful.
One thing we can do to lower the amount of data buffered is to keep only the fields we need in the window:
var data = stream
|from()
.database('switch_metrics')
.retentionPolicy('1y')
.measurement('ifMIB_metrics')
.groupBy([*])
.where(lambda: TRUE)
|eval()
.keep('InputUtilization')
|window()
.period(30m)
.every(1m)
.align()
|mean('InputUtilization')
.as('value')
Assuming it keeps the data for 30 min (window .period()): 30 min × 4,024 series × 1 point/min = 120,720 points over 30 min of data, so it seems each point is using 1.078 GB / 120,720 points ≈ 9,588 bytes. We think this is too much memory and we cannot understand why Kapacitor requires this amount of memory for each point.
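The arithmetic above can be reproduced with a short script (all figures are taken from this comment; GB is read as GiB here):

```python
period_min = 30          # window .period()
series = 4024            # cardinality
points_per_min = 1       # write precision = 1m

points = period_min * series * points_per_min
mem_bytes = 1.078 * 1024**3   # observed memory increase

print(points)                      # 120720
print(round(mem_bytes / points))   # ~9588 bytes/point
```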
I noticed that in our window node, we emit the data to the pipeline every minute with the last 30m
|window()
.period(30m)
.every(1m)
.align()
This means that every minute we send a 30m buffer of data down the pipeline. Internally this creates two ~30m windows of the data every minute so the math here is a bit off. This would also explain the leveling off at 1h. Nonetheless, with the adjustments to the equation, things are still a bit excessive.
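Under the two-internal-windows explanation above, a hedged re-estimate of the per-point cost looks like this (same figures as before, GB read as GiB; the factor of 2 is the assumption):

```python
# If the window node internally holds two ~30m copies of the data,
# the per-point figure roughly halves.
points = 30 * 4024            # one 30m window of 1-point/min series
mem = 1.078 * 1024**3         # observed memory increase in bytes

print(round(mem / (2 * points)))   # ~4794 bytes/point, still high
```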
To summarize, I've got a couple of asks.
Thanks for the quick answer.
In order to track which device/interface is producing errors, it is absolutely necessary to preserve the tags and show them in the alert.
We are using a generic template, so we have no plans to hardcode the tag set in the keep() function.
We will run the other proposed test in the following days.
In order to provide more info, our data looks like:
- Precision = 1m
- Number of tagKeys = 8
- Number of fieldKeys = 14
- Cardinality = 4024
As an example of one entry:
tagKey | length tagKey | ~ length tagValue |
---|---|---|
tag1 | 5 | 10 |
tag2 | 6 | 3 |
tag3 | 11 | 4 |
tag4 | 9 | 3 |
tag5 | 7 | 15 |
tag6 | 8 | 21 |
tag7 | 6 | 16 |
tag8 | 6 | 4 |
fieldKey | length fieldKey | type |
---|---|---|
InputUtilization | 16 | "float" |
OutputUtilization | 17 | "float" |
ifAdminStatus | 13 | "float" |
ifHCInBroadcastPkts | 19 | "float" |
ifHCInMulticastPkts | 19 | "float" |
ifHCInOctets | 12 | "float" |
ifHCInUcastPkts | 15 | "float" |
ifHCOutBroadcastPkts | 20 | "float" |
ifHCOutMulticastPkts | 20 | "float" |
ifHCOutOctets | 13 | "float" |
ifHCOutUcastPkts | 16 | "float" |
ifInDiscards | 12 | "float" |
ifOperStatus | 12 | "float" |
inErrorRate | 11 | "float" |
We are using a generic template so we have no plans to hardcode the tag set on keep() function.
Keep only applies to fields; all tags will be preserved. I'd try to use keep together with the window. This way you'll buffer less data.
Is there any way to keep it in memory as a variable map and afterwards apply it to the "message/details" template variable passed to the alert node?
Not sure I follow what you're trying to do.
Hi @desa , first of all thanks for your reply.
We did some tests applying your suggestion of keeping only the 'InputUtilization' field before the |window() node, and these are the results and questions:
// TICKSCRIPT:
// ================
var data = stream
|from()
.database('switch_metrics')
.retentionPolicy('1y')
.measurement('ifMIB_metrics')
.groupBy([*])
.where(lambda: TRUE)
|eval()
.keep('InputUtilization')
|window()
.period(period)
.every(1m)
.align()
|mean('InputUtilization')
.as('value')
...
Var | Value |
---|---|
period | 30m |
Mem before | Time to standby | Mem standby | Total increase |
---|---|---|---|
1.984GB | ~ 1 hour | 2.776 GB | 0.792 GB |
0.792 GB = 850,403,524.608 bytes / (30 min × 4,025 points/min) ≈ 7,042 bytes/point
After Test 1 (2017/09/29), and without restarting Kapacitor, we activated another task to eval another field of the same measurement, OutputUtilization, with the following results:
Var | Value |
---|---|
period | 30m |
Mem before | Time to standby | Mem standby | Total increase |
---|---|---|---|
2.800 GB | ~ 1 hour | 3.331 GB | 0.531 GB |
0.531 GB ≈ 570,156,908.544 bytes / (30 min × 4,025 points/min) ≈ 4,721 bytes/point
Test 3
Var | Value |
---|---|
period | 15m |
Mem before | Time to standby | Mem standby | Total increase |
---|---|---|---|
2.063 GB | ~ 30 min | 2.440 GB | 0.377 GB |
0.377 GB = 404,800,667.648 bytes / (15 min × 4,025 points/min) ≈ 6,704 bytes/point
The period (15m) seems to be related to the total elapsed time needed to reach the maximum memory consumption: it is half the elapsed time of Test 1 and Test 2 (period = 30m).
Since a point arrives every 1m and we keep it on the same node windowing 30 min, we don't see why it needs 2 × period of time to reach its maximum memory: "This means that every minute we send a 30m buffer of data down the pipeline. Internally this creates two ~30m windows of the data every minute so the math here is a bit off. This would also explain the leveling off at 1h. Nonetheless, with the adjustments to the equation, things are still a bit excessive."
Isn't Kapacitor keeping data like the following example? If that is right, the memory should only increase for period + 1 minutes.
For example, with a period = 30m:
time | Datapoints on window node | Datapoints on next node |
---|---|---|
0:00 | 1 | -- |
0:01 | 2 | 1 |
0:02 | 3 | 2 |
... | ... | ... |
0:29 | 30 | 29 |
0:30 | 30 | 30 |
0:31 | 30 | 30 |
0:32 | 30 | 30 |
... | ... | ... |
Do you agree?
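The table above can be reproduced with a small simulation (a sketch of the expected behaviour, not Kapacitor's actual internals):

```python
def simulate(period=30, minutes=33):
    """One point per minute enters the window node; the node keeps the
    last `period` minutes and emits its buffer downstream every minute."""
    window, prev_size, rows = [], None, []
    for t in range(minutes):
        window.append(t)
        window = [p for p in window if p > t - period]
        # (minute, points buffered in window node, points sent to next node)
        rows.append((t, len(window), prev_size))
        prev_size = len(window)
    return rows

for minute, buffered, emitted in simulate()[:3] + simulate()[29:32]:
    print(minute, buffered, emitted)
# matches the table: the buffer grows to 30 points and then stays flat
```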
If there is one buffer per node (in our case, 2 nodes: window and mean), can we assume that the memory used by each node is exactly half of the total?
As can be seen, we are defaulting some fields to check whether the alert must be fired or not. What is better: keeping tags or keeping fields? Which uses less memory?
Hello again!
I'm updating the issue with another test.
In this test we set up 5 tasks with the same cardinality (4,025 series in the same measurement), each selecting a different field from the same time series (all fields are sent in the same write action).
We modified the TICKscript: instead of using 3 different vars, we defined the TICKscript calling the stream node and chaining the nodes.
// TICKSCRIPT:
// ================
stream
|from()
.database('switch_metrics')
.retentionPolicy('1y')
.measurement('ifMIB_metrics')
.groupBy([*])
.where(lambda: TRUE)
|eval()
.keep('InputUtilization')
|window()
.period(period)
.every(1m)
.align()
|mean('InputUtilization')
.as('value')
...
|default()
...
|alert()...
Var | Value |
---|---|
period | 15m |
Mem before | Time to standby | Mem standby | Total increase |
---|---|---|---|
1.565 GB | ~ 15 min | 3.278 GB | 1.713 GB |
1.713 GB = 1,839,319,744.512 bytes / (15 min × 4,025 points/min × 5 tasks) ≈ 6,093 bytes/point
For this period duration, the bytes/point figure dropped by about 13% compared with Test 1 and about 9% compared with Test 2.
Hi @desa, @nathanielc. I'm working with @sbengo on this analysis, and we have decided to repeat the tests, but now with all other running tasks disabled and measuring the process "heap in use" instead of system memory, as a more reliable way to measure memory consumption.
As in the previous tests, we need to compute the mean of 15 minutes of "InputUtilization" on approximately 4,029 switch port interfaces and then compare it with some thresholds. We have also used a UDF (monInjector) to inject some fields (mon_activo, mon_linia, mon_exc) taken from an external (non-InfluxDB) database, both to control whether to continue with threshold evaluation and to select among 3 possible thresholds; we also have hour/weekday filtering.
//TICKSCRIPT:
//================
//var data = stream
stream
|from()
.database(INFLUX_BD)
.retentionPolicy(INFLUX_RP)
.measurement(INFLUX_MEAS)
.groupBy(influx_agrup)
.where(INFLUX_FILTER)
|default()
.field(FIELD,FIELD_DEFAULT)
|eval()
.keep(FIELD)
|window()
.period(INTERVALO_CHECK)
.every(every)
.align()
|mean(FIELD)
.as('value')
@monInjector()
.alertId(ID)
.searchByTag(DEVICEID_TAG)
|default()
.field('mon_activo', TRUE)
.field('mon_linea', 'LBLE')
.field('mon_exc', 0)
|eval(lambda:hour("time"), lambda:weekday("time"))
.as('hora','dia')
.keep()
|eval(lambda: if("mon_activo" == TRUE AND "mon_exc" >= 0 AND strContains("mon_linea",ID_LINIA), 1, 0))
.as('mon_check')
.keep()
|httpOut('mon_check')
|alert()
.crit(lambda: if("mon_check" == 1 AND "hora" >= TH_CRIT_MIN_HOUR AND "hora" <= TH_CRIT_MAX_HOUR AND strContains(DIA_SEMANA_CRIT,string("dia")), float("value") > if("mon_exc" == 0, TH_CRIT_DEF, if("mon_exc" == 1, TH_CRIT_EX1, if("mon_exc" == 2, TH_CRIT_EX2, 0.0))),FALSE))
.warn(lambda: if("mon_check" == 1 AND "hora" >= TH_WARN_MIN_HOUR AND "hora" <= TH_WARN_MAX_HOUR AND strContains(DIA_SEMANA_WARN,string("dia")), float("value") > if("mon_exc" == 0, TH_WARN_DEF, if("mon_exc" == 1, TH_WARN_EX1, if("mon_exc" == 2, TH_WARN_EX2, 0.0))),FALSE))
.info(lambda: if("mon_check" == 1 AND "hora" >= TH_INFO_MIN_HOUR AND "hora" <= TH_INFO_MAX_HOUR AND strContains(DIA_SEMANA_INFO,string("dia")), float("value") > if("mon_exc" == 0, TH_INFO_DEF, if("mon_exc" == 1, TH_INFO_EX1, if("mon_exc" == 2, TH_INFO_EX2, 0.0))),FALSE))
.stateChangesOnly()
.id(ID)
.idTag(idTag)
.levelTag(levelTag)
.messageField(messageField)
.durationField(durationField)
.message(message)
.details(details)
.post('http://output_webservice')
.email()
.to(envio_mail)
|httpOut('output')
When enabling this task we see the following consumption:
test | Init Heap | End Heap | Heap Usage | Cardinality | Points(15m) | bytes /point |
---|---|---|---|---|---|---|
1 | 18Mb | 554Mb | 536 Mb | 4029 | 60435 | 9299 b /point |
In this test we removed the window node and replaced the mean() node with movingAverage() over 15 points (our resolution is 1 point/minute). The result should be the same as in Test 1.
//TICKSCRIPT:
//================
//var data = stream
stream
|from()
.database(INFLUX_BD)
.retentionPolicy(INFLUX_RP)
.measurement(INFLUX_MEAS)
.groupBy(influx_agrup)
.where(INFLUX_FILTER)
|default()
.field(FIELD,FIELD_DEFAULT)
|eval()
.keep(FIELD)
|movingAverage(FIELD,MOV_AVG_POINTS)
.as('value')
@monInjector()
.alertId(ID)
.searchByTag(DEVICEID_TAG)
|default()
.field('mon_activo', TRUE)
.field('mon_linea', 'LBLE')
.field('mon_exc', 0)
|eval(lambda:hour("time"), lambda:weekday("time"))
.as('hora','dia')
.keep()
|eval(lambda: if("mon_activo" == TRUE AND "mon_exc" >= 0 AND strContains("mon_linea",ID_LINIA), 1, 0))
.as('mon_check')
.keep()
|httpOut('mon_check')
|alert()
.crit(lambda: if("mon_check" == 1 AND "hora" >= TH_CRIT_MIN_HOUR AND "hora" <= TH_CRIT_MAX_HOUR AND strContains(DIA_SEMANA_CRIT,string("dia")), float("value") > if("mon_exc" == 0, TH_CRIT_DEF, if("mon_exc" == 1, TH_CRIT_EX1, if("mon_exc" == 2, TH_CRIT_EX2, 0.0))),FALSE))
.warn(lambda: if("mon_check" == 1 AND "hora" >= TH_WARN_MIN_HOUR AND "hora" <= TH_WARN_MAX_HOUR AND strContains(DIA_SEMANA_WARN,string("dia")), float("value") > if("mon_exc" == 0, TH_WARN_DEF, if("mon_exc" == 1, TH_WARN_EX1, if("mon_exc" == 2, TH_WARN_EX2, 0.0))),FALSE))
.info(lambda: if("mon_check" == 1 AND "hora" >= TH_INFO_MIN_HOUR AND "hora" <= TH_INFO_MAX_HOUR AND strContains(DIA_SEMANA_INFO,string("dia")), float("value") > if("mon_exc" == 0, TH_INFO_DEF, if("mon_exc" == 1, TH_INFO_EX1, if("mon_exc" == 2, TH_INFO_EX2, 0.0))),FALSE))
.stateChangesOnly()
.id(ID)
.idTag(idTag)
.levelTag(levelTag)
.messageField(messageField)
.durationField(durationField)
.message(message)
.details(details)
.post('http://output_webservice')
.email()
.to(envio_mail)
|httpOut('output')
When the task with this new TICKscript was enabled, this was the result:
test | Init Heap | End Heap | Heap Usage | Cardinality | Points(15m) | bytes /point |
---|---|---|---|---|---|---|
2 | 15Mb | 199Mb | 184 Mb | 4029 | 60435 | 3192 b /point |
This test shows how memory increases considerably after 15 minutes, when the movingAverage node begins to send data to the following nodes.
In this test we removed the unneeded httpOut() nodes.
//TICKSCRIPT:
//================
//var data = stream
stream
|from()
.database(INFLUX_BD)
.retentionPolicy(INFLUX_RP)
.measurement(INFLUX_MEAS)
.groupBy(influx_agrup)
.where(INFLUX_FILTER)
|default()
.field(FIELD,FIELD_DEFAULT)
|eval()
.keep(FIELD)
|movingAverage(FIELD,MOV_AVG_POINTS)
.as('value')
@monInjector()
.alertId(ID)
.searchByTag(DEVICEID_TAG)
|default()
.field('mon_activo', TRUE)
.field('mon_linea', 'LBLE')
.field('mon_exc', 0)
|eval(lambda:hour("time"), lambda:weekday("time"))
.as('hora','dia')
.keep()
|eval(lambda: if("mon_activo" == TRUE AND "mon_exc" >= 0 AND strContains("mon_linea",ID_LINIA), 1, 0))
.as('mon_check')
.keep()
|alert()
.crit(lambda: if("mon_check" == 1 AND "hora" >= TH_CRIT_MIN_HOUR AND "hora" <= TH_CRIT_MAX_HOUR AND strContains(DIA_SEMANA_CRIT,string("dia")), float("value") > if("mon_exc" == 0, TH_CRIT_DEF, if("mon_exc" == 1, TH_CRIT_EX1, if("mon_exc" == 2, TH_CRIT_EX2, 0.0))),FALSE))
.warn(lambda: if("mon_check" == 1 AND "hora" >= TH_WARN_MIN_HOUR AND "hora" <= TH_WARN_MAX_HOUR AND strContains(DIA_SEMANA_WARN,string("dia")), float("value") > if("mon_exc" == 0, TH_WARN_DEF, if("mon_exc" == 1, TH_WARN_EX1, if("mon_exc" == 2, TH_WARN_EX2, 0.0))),FALSE))
.info(lambda: if("mon_check" == 1 AND "hora" >= TH_INFO_MIN_HOUR AND "hora" <= TH_INFO_MAX_HOUR AND strContains(DIA_SEMANA_INFO,string("dia")), float("value") > if("mon_exc" == 0, TH_INFO_DEF, if("mon_exc" == 1, TH_INFO_EX1, if("mon_exc" == 2, TH_INFO_EX2, 0.0))),FALSE))
.stateChangesOnly()
.id(ID)
.idTag(idTag)
.levelTag(levelTag)
.messageField(messageField)
.durationField(durationField)
.message(message)
.details(details)
.post('http://output_webservice')
.email()
.to(envio_mail)
When enabled, these are the results:
test | Init Heap | End Heap | Heap Usage | Cardinality | Points(15m) | bytes /point |
---|---|---|---|---|---|---|
3 | 17Mb | 160Mb | 160 Mb | 4029 | 60435 | 2481 b /point |
With httpOut() removed, the heap peaks have disappeared.
In this test we removed all unneeded nodes and moved the related logic (time checking and some other checks) into our UDF (monInjector).
With this change we reduced the pipeline from 9 nodes to 5.
//TICKSCRIPT:
//================
//var data = stream
stream
|from()
.database(INFLUX_BD)
.retentionPolicy(INFLUX_RP)
.measurement(INFLUX_MEAS)
.groupBy(influx_agrup)
.where(INFLUX_FILTER)
|default()
.field(FIELD,FIELD_DEFAULT)
|movingAverage(FIELD,MOV_AVG_POINTS)
.as('value')
@monInjector()
.alertId(ID)
.searchByTag(DEVICEID_TAG)
.setLine(LIDE_ID)
        .critTime(DIA_SEMANA_CRIT,TH_CRIT_MIN_HOUR,TH_CRIT_MAX_HOUR)
        .warnTime(DIA_SEMANA_WARN,TH_WARN_MIN_HOUR,TH_WARN_MAX_HOUR)
        .infoTime(DIA_SEMANA_INFO,TH_INFO_MIN_HOUR,TH_INFO_MAX_HOUR)
|alert()
.crit(lambda: if("check_crit" , float("value") > if("mon_exc" == 0, TH_CRIT_DEF, if("mon_exc" == 1, TH_CRIT_EX1, if("mon_exc" == 2, TH_CRIT_EX2, 0.0))),FALSE))
.warn(lambda: if("check_warn" , float("value") > if("mon_exc" == 0, TH_WARN_DEF, if("mon_exc" == 1, TH_WARN_EX1, if("mon_exc" == 2, TH_WARN_EX2, 0.0))),FALSE))
.info(lambda: if("check_info" , float("value") > if("mon_exc" == 0, TH_INFO_DEF, if("mon_exc" == 1, TH_INFO_EX1, if("mon_exc" == 2, TH_INFO_EX2, 0.0))),FALSE))
.stateChangesOnly()
.id(ID)
.idTag(idTag)
.levelTag(levelTag)
.messageField(messageField)
.durationField(durationField)
.message(message)
.details(details)
.post('http://output_webservice')
.email()
.to(envio_mail)
When the task was enabled we saw this behaviour:
test | Init Heap | End Heap | Heap Usage | Cardinality | Points(15m) | bytes /point |
---|---|---|---|---|---|---|
4 | 11Mb | 53Mb | 42 Mb | 4029 | 60435 | 728 b /point |
test | Init Heap | End Heap | Heap Usage | Cardinality | Points(15m) | bytes /point | Test type |
---|---|---|---|---|---|---|---|
1 | 18Mb | 554Mb | 536 Mb | 4029 | 60435 | 9299 b /point | window+mean() 15min |
2 | 15Mb | 199Mb | 184 Mb | 4029 | 60435 | 3192 b /point | movingAverage(15) |
3 | 17Mb | 160Mb | 160 Mb | 4029 | 60435 | 2481 b /point | no httpOut() |
4 | 11Mb | 53Mb | 42 Mb | 4029 | 60435 | 728 b /point | 9 to 5 nodes in pipeline |
We have reduced memory usage from the initial 9,299 bytes/point to 728 bytes/point by reducing the number of nodes (and also by moving logic into our external UDF).
It seems that the nodes after movingAverage (monInjector and alert) consume about as much memory as movingAverage itself, even though none of them stores old points in memory.
Anyway, we think 728 bytes/point is too much for a point whose tags and fields together total about 200 bytes (assuming each point is stored completely in memory, even though the tags always have the same values).
Remember we are working with this set of data:
tagKey | length tagKey | ~ length tagValue |
---|---|---|
tag1 | 5 | 10 |
tag2 | 6 | 3 |
tag3 | 11 | 4 |
tag4 | 9 | 3 |
tag5 | 7 | 15 |
tag6 | 8 | 21 |
tag7 | 6 | 16 |
tag8 | 6 | 4 |
fieldKey | length fieldKey | type |
---|---|---|
InputUtilization | 16 | "float" |
fieldKey | length fieldKey | type |
---|---|---|
check_crit | 10 | "boolean" |
check_warn | 10 | "boolean" |
check_info | 10 | "boolean" |
mon_exc | 9 | "integer" |
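As a rough cross-check of the ~200-byte estimate, we can sum the key/value lengths from the tables above (the 8-byte field value and 2-byte separators are assumptions for illustration):

```python
# (key length, approx value length) for the 8 tags listed above
tag_lengths = [(5, 10), (6, 3), (11, 4), (9, 3),
               (7, 15), (8, 21), (6, 16), (6, 4)]
# key lengths for InputUtilization, check_crit/warn/info, mon_exc
field_key_lengths = [16, 10, 10, 10, 9]

tags = sum(k + v + 2 for k, v in tag_lengths)    # '=' and ',' separators
fields = sum(k + 8 for k in field_key_lengths)   # key plus an ~8-byte value
print(tags + fields)   # ~245 bytes, the same order as the ~200 estimated
```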
I would like to understand how memory is handled in Kapacitor in order to optimise resource consumption across our whole dataset (lots of products, with approximately 400 measurements, sometimes with cardinality near 50,000). We also sometimes need to compute, online, data averaged over several hours (1, 2, 3).
Thank you for your great work; we hope our tests can help you improve it.
As a last test to try to release memory, we completely removed the UDF functionality and replaced it with a default() node which injects the 4 default fields (as the UDF did before), using this TICKscript:
//TICKSCRIPT:
//================
//var data = stream
stream
|from()
.database(INFLUX_BD)
.retentionPolicy(INFLUX_RP)
.measurement(INFLUX_MEAS)
.groupBy(influx_agrup)
.where(INFLUX_FILTER)
|default()
.field(FIELD,FIELD_DEFAULT)
|movingAverage(FIELD,MOV_AVG_POINTS)
.as('value')
|default()
.field('check_crit',TRUE)
.field('check_warn',TRUE)
.field('check_info',TRUE)
.field('mon_exc',0)
|alert()
.crit(lambda: if("check_crit" , float("value") > if("mon_exc" == 0, TH_CRIT_DEF, if("mon_exc" == 1, TH_CRIT_EX1, if("mon_exc" == 2, TH_CRIT_EX2, 0.0))),FALSE))
.warn(lambda: if("check_warn" , float("value") > if("mon_exc" == 0, TH_WARN_DEF, if("mon_exc" == 1, TH_WARN_EX1, if("mon_exc" == 2, TH_WARN_EX2, 0.0))),FALSE))
.info(lambda: if("check_info" , float("value") > if("mon_exc" == 0, TH_INFO_DEF, if("mon_exc" == 1, TH_INFO_EX1, if("mon_exc" == 2, TH_INFO_EX2, 0.0))),FALSE))
.stateChangesOnly()
.id(ID)
.idTag(idTag)
.levelTag(levelTag)
.messageField(messageField)
.durationField(durationField)
.message(message)
.details(details)
.post('http://output_webservice')
.email()
.to(envio_mail)
When the task was enabled we saw this behaviour:
test | Init Heap | End Heap | Heap Usage | Cardinality | Points(15m) | bytes /point |
---|---|---|---|---|---|---|
5 | 17Mb | 57Mb | 40 Mb | 4029 | 60435 | 694 b /point |
As we can see, this change does not affect the memory consumption much.
The same questions as before still stand:
I would like to understand how memory is handled in Kapacitor in order to optimise resource consumption across our whole dataset (lots of products, with approximately 400 measurements, sometimes with cardinality near 50,000). We also sometimes need to compute, online, data averaged over several hours (1, 2, 3).
Hello @sbengo @toni-moreno , this is a great analysis. Thanks!
I'll answer your questions below.
Does Kapacitor store repeated tags in memory even though the tags do not change?
Yes, in most cases Kapacitor stores the data as it is written via the API, so if each point has repeated tags, they are duplicated. There are cases where we know that the tags will not be used and we can drop them. Your use of movingAverage is one of these cases: the moving average implementation only stores the float values for the number of points in the average. Only the common tags are stored, and they are stored only once for the node. This is why movingAverage is more efficient than a window()|mean() operation.
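The behaviour described here (a fixed-size slice of floats per group, with no full points or per-point tags retained) can be sketched in Python; this is an illustration of the idea, not Kapacitor's actual code:

```python
from collections import deque

class MovingAverage:
    """O(window) memory: only the last `size` float values are kept,
    plus a running sum. No tags or full points are stored."""
    def __init__(self, size):
        self.values = deque(maxlen=size)
        self.total = 0.0

    def push(self, v):
        if len(self.values) == self.values.maxlen:
            self.total -= self.values[0]   # oldest value falls out
        self.values.append(v)
        self.total += v
        return self.total / len(self.values)

ma = MovingAverage(3)
print([ma.push(v) for v in [1.0, 2.0, 3.0, 4.0]])  # [1.0, 1.5, 2.0, 3.0]
```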
Why is the memory consumption of the monInjector/alert nodes greater than or equal to that of movingAverage if they are not storing old points in memory?
My answer above addresses this as well.
Are there plans to implement recursive formulas in some nodes (like movingAverage), avoiding excessive data caching?
Currently movingAverage stores as little data as possible (which equates to a slice of floats the size of the moving average). Do you have another algorithm in mind?
How can we reduce memory consumption on high-cardinality measurements?
In general, each node has to store the metadata about each unique group. So, as you saw, reducing the number of nodes can reduce the overall memory usage. For that reason the eval node accepts multiple lambda expressions, not just one; this way you can do all your eval work in a single node, reducing the overhead. Or, as you have done, push the logic down into a UDF, although we would like Kapacitor to be efficient enough that this is not necessary.
Do you need any other tests with our dataset?
Yes, it would be helpful to get a heap profile after the task is in its steady state. It can be retrieved by making this request:
curl 'http://localhost:9092/kapacitor/v1/debug/pprof/heap?debug=1' > heap.pprof
In order for us to be able to read the profile data we need the exact commit that you are running. If you are still on 1.3.3, great; if not, we will need to know.
You can share the profile here if you want or feel free to send me the file directly. The file is a simple text file so you can inspect it for any sensitive data first. My contact info can be found on my github profile.
Thanks again for the detailed write up.
Hi @nathanielc
I've uploaded the pprof file for version Kapacitor 1.3.3 (git: master eacb3730de19d7c70b6481f53100c20850d30b8b)
kapacitor-heap.pprof-20171010.gz
I've coded a new aggregator for Telegraf (https://github.com/influxdata/telegraf/pull/2167) in which we use recursive formulas for some basic statistics (count, sum, max, min, mean, variance, standard deviation). Such a node needs to apply a "reset" to its internal counters once a group of points has been processed. It loses output resolution, because you get one output point for each group of input points, but for many of our alert rules it could be a good solution, saving memory resources.
What do you think about creating a special recursiveStatsNode to avoid window+mean and save memory consumption? Perhaps something like:
recursiveStatsNode()
.every(TIME)
.compute("max","min","mean",...)
@toni-moreno Thanks for the pprof file. There seems to be a lot of in-use RAM related to Prometheus scraping, specifically AWS. Does that sound right? Have you configured Kapacitor to scrape AWS hosts?
The functions in InfluxQL are nearly all implemented using this idea of consuming a single point at a time and reducing the result as they go. The problem is that the window node is currently a bit naive. It could be optimized so that, when configured with fixed non-overlapping windows, it would immediately pass the data along, keeping track only of metadata about the times of the points. This would then take advantage of the reduce-style functions already implemented.
In your example case, though, you have overlapping windows, so the points that are part of the overlapping windows must be cached so they can be sent in each new window.
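A quick way to see the cost of overlapping windows (figures taken from this thread; the model is a simplification):

```python
# With overlapping windows (every < period) each point belongs to
# period/every successive windows, so the node must cache a full
# period of data per series before it can stop re-emitting a point.
period_min, every_min, cardinality = 30, 1, 4024

windows_per_point = period_min // every_min   # windows each point joins
buffered_points = period_min * cardinality    # held at steady state

print(windows_per_point, buffered_points)     # 30 120720
```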
Hi @nathanielc, we do not have any Prometheus scraping configured. In fact we had upgraded while keeping the old 1.1 config file (there is no [[scraper]] section in that file). After updating it with all the new sections, like [[scraper]], [[azure]] and others, and restarting, I repeated the last test.
test | Init Heap | End Heap | Heap Usage | Cardinality | Points(15m) | bytes /point |
---|---|---|---|---|---|---|
6 | 8Mb | 48Mb | 40 Mb | 4029 | 60435 | 694 b /point |
No change in the memory consumption per point with this test; only the initial memory decreased by some 10 MB.
About recursive formulas: I did not understand your explanation (sorry, perhaps my poor English contributed to this).
In the recursive implementation we assume there are no overlapping windows. In this approach, to compute the mean of 15 points we only need a counter (from 15 down to 0; once it reaches 0 the node emits the computed average to the next node and resets its values) and the last computed average.
In the last example, for 60 input points it would emit 4 output points and, assuming 8 bytes per value, we save 87% of memory (from 120 bytes to 16 bytes):
node | Input points | Output Points | consumed memory |
---|---|---|---|
movingAverage(15) | 60 | 45 | 15*8=120 bytes |
recursiveStats(15,mean) | 60 | 4 (each 15 points) | 8(last computed)+8(counter) = 16 bytes |
We lose resolution but save a lot of memory; in most cases it could be a great solution, and it saves memory whenever computing over more than 2 points. What do you think about this new approach?
Hi @nathanielc, suppose we would like to help you by coding a new influxQL-like node, recursiveStats, as described above, for the basic stats that can be computed recursively.
Would you be willing to add it as a new node in Kapacitor in upcoming releases?
Hi,
I'm working with Kapacitor 1.3.1 (git: master 3b5512f) to create some alerts with an InfluxDB subscription, and after working on databases and several TickScripts we noticed unusually high memory usage.
We have only enabled one task, predefined with a template and filled with JSON, which retrieves data in stream mode to compute, every 1m, the mean of the 30m-windowed InputUtilization of every interface of every switch. Sometimes the InputUtilization field does not exist on a point, so errors on the mean node can appear.
Example case
After enabling the task and seeing that it works OK, we observed that the used system memory increased to an unexpected level since starting Kapacitor, reaching a constant level after 1 hour of operation.
Assuming it keeps the data for 30 min (window .period()): 30 min × 4,024 series × 1 point/min = 120,720 points over 30 min of data, it seems each point is using 1.078 GB / 120,720 points ≈ 9,588 bytes. We think this is too much memory and we cannot understand why Kapacitor requires this amount of memory for each point.
We don't understand why the used system memory reaches its maximum level after 1 hour if we are only windowing points over a 30m period.
Questions
With the behaviour explained above:
Does Kapacitor need ~10 KB to store each point of data in stream mode?
Assuming linear scaling and the same cardinality, for N tasks would we need N × 1 GB of memory?
Could you help us understand Kapacitor's memory behaviour, please?
We attach the following TickScripts and statistics so you can check whether there is something wrong in our TickScript/node definitions:
TickScript + node info
The following table is a summary of node cardinality, together with the diagram. Note: sometimes there is an error on the mean node because the field does not exist on the point.