joshdurbin opened this issue 3 years ago
If you don't care about data outside the window, then your alert should stop keeping data that old; right now it keeps quite a bit of data in its history to prevent flapping, etc.
You don't need a barrier, unless new data for a tagset stops coming in.
Are you using docker?
The cardinality does fluctuate and stays within a reasonable size while a tick script is executing, as the various aforementioned outputs show. So why does Kapacitor's memory grow until OOM?
Since count is always 1, you should use a |count() instead of your |sum("count").
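For example, one of the branches from the posted scripts could become something like this (a sketch only, reusing the variable and field names from the scripts in this thread):

var storelbStream = windowedStream
    |where(lambda: "role" == 'storelb')
    // count the points themselves instead of summing a field that is always 1
    |count('count')
        .as('totalCount')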
Your cardinality isn't insane, so you shouldn't be using anywhere near that much memory. We did slightly reduce memory pressure in 1.5.8; an upgrade may mitigate your problem significantly. Also, if that doesn't work, try adding the environment variable GODEBUG=madvdontneed=1
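If Kapacitor is managed by systemd (an assumption on my part about your setup), one way to set that is a drop-in; the path below is illustrative:

# /etc/systemd/system/kapacitor.service.d/godebug.conf  -- illustrative drop-in path
[Service]
Environment="GODEBUG=madvdontneed=1"

followed by a systemctl daemon-reload and a restart of the service.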
Good luck. If this doesn't work, come back to me and we can try deeper debugging.
The change in 1.5.8 significantly reduced memory pressure from github.com/influxdata/kapacitor/edge.(*pointMessage).GroupInfo
Good luck. If this doesn't work, feel free to poke me again.
Will do. Will try and make some changes in the next few days and report back. Cheers!
Since posting this we dialed back our monitoring to three tick scripts with a single measurement each:
with scripts that look like:
dbrp "toptraffic"."autogen"
var windowedStream = stream
|from()
.groupBy('ip')
.measurement('ips')
|barrier()
.period(2m)
.delete(TRUE)
|window()
.period(2m)
.every(5s)
.align()
var storelbStream = windowedStream
|where(lambda: "role" == 'storelb')
|sum('count')
.as('totalCount')
var sharedlbStream = windowedStream
|where(lambda: "role" == 'sharedlb')
|sum('count')
.as('totalCount')
var joinStream = storelbStream
|join(sharedlbStream)
.as('storelb','sharedlb')
.tolerance(1s)
joinStream
|alert()
.flapping(0.25, 0.5)
.history(21)
.warn(lambda: "storelb.totalCount" > 75000 OR "sharedlb.totalCount" > 75000 OR "storelb.totalCount" + "sharedlb.totalCount" > 75000)
.crit(lambda: "storelb.totalCount" > 125000 OR "sharedlb.totalCount" > 125000 OR "storelb.totalCount" + "sharedlb.totalCount" > 125000)
.message('''Observed a high rate of request volume in Production for IP {{ index .Tags "ip" }}. Requests within a 2m window at the Store LB are: {{ index .Fields "storelb.totalCount" }} and Shared LB: {{ index .Fields "sharedlb.totalCount" }}.''')
.stateChangesOnly(2m)
.slack()
.channel('#ops-noise')
.noRecoveries()
joinStream
|alert()
.flapping(0.25, 0.5)
.history(21)
.warn(lambda: "storelb.totalCount" > 75000 OR "sharedlb.totalCount" > 75000 OR "storelb.totalCount" + "sharedlb.totalCount" > 75000)
.crit(lambda: "storelb.totalCount" > 125000 OR "sharedlb.totalCount" > 125000 OR "storelb.totalCount" + "sharedlb.totalCount" > 125000)
.message('''Observed a high rate of request volume in Production for IP {{ index .Tags "ip" }}. Requests within a 2m window at the Store LB are: {{ index .Fields "storelb.totalCount" }} and Shared LB: {{ index .Fields "sharedlb.totalCount" }}.''')
.stateChangesOnly(2m)
.exec('/usr/bin/kapacitor_pubsub_stdin_invoker.sh')
We thought we were holding steady at around 5GB memory usage, but over 8-9 days we crept up to 9.5GB or so.
We deployed the changes you recommended (excluding the count change) altering the script to look like:
dbrp "toptraffic"."autogen"
var windowedStream = stream
|from()
.groupBy('ip')
.measurement('ips')
|window()
.period(2m)
.every(5s)
.align()
var storelbStream = windowedStream
|where(lambda: "role" == 'storelb')
|sum('count')
.as('totalCount')
var sharedlbStream = windowedStream
|where(lambda: "role" == 'sharedlb')
|sum('count')
.as('totalCount')
var joinStream = storelbStream
|join(sharedlbStream)
.as('storelb','sharedlb')
.tolerance(1s)
joinStream
|alert()
.warn(lambda: "storelb.totalCount" > 75000 OR "sharedlb.totalCount" > 75000 OR "storelb.totalCount" + "sharedlb.totalCount" > 75000)
.crit(lambda: "storelb.totalCount" > 125000 OR "sharedlb.totalCount" > 125000 OR "storelb.totalCount" + "sharedlb.totalCount" > 125000)
.message('''Observed a high rate of request volume in Production for IP {{ index .Tags "ip" }}. Requests within a 2m window at the Store LB are: {{ index .Fields "storelb.totalCount" }} and Shared LB: {{ index .Fields "sharedlb.totalCount" }}.''')
.slack()
.channel('#ops-noise')
.noRecoveries()
joinStream
|alert()
.warn(lambda: "storelb.totalCount" > 75000 OR "sharedlb.totalCount" > 75000 OR "storelb.totalCount" + "sharedlb.totalCount" > 75000)
.crit(lambda: "storelb.totalCount" > 125000 OR "sharedlb.totalCount" > 125000 OR "storelb.totalCount" + "sharedlb.totalCount" > 125000)
.message('''Observed a high rate of request volume in Production for IP {{ index .Tags "ip" }}. Requests within a 2m window at the Store LB are: {{ index .Fields "storelb.totalCount" }} and Shared LB: {{ index .Fields "sharedlb.totalCount" }}.''')
.exec('/usr/bin/kapacitor_pubsub_stdin_invoker.sh')
...which produced rapid memory utilization up to about 35.5GB over 5 1/2 hours:
kapacitor show top_ips
ID: top_ips
Error:
Template:
Type: stream
Status: enabled
Executing: true
Created: 22 Jan 21 22:25 UTC
Modified: 11 Feb 21 20:52 UTC
LastEnabled: 11 Feb 21 20:52 UTC
Databases Retention Policies: ["toptraffic"."autogen"]
TICKscript:
dbrp "toptraffic"."autogen"
var windowedStream = stream
|from()
.groupBy('ip')
.measurement('ips')
|window()
.period(2m)
.every(5s)
.align()
var storelbStream = windowedStream
|where(lambda: "role" == 'storelb')
|sum('count')
.as('totalCount')
var sharedlbStream = windowedStream
|where(lambda: "role" == 'sharedlb')
|sum('count')
.as('totalCount')
var joinStream = storelbStream
|join(sharedlbStream)
.as('storelb', 'sharedlb')
.tolerance(1s)
joinStream
|alert()
.warn(lambda: "storelb.totalCount" > 75000 OR "sharedlb.totalCount" > 75000 OR "storelb.totalCount" + "sharedlb.totalCount" > 75000)
.crit(lambda: "storelb.totalCount" > 125000 OR "sharedlb.totalCount" > 125000 OR "storelb.totalCount" + "sharedlb.totalCount" > 125000)
.message('''Observed a high rate of request volume in Production for IP {{ index .Tags "ip" }}. Requests within a 2m window at the Store LB are: {{ index .Fields "storelb.totalCount" }} and Shared LB: {{ index .Fields "sharedlb.totalCount" }}.''')
.slack()
.channel('#ops-noise')
.noRecoveries()
joinStream
|alert()
.warn(lambda: "storelb.totalCount" > 75000 OR "sharedlb.totalCount" > 75000 OR "storelb.totalCount" + "sharedlb.totalCount" > 75000)
.crit(lambda: "storelb.totalCount" > 125000 OR "sharedlb.totalCount" > 125000 OR "storelb.totalCount" + "sharedlb.totalCount" > 125000)
.message('''Observed a high rate of request volume in Production for IP {{ index .Tags "ip" }}. Requests within a 2m window at the Store LB are: {{ index .Fields "storelb.totalCount" }} and Shared LB: {{ index .Fields "sharedlb.totalCount" }}.''')
.exec('/usr/bin/kapacitor_pubsub_stdin_invoker.sh')
DOT:
digraph top_ips {
graph [throughput="5019.95 points/s"];
stream0 [avg_exec_time_ns="0s" errors="0" working_cardinality="0" ];
stream0 -> from1 [processed="94807195"];
from1 [avg_exec_time_ns="47.005µs" errors="0" working_cardinality="0" ];
from1 -> window2 [processed="94807195"];
window2 [avg_exec_time_ns="606.918µs" errors="0" working_cardinality="60439" ];
window2 -> where5 [processed="5125353"];
window2 -> where3 [processed="5125353"];
where5 [avg_exec_time_ns="161.103µs" errors="0" working_cardinality="50115" ];
where5 -> sum6 [processed="5124931"];
sum6 [avg_exec_time_ns="242.923µs" errors="0" working_cardinality="50115" ];
sum6 -> join8 [processed="5124931"];
where3 [avg_exec_time_ns="47.721µs" errors="0" working_cardinality="50115" ];
where3 -> sum4 [processed="5125162"];
sum4 [avg_exec_time_ns="91.395µs" errors="0" working_cardinality="50115" ];
sum4 -> join8 [processed="5125159"];
join8 [avg_exec_time_ns="24.861µs" errors="0" working_cardinality="50115" ];
join8 -> alert10 [processed="5124931"];
join8 -> alert9 [processed="5124931"];
alert10 [alerts_inhibited="0" alerts_triggered="0" avg_exec_time_ns="52.841µs" crits_triggered="0" errors="0" infos_triggered="0" oks_triggered="0" warns_triggered="0" working_cardinality="50115" ];
alert9 [alerts_inhibited="0" alerts_triggered="0" avg_exec_time_ns="26.064µs" crits_triggered="0" errors="0" infos_triggered="0" oks_triggered="0" warns_triggered="0" working_cardinality="50115" ];
}
I'll try again with the memory tuning parameter.
Now that we're a few hours out, here's a memory graph with a wider timeline than the other two in the prior comment.
The graph starts with the tick scripts in place without the changes mentioned. The peaks after the yellow arrow indicate me making various changes to the alert node, dropping history/flapping/state change tracking and the barrier nodes. The red arrow is where we revert to the tick scripts that were in place prior to the yellow arrow.
All of this seems to further indicate that if you're processing a stream of data within a window, the point TTL, if you will, does not seem to honor the max window size. Again, in this example, I don't care about any point data over 2m old and it can be expunged. Even though there is growth, it does seem that the barrier node helps inhibit (rapid) memory consumption.
@docmerlin any thoughts on those updates ^^^ ?
Yes, we found several memory leaks in JoinNode and UnionNode and are fixing them now. That being said, it is still possible for memory to grow depending on your data cardinality.
@joshdurbin here is a PR to fix some of the leakage. https://github.com/influxdata/kapacitor/pull/2489
Are those merged PRs expected in the next release?
And, yeah, we see that the cardinality controls aren't passing through the join node and the dependent nodes on our end too, in these use cases:
ID: top_ips
Error:
Template:
Type: stream
Status: enabled
Executing: true
Created: 22 Jan 21 22:25 UTC
Modified: 12 Feb 21 01:40 UTC
LastEnabled: 12 Feb 21 01:40 UTC
Databases Retention Policies: ["toptraffic"."autogen"]
TICKscript:
dbrp "toptraffic"."autogen"
var windowedStream = stream
|from()
.groupBy('ip')
.measurement('ips')
|barrier()
.period(2m)
.delete(TRUE)
|window()
.period(2m)
.every(5s)
.align()
var storelbStream = windowedStream
|where(lambda: "role" == 'storelb')
|sum('count')
.as('totalCount')
var sharedlbStream = windowedStream
|where(lambda: "role" == 'sharedlb')
|sum('count')
.as('totalCount')
var joinStream = storelbStream
|join(sharedlbStream)
.as('storelb', 'sharedlb')
.tolerance(1s)
joinStream
|alert()
.flapping(0.25, 0.5)
.history(21)
.warn(lambda: "storelb.totalCount" > 75000 OR "sharedlb.totalCount" > 75000 OR "storelb.totalCount" + "sharedlb.totalCount" > 75000)
.crit(lambda: "storelb.totalCount" > 125000 OR "sharedlb.totalCount" > 125000 OR "storelb.totalCount" + "sharedlb.totalCount" > 125000)
.message('''Observed a high rate of request volume in Production for IP {{ index .Tags "ip" }}. Requests within a 2m window at the Store LB are: {{ index .Fields "storelb.totalCount" }} and Shared LB: {{ index .Fields "sharedlb.totalCount" }}.''')
.stateChangesOnly(2m)
.slack()
.channel('#ops-noise')
.noRecoveries()
joinStream
|alert()
.flapping(0.25, 0.5)
.history(21)
.warn(lambda: "storelb.totalCount" > 75000 OR "sharedlb.totalCount" > 75000 OR "storelb.totalCount" + "sharedlb.totalCount" > 75000)
.crit(lambda: "storelb.totalCount" > 125000 OR "sharedlb.totalCount" > 125000 OR "storelb.totalCount" + "sharedlb.totalCount" > 125000)
.message('''Observed a high rate of request volume in Production for IP {{ index .Tags "ip" }}. Requests within a 2m window at the Store LB are: {{ index .Fields "storelb.totalCount" }} and Shared LB: {{ index .Fields "sharedlb.totalCount" }}.''')
.stateChangesOnly(2m)
.exec('/usr/bin/kapacitor_pubsub_stdin_invoker.sh')
DOT:
digraph top_ips {
graph [throughput="4891.68 points/s"];
stream0 [avg_exec_time_ns="0s" errors="0" working_cardinality="0" ];
stream0 -> from1 [processed="4940426264"];
from1 [avg_exec_time_ns="111.079µs" errors="0" working_cardinality="0" ];
from1 -> barrier2 [processed="4940426264"];
barrier2 [avg_exec_time_ns="38.583µs" errors="0" working_cardinality="7172" ];
barrier2 -> window3 [processed="4940418579"];
window3 [avg_exec_time_ns="685.66µs" errors="0" working_cardinality="7172" ];
window3 -> where6 [processed="247104855"];
window3 -> where4 [processed="247104855"];
where6 [avg_exec_time_ns="177.822µs" errors="0" working_cardinality="2728" ];
where6 -> sum7 [processed="247104855"];
sum7 [avg_exec_time_ns="209.806µs" errors="0" working_cardinality="2728" ];
sum7 -> join9 [processed="247104855"];
where4 [avg_exec_time_ns="862.96µs" errors="0" working_cardinality="2728" ];
where4 -> sum5 [processed="247104855"];
sum5 [avg_exec_time_ns="218.497µs" errors="0" working_cardinality="2728" ];
sum5 -> join9 [processed="247104855"];
join9 [avg_exec_time_ns="919.745µs" errors="0" working_cardinality="749692" ];
join9 -> alert11 [processed="247104855"];
join9 -> alert10 [processed="247104855"];
alert11 [alerts_inhibited="0" alerts_triggered="73" avg_exec_time_ns="50.693µs" crits_triggered="1" errors="0" infos_triggered="0" oks_triggered="36" warns_triggered="36" working_cardinality="749692" ];
alert10 [alerts_inhibited="0" alerts_triggered="37" avg_exec_time_ns="261.104µs" crits_triggered="1" errors="0" infos_triggered="0" oks_triggered="0" warns_triggered="36" working_cardinality="749692" ];
}
I don't believe this comment on an issue from 2016 is correct. From what I can tell there is no situation, once a stream has been opened, where memory doesn't build to an OOM scenario.
dbrp "toptraffic"."autogen"
var traffic = stream
|from()
.groupBy('ip', 'id')
.measurement('ips_and_stores')
var sharedLBTraffic = traffic
|where(lambda: "src" == 1)
|sum('src')
.as('total')
var storeLBTraffic = traffic
|where(lambda: "src" == 2)
|sum('src')
.as('total')
##############
First is:
dbrp "toptraffic"."autogen"
var traffic = stream
|from()
.groupBy('ip', 'id')
.measurement('ips_and_stores')
Second is:
dbrp "toptraffic"."autogen"
var traffic = stream
|from()
.groupBy('ip', 'id')
.measurement('ips_and_stores')
var sharedLBTraffic = traffic
|where(lambda: "src" == 1)
|sum('src')
.as('total')
Third is:
dbrp "toptraffic"."autogen"
var traffic = stream
|from()
.groupBy('ip', 'id')
.measurement('ips_and_stores')
traffic
|where(lambda: "src" == 1)
|sum('src')
.as('total')
Next up, test 4, is (ruling out discrepancy between sum and count):
dbrp "toptraffic"."autogen"
var traffic = stream
|from()
.groupBy('ip', 'id')
.measurement('ips_and_stores')
traffic
|where(lambda: "src" == 1)
|count('src')
.as('total')
Fifth test is reverting the count/sum thing and adding a 30 second barrier configured to delete:
dbrp "toptraffic"."autogen"
var traffic = stream
|from()
.groupBy('ip', 'id')
.measurement('ips_and_stores')
|barrier()
.period(30s)
.delete(TRUE)
traffic
|where(lambda: "src" == 1)
|sum('src')
.as('total')
This does show a smoothing of memory usage.
Next up, sixth test/change is adding the counts to the other monitored system:
Tick script omitted here as it's shown in the outputs below.
We can confirm that the barrier node is reducing cardinality with two subsequent runs of kapacitor show ips_and_stores:
ID: ips_and_stores
Error:
Template:
Type: stream
Status: enabled
Executing: true
Created: 11 Mar 21 18:05 UTC
Modified: 11 Mar 21 20:06 UTC
LastEnabled: 11 Mar 21 20:06 UTC
Databases Retention Policies: ["toptraffic"."autogen"]
TICKscript:
dbrp "toptraffic"."autogen"
var traffic = stream
|from()
.groupBy('ip', 'id')
.measurement('ips_and_stores')
|barrier()
.period(30s)
.delete(TRUE)
var storeLBTraffic = traffic
|where(lambda: "src" == 1)
|sum('src')
.as('total')
var sharedLBTraffic = traffic
|where(lambda: "src" == 2)
|sum('src')
.as('total')
DOT:
digraph ips_and_stores {
graph [throughput="14576.14 points/s"];
stream0 [avg_exec_time_ns="0s" errors="0" working_cardinality="0" ];
stream0 -> from1 [processed="522683"];
from1 [avg_exec_time_ns="48.437µs" errors="0" working_cardinality="0" ];
from1 -> barrier2 [processed="522683"];
barrier2 [avg_exec_time_ns="2.28µs" errors="0" working_cardinality="84561" ];
barrier2 -> where5 [processed="522683"];
barrier2 -> where3 [processed="522683"];
where5 [avg_exec_time_ns="3.735µs" errors="0" working_cardinality="84561" ];
where5 -> sum6 [processed="167322"];
sum6 [avg_exec_time_ns="14.734µs" errors="0" working_cardinality="7337" ];
where3 [avg_exec_time_ns="3.354µs" errors="0" working_cardinality="84561" ];
where3 -> sum4 [processed="355361"];
sum4 [avg_exec_time_ns="12.121µs" errors="0" working_cardinality="77277" ];
}
ID: ips_and_stores
Error:
Template:
Type: stream
Status: enabled
Executing: true
Created: 11 Mar 21 18:05 UTC
Modified: 11 Mar 21 20:06 UTC
LastEnabled: 11 Mar 21 20:06 UTC
Databases Retention Policies: ["toptraffic"."autogen"]
TICKscript:
dbrp "toptraffic"."autogen"
var traffic = stream
|from()
.groupBy('ip', 'id')
.measurement('ips_and_stores')
|barrier()
.period(30s)
.delete(TRUE)
var storeLBTraffic = traffic
|where(lambda: "src" == 1)
|sum('src')
.as('total')
var sharedLBTraffic = traffic
|where(lambda: "src" == 2)
|sum('src')
.as('total')
DOT:
digraph ips_and_stores {
graph [throughput="16238.66 points/s"];
stream0 [avg_exec_time_ns="0s" errors="0" working_cardinality="0" ];
stream0 -> from1 [processed="801608"];
from1 [avg_exec_time_ns="48.437µs" errors="0" working_cardinality="0" ];
from1 -> barrier2 [processed="801608"];
barrier2 [avg_exec_time_ns="2.28µs" errors="0" working_cardinality="73055" ];
barrier2 -> where5 [processed="801608"];
barrier2 -> where3 [processed="801608"];
where5 [avg_exec_time_ns="3.735µs" errors="0" working_cardinality="73054" ];
where5 -> sum6 [processed="254519"];
sum6 [avg_exec_time_ns="14.734µs" errors="0" working_cardinality="6920" ];
where3 [avg_exec_time_ns="3.354µs" errors="0" working_cardinality="73055" ];
where3 -> sum4 [processed="547089"];
sum4 [avg_exec_time_ns="12.121µs" errors="0" working_cardinality="66199" ];
}
Seventh test/change is joining those two count streams, where we see that the cardinality of the join node is never reduced.
ID: ips_and_stores
Error:
Template:
Type: stream
Status: enabled
Executing: true
Created: 11 Mar 21 18:05 UTC
Modified: 11 Mar 21 20:12 UTC
LastEnabled: 11 Mar 21 20:12 UTC
Databases Retention Policies: ["toptraffic"."autogen"]
TICKscript:
dbrp "toptraffic"."autogen"
var traffic = stream
|from()
.groupBy('ip', 'id')
.measurement('ips_and_stores')
|barrier()
.period(30s)
.delete(TRUE)
var storeLBTraffic = traffic
|where(lambda: "src" == 1)
|sum('src')
.as('total')
var sharedLBTraffic = traffic
|where(lambda: "src" == 2)
|sum('src')
.as('total')
var joinStream = storeLBTraffic
|join(sharedLBTraffic)
.as('storelb', 'sharedlb')
.tolerance(1s)
DOT:
digraph ips_and_stores {
graph [throughput="15907.77 points/s"];
stream0 [avg_exec_time_ns="0s" errors="0" working_cardinality="0" ];
stream0 -> from1 [processed="6765340"];
from1 [avg_exec_time_ns="9.93µs" errors="0" working_cardinality="0" ];
from1 -> barrier2 [processed="6765342"];
barrier2 [avg_exec_time_ns="19.601µs" errors="0" working_cardinality="79006" ];
barrier2 -> where5 [processed="6765291"];
barrier2 -> where3 [processed="6765291"];
where5 [avg_exec_time_ns="5.58µs" errors="0" working_cardinality="79006" ];
where5 -> sum6 [processed="2127062"];
sum6 [avg_exec_time_ns="27.531µs" errors="0" working_cardinality="6877" ];
sum6 -> join8 [processed="2031335"];
where3 [avg_exec_time_ns="7.545µs" errors="0" working_cardinality="79006" ];
where3 -> sum4 [processed="4638228"];
sum4 [avg_exec_time_ns="32.777µs" errors="0" working_cardinality="72199" ];
sum4 -> join8 [processed="3627725"];
join8 [avg_exec_time_ns="19.833µs" errors="0" working_cardinality="627063" ];
}
So this all looks to be due to the sum and count operations on nodes.
The next iteration, where I'm trying to avoid the join, is:
dbrp "toptraffic"."autogen"
var traffic = stream
|from()
.groupBy('ip', 'id')
.measurement('ips_and_stores')
|barrier()
.period(120s)
.delete(TRUE)
|stateCount(lambda: "src" == 1)
.as('total_store_lb')
|stateCount(lambda: "src" == 2)
.as('total_shared_lb')
|eval(lambda: if("total_store_lb" == -1, 0, "total_store_lb"), lambda: if("total_shared_lb" == -1, 0, "total_shared_lb"))
.as('unsigned_total_store_lb','unsigned_total_shared_lb')
.keep('total_store_lb', 'total_shared_lb', 'unsigned_total_store_lb', 'unsigned_total_shared_lb')
|alert()
.warn(lambda: "unsigned_total_store_lb" > 2500 OR "unsigned_total_shared_lb" > 2500 OR "unsigned_total_store_lb" + "unsigned_total_shared_lb" > 2500)
.message('Observed a high rate of request volume from IP {{ index .Tags "ip" }} for Store ID {{ index .Tags "id" }} at the Shared LB: {{ index .Fields "unsigned_total_shared_lb" }} and the Store LB: {{ index .Fields "unsigned_total_store_lb" }}')
.stateChangesOnly(2m)
.log('/var/log/kapacitor/test.log')
It would be nice if we could change the "not found" value for the stateCount node. Of course, I could guard the stateCount node with a where clause, but then I have to split (considering I have two conditionals here) -- I need to count all those where the field src has the value 1 and 2, independently.
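For the record, the where-based split I'm describing would look roughly like this (a sketch only; here traffic stands for just the stream/from/barrier portion of the script above):

var storeLBCount = traffic
    |where(lambda: "src" == 1)
    // with the where() filtering to src == 1, the state expression can be trivially true
    |stateCount(lambda: TRUE)
        .as('total_store_lb')

var sharedLBCount = traffic
    |where(lambda: "src" == 2)
    |stateCount(lambda: TRUE)
        .as('total_shared_lb')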
This script does produce large memory usage for Kapacitor, but it has stabilized -- probably due to cardinality, which is "high". The dot graph for about 30 minutes of data flow, taken at a random point in the day, is:
DOT:
digraph ips_and_stores {
graph [throughput="15066.77 points/s"];
stream0 [avg_exec_time_ns="0s" errors="0" working_cardinality="0" ];
stream0 -> from1 [processed="34578588"];
from1 [avg_exec_time_ns="16.657µs" errors="0" working_cardinality="0" ];
from1 -> barrier2 [processed="34578588"];
barrier2 [avg_exec_time_ns="13.455µs" errors="0" working_cardinality="195103" ];
barrier2 -> state_count3 [processed="34578456"];
state_count3 [avg_exec_time_ns="36.458µs" errors="0" working_cardinality="195103" ];
state_count3 -> state_count4 [processed="34578456"];
state_count4 [avg_exec_time_ns="31.051µs" errors="0" working_cardinality="195103" ];
state_count4 -> eval5 [processed="34578456"];
eval5 [avg_exec_time_ns="56.633µs" errors="0" working_cardinality="195103" ];
eval5 -> alert6 [processed="34578456"];
alert6 [alerts_inhibited="0" alerts_triggered="553" avg_exec_time_ns="86.608µs" crits_triggered="0" errors="0" infos_triggered="0" oks_triggered="272" warns_triggered="281" working_cardinality="195103" ];
On second look though at the data coming out of the stream here, I don't think this is doing what I want.
As suggested earlier in this thread, the alert() node takes considerable memory with its configured parameters to suppress x minutes of state changes, history, flapping, etc., particularly since we know that barrier node signals don't make it through the join node at the moment.
Example tick scripts:
dbrp "toptraffic"."autogen"
var trafficByIPAndStoreIDStream = stream
|from()
.groupBy('ip', 'id')
.measurement('ips_and_stores')
|barrier()
.period(121s)
// the period for the barrier should be +1 unit over the period defined in the downstream window
.delete(TRUE)
|window()
.period(120s)
.every(5s)
|default()
// count is not set by the filebeat-udp messenger, so we add it to each point coming through with a value of 1
.field('count', 1)
var storeLBTrafficStream = trafficByIPAndStoreIDStream
|where(lambda: "src" == 1)
// the filebeat-udp message source sets the `src` field key to the value 1 for the store LB
|sum('count')
// we sum on count here and not upstream because this operation drops all other fields from the forward point
.as('totalCount')
var sharedLBTrafficStream = trafficByIPAndStoreIDStream
|where(lambda: "src" == 2)
// the filebeat-udp message source sets the `src` field key to the value 2 for the shared LB
|sum('count')
// see the other sum message ^^^
.as('totalCount')
var joinedTrafficStream = storeLBTrafficStream
|join(sharedLBTrafficStream)
.as('storelb', 'sharedlb')
joinedTrafficStream
|alert()
.warn(lambda: int("storelb.totalCount") > 50000 OR int("sharedlb.totalCount") > 50000 OR int("storelb.totalCount") + int("sharedlb.totalCount") > 50000)
.crit(lambda: int("storelb.totalCount") > 75000 OR int("sharedlb.totalCount") > 75000 OR int("storelb.totalCount") + int("sharedlb.totalCount") > 75000)
.message('Production IP {{ index .Tags "ip" }} has exceeded request thresholds for the Store ID {{ index .Tags "id" }} via the Shared LB: {{ index .Fields "sharedlb.totalCount" }} and the Store LB: {{ index .Fields "storelb.totalCount" }} within the last 2 minutes.')
.stateChangesOnly(5m)
.noRecoveries()
.slack()
.channel('#ops-noise')
joinedTrafficStream
|alert()
.warn(lambda: int("storelb.totalCount") > 50000 OR int("sharedlb.totalCount") > 50000 OR int("storelb.totalCount") + int("sharedlb.totalCount") > 50000)
.crit(lambda: int("storelb.totalCount") > 75000 OR int("sharedlb.totalCount") > 75000 OR int("storelb.totalCount") + int("sharedlb.totalCount") > 75000)
.message('Production IP {{ index .Tags "ip" }} has exceeded request thresholds for the Store ID {{ index .Tags "id" }} via the Shared LB: {{ index .Fields "sharedlb.totalCount" }} and the Store LB: {{ index .Fields "storelb.totalCount" }} within the last 2 minutes.')
.stateChangesOnly(5m)
.exec('/usr/bin/kapacitor_pubsub_stdin_invoker.sh')
Here the join stream remains and the alert nodes are removed.
dbrp "toptraffic"."autogen"
var trafficByIPAndStoreIDStream = stream
|from()
.groupBy('ip', 'id')
.measurement('ips_and_stores')
|barrier()
.period(121s)
// the period for the barrier should be +1 unit over the period defined in the downstream window
.delete(TRUE)
|window()
.period(120s)
.every(5s)
|default()
// count is not set by the filebeat-udp messenger, so we add it to each point coming through with a value of 1
.field('count', 1)
var storeLBTrafficStream = trafficByIPAndStoreIDStream
|where(lambda: "src" == 1)
// the filebeat-udp message source sets the `src` field key to the value 1 for the store LB
|sum('count')
// we sum on count here and not upstream because this operation drops all other fields from the forward point
.as('totalCount')
var sharedLBTrafficStream = trafficByIPAndStoreIDStream
|where(lambda: "src" == 2)
// the filebeat-udp message source sets the `src` field key to the value 2 for the shared LB
|sum('count')
// see the other sum message ^^^
.as('totalCount')
var joinedTrafficStream = storeLBTrafficStream
|join(sharedLBTrafficStream)
.as('storelb', 'sharedlb')
Here the join stream is completely removed.
dbrp "toptraffic"."autogen"
var trafficByIPAndStoreIDStream = stream
|from()
.groupBy('ip', 'id')
.measurement('ips_and_stores')
|barrier()
.period(121s)
// the period for the barrier should be +1 unit over the period defined in the downstream window
.delete(TRUE)
|window()
.period(120s)
.every(5s)
|default()
// count is not set by the filebeat-udp messenger, so we add it to each point coming through with a value of 1
.field('count', 1)
var storeLBTrafficStream = trafficByIPAndStoreIDStream
|where(lambda: "src" == 1)
// the filebeat-udp message source sets the `src` field key to the value 1 for the store LB
|sum('count')
// we sum on count here and not upstream because this operation drops all other fields from the forward point
.as('totalCount')
var sharedLBTrafficStream = trafficByIPAndStoreIDStream
|where(lambda: "src" == 2)
// the filebeat-udp message source sets the `src` field key to the value 2 for the shared LB
|sum('count')
// see the other sum message ^^^
.as('totalCount')
What I settled on, for now, is this:
dbrp "toptraffic"."autogen"
var trafficByIPAndStoreIDStream = stream
|from()
.groupBy('ip', 'id')
.measurement('ips_and_stores')
|barrier()
.period(121s)
// the period for the barrier should be +1 unit over the period defined in the downstream window
.delete(TRUE)
|window()
.period(120s)
.every(5s)
|default()
// count is not set by the filebeat-udp messenger, so we add it to each point coming through with a value of 1
.field('count', 1)
var storeLBTrafficStream = trafficByIPAndStoreIDStream
|where(lambda: "src" == 1)
// the filebeat-udp message source sets the `src` field key to the value 1 for the store LB
|sum('count')
// we sum on count here and not upstream because this operation drops all other fields from the forward point
.as('totalCount')
var sharedLBTrafficStream = trafficByIPAndStoreIDStream
|where(lambda: "src" == 2)
// the filebeat-udp message source sets the `src` field key to the value 2 for the shared LB
|sum('count')
// see the other sum message ^^^
.as('totalCount')
var joinedTrafficStream = storeLBTrafficStream
|join(sharedLBTrafficStream)
.as('storelb', 'sharedlb')
joinedTrafficStream
|alert()
.warn(lambda: int("storelb.totalCount") > 50000 OR int("sharedlb.totalCount") > 50000 OR int("storelb.totalCount") + int("sharedlb.totalCount") > 50000)
.crit(lambda: int("storelb.totalCount") > 75000 OR int("sharedlb.totalCount") > 75000 OR int("storelb.totalCount") + int("sharedlb.totalCount") > 75000)
.message('Production IP {{ index .Tags "ip" }} has exceeded request thresholds for the Store ID {{ index .Tags "id" }} via the Shared LB: {{ index .Fields "sharedlb.totalCount" }} and the Store LB: {{ index .Fields "storelb.totalCount" }} within the last 2 minutes.')
.stateChangesOnly(5m)
.exec('/usr/bin/kapacitor_pubsub_stdin_invoker.sh')
.slack()
.channel('#ops-noise')
This combines the alert nodes into one -- I had them broken out purely because I wanted recoveries to go to our pubsub channel but not to Slack. This reduces the memory growth a bit. The bandaid I'll apply to this is service restarts at memory thresholds via systemd, I think.
That said, getting the barrier emission through the join node, the join memory leak fixes, etc. released should help.
The current iteration drops the usage of the default node and the explicit type conversions required because of it, or because of the combination of it and the sum operations. It's pretty clear, though, that we're not able to reduce our cardinality or define a TTL on the points as they flow through the system, which in this case makes the system far less useful. That has, fortunately, been mitigated, as suggested in a number of forums, by forcing restarts at high-water marks for memory.
dbrp "toptraffic"."autogen"
var trafficByIPAndStoreIDStream = stream
|from()
.groupBy('ip', 'id')
.measurement('ips_and_stores')
|barrier()
.period(121s)
// the period for the barrier should be +1 unit over the period defined in the downstream window
.delete(TRUE)
|window()
.period(120s)
.every(5s)
var storeLBTrafficStream = trafficByIPAndStoreIDStream
|where(lambda: "source" == 'storelb')
|sum('count')
// sum on count here as this operation drops all other fields from the point
.as('totalCount')
var sharedLBTrafficStream = trafficByIPAndStoreIDStream
|where(lambda: "source" == 'sharedlb')
|sum('count')
// sum on count here as this operation drops all other fields from the point
.as('totalCount')
var joinedTrafficStream = storeLBTrafficStream
|join(sharedLBTrafficStream)
// join the sum streams together with their tags; reminder that fields other than totalCount are dropped by the upstream sum
.as('storelb', 'sharedlb')
.tolerance(1s)
joinedTrafficStream
|alert()
.warn(lambda: "storelb.totalCount" > 50000 OR "sharedlb.totalCount" > 50000 OR "storelb.totalCount" + "sharedlb.totalCount" > 50000)
.crit(lambda: "storelb.totalCount" > 80000 OR "sharedlb.totalCount" > 80000 OR "storelb.totalCount" + "sharedlb.totalCount" > 80000)
.message('Production IP {{ index .Tags "ip" }} has exceeded request thresholds for the Store ID {{ index .Tags "id" }} via the Shared LB: {{ index .Fields "sharedlb.totalCount" }} and the Store LB: {{ index .Fields "storelb.totalCount" }} within the last 2 minutes.')
.stateChangesOnly(5m)
.slack()
.channel('#ops-noise')
.exec('/usr/bin/kapacitor_pubsub_stdin_invoker.sh')
with a dot graph of:
digraph ips_and_stores {
graph [throughput="16786.69 points/s"];
stream0 [avg_exec_time_ns="0s" errors="0" working_cardinality="0" ];
stream0 -> from1 [processed="128532482"];
from1 [avg_exec_time_ns="26.78µs" errors="0" working_cardinality="0" ];
from1 -> barrier2 [processed="128532482"];
barrier2 [avg_exec_time_ns="67.601µs" errors="0" working_cardinality="238931" ];
barrier2 -> window3 [processed="128532108"];
window3 [avg_exec_time_ns="93.931µs" errors="0" working_cardinality="238931" ];
window3 -> where6 [processed="33450748"];
window3 -> where4 [processed="33450748"];
where6 [avg_exec_time_ns="38.962µs" errors="0" working_cardinality="60604" ];
where6 -> sum7 [processed="33450748"];
sum7 [avg_exec_time_ns="46.81µs" errors="0" working_cardinality="60606" ];
sum7 -> join9 [processed="33450748"];
where4 [avg_exec_time_ns="43.106µs" errors="0" working_cardinality="60604" ];
where4 -> sum5 [processed="33450748"];
sum5 [avg_exec_time_ns="46.444µs" errors="0" working_cardinality="60604" ];
sum5 -> join9 [processed="33450748"];
join9 [avg_exec_time_ns="117.824µs" errors="0" working_cardinality="6803685" ];
join9 -> alert10 [processed="33450745"];
alert10 [alerts_inhibited="0" alerts_triggered="0" avg_exec_time_ns="51.397µs" crits_triggered="0" errors="0" infos_triggered="0" oks_triggered="0" warns_triggered="0" working_cardinality="6803683" ];
}
...showing high cardinality from the join, join9, forward. With the throughput rates shown in the dot graph, our Kapacitor instances need restarting about every 1 hour and 50 minutes with a 45GB memory limit set on the process via MemoryMax=45G in the systemd unit file.
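For reference, the restart bandaid amounts to unit settings along these lines; the MemoryMax value is what we run with, while the Restart lines are illustrative:

[Service]
MemoryMax=45G
# when the cgroup limit is hit the kernel OOM-kills kapacitord; systemd then brings it back up
Restart=always
RestartSec=10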
I'm also seeing similar behavior using the latest release, 1.7.1. Here's the top of a heap profile:
Showing nodes accounting for 9203.41MB, 97.93% of 9398.23MB total
Dropped 243 nodes (cum <= 46.99MB)
Showing top 10 nodes out of 40
flat flat% sum% cum cum%
3537.67MB 37.64% 37.64% 3537.67MB 37.64% github.com/influxdata/influxdb/models.Tags.Map (inline)
3410.08MB 36.28% 73.93% 3410.58MB 36.29% github.com/influxdata/influxdb/models.(*point).unmarshalBinary
1166.16MB 12.41% 86.33% 1166.16MB 12.41% github.com/influxdata/kapacitor/edge.(*pointMessage).ShallowCopy
448.08MB 4.77% 91.10% 448.08MB 4.77% github.com/influxdata/kapacitor/alert.newHandler
246.01MB 2.62% 93.72% 246.01MB 2.62% strings.(*Builder).grow (inline)
200.91MB 2.14% 95.86% 200.91MB 2.14% github.com/influxdata/kapacitor.(*windowTimeBuffer).insert
194.50MB 2.07% 97.93% 7157.76MB 76.16% github.com/influxdata/kapacitor.(*TaskMaster).WritePoints
0 0% 97.93% 3410.58MB 36.29% github.com/influxdata/influxdb/models.(*point).Fields
0 0% 97.93% 456.60MB 4.86% github.com/influxdata/kapacitor.(*AlertNode).runAlert
0 0% 97.93% 1415.18MB 15.06% github.com/influxdata/kapacitor.(*FromNode).Point
I'm using a lot of stream queries based on this example
WritePoints seems to be causing the problem; where is it used?
File: kapacitord
Build ID: 7cdd357954d1cbca82a4f08b6fcbce65e372a1fe
Type: inuse_space
Time: Nov 30, 2023 at 7:06am (EST)
Entering interactive mode (type "help" for commands, "o" for options)
(pprof) top
Showing nodes accounting for 17.23GB, 98.79% of 17.44GB total
Dropped 232 nodes (cum <= 0.09GB)
Showing top 10 nodes out of 41
flat flat% sum% cum cum%
6.84GB 39.20% 39.20% 6.84GB 39.20% github.com/influxdata/influxdb/models.Tags.Map (inline)
6.48GB 37.18% 76.38% 6.49GB 37.18% github.com/influxdata/influxdb/models.(*point).unmarshalBinary
2.24GB 12.84% 89.21% 2.24GB 12.84% github.com/influxdata/kapacitor/edge.(*pointMessage).ShallowCopy
0.50GB 2.84% 92.05% 0.50GB 2.84% strings.(*Builder).grow
0.44GB 2.54% 94.59% 0.44GB 2.54% github.com/influxdata/kapacitor/alert.newHandler
0.37GB 2.10% 96.69% 13.74GB 78.80% github.com/influxdata/kapacitor.(*TaskMaster).WritePoints
0.24GB 1.40% 98.09% 0.24GB 1.40% github.com/influxdata/kapacitor.(*windowTimeBuffer).insert
0.12GB 0.7% 98.79% 0.12GB 0.7% io.ReadAll
0 0% 98.79% 6.49GB 37.18% github.com/influxdata/influxdb/models.(*point).Fields
0 0% 98.79% 0.45GB 2.57% github.com/influxdata/kapacitor.(*AlertNode).runAlert
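For anyone else digging in: profiles like the above can be pulled from a running instance with go tool pprof. The URL below assumes the default API port 9092 and that your build exposes Go's debug/pprof endpoints under the API path, so adjust as needed:

# heap profile (in-use space); then use top / list at the interactive prompt, as in the sessions above
go tool pprof http://localhost:9092/kapacitor/v1/debug/pprof/heap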
Overview
I'm trying to leverage Kapacitor 1.5.7 on Linux/amd64 for context-aware traffic alerting in our multi-tenant commerce system, watching our ingress points for traffic spikes, etc. Typically this means data collection on two (optionally three -- in this example the data is sent but not grouped) fields:
We collect and report on the data in 1-2 minute windows and don't care about any data outside such a window. That is, if the point in question is over 2 minutes old, it should be expired and expunged. Kapacitor runs standalone for this use case -- there is no InfluxDB instance for this data, no retention policies, etc. Data is transmitted to Kapacitor via the UDP listener.
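For context, the ingest side is just the UDP line-protocol listener pointed at that database/retention policy; a minimal sketch of the relevant kapacitor.conf section (bind address illustrative):

[[udp]]
  enabled = true
  bind-address = ":9100"
  database = "toptraffic"
  retention-policy = "autogen"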
The format of the message is the following:
As shown in singled-out stream stats later detailed in this issue, the cardinality of each of the aforementioned fields is roughly:
- uri - unknown, medium
- id - 20-30k within a minute window, avg
- ip - 40-60k within a minute window, avg
- role - at most 3

The count parameter exists so that we can run a mathematical sum operation on the data in the pipe. However, this is redundant because each request generates its own message, its own point, always.

We found Kapacitor struggling with unbounded memory growth in our Production systems, something we did not observe in other (non live traffic) environments. Our initial response to these uncontrollable runaway memory situations was to examine and reduce the cardinality of sets, particularly group by operations on streams. We initially tried reporting on the IP address, the Store ID, and the URI. These are all relatively high cardinality fields, and putting them together in an ordered group by wasn't resulting in efficient, bounded use of memory. So, we pared things back to the following tick script where the uri is dropped from the equation:

The script is straightforward enough; we group on the stream by ip, then id, from the combined measurement. A barrier exists to delete data after one minute. These operations are assigned to a stream variable which is used in alerting to do different things (at the same threshold).

The dot graph and sample output of that tick script while running renders:
As previously mentioned, what we saw with this is that over time (pretty quickly) we ran out of memory. The following graph shows that various tweaks to the aforementioned script, changing things like the window and barrier periods, didn't seem to make any difference to how fast the script/pipeline/Kapacitor consumed memory.
The various spikes in memory show me altering the tick script, removing the window, removing the barrier, changing the barrier from idle to period, changing the time of the barrier tick / window, etc... During these iterations I collected data. The data below is from engagement of the aforementioned tick script, with only changes to the window and barrier periods.
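For clarity, the two barrier variants being toggled between look like this (durations illustrative):

|barrier()
    // emits a barrier for a group once no new points have arrived for the idle duration
    .idle(2m)
    .delete(TRUE)

|barrier()
    // emits a barrier for every group on a fixed schedule
    .period(2m)
    .delete(TRUE)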
Heap dumps were captured for in use objects and for in use space; a profile dump was taken at roughly the same time.
Perplexed, I decided to chop things up and create two tick scripts instead that monitor each of those metrics independently. The first, top_ips, does no variable assignment in the tick script and things are piped together in a single flow. The second, top_stores, has assignment and piping such that data streams to two alerts that do slightly different things with those triggers, like the aforementioned combined script.

Data to the measurement ips looks like:

Here's the show output for top_ips:

...and for top stores the data looks like:

with an evaluated script like:
At first this seemed to be a more stable approach; memory didn't seem to grow as fast and I thought we'd level off. Unfortunately, as the graph below shows, we did not.
Heap dumps were captured for in use objects and for in use space; a profile dump was taken at roughly the same time.
So I'm left head-scratching after running through many iterations of changes to the tick scripts. The data, while constantly flowing into Kapacitor, doesn't cause memory to budge from about a gigabyte if all the tick scripts are disabled / inhibited from processing.
A few questions: sum doesn't really seem to be anywhere near the problem with this configuration and the growth issue; is there a cleaner way of identifying the count of points in the stream? Is it possible for me to drop the value count=1, given that's the only purpose it serves?

I've attached (top_combined_and_top_ip_and_store_kapacitor_1.5.7_growth.tar.gz) the various dumps / profile captures, etc. from the time each of these steps were in place; an output of the structure is:
With the files in top_ip_and_store_id and top_ip_and_store_id_last taken a few hours apart, as the ^^^ shows.