alexmnyc opened this issue 6 years ago
Are there any logs from the native process to try to pinpoint why the process is hanging?
Anyone?
The KPL will log messages to Java using SLF4J. The messages will appear to originate from `LogInputStreamReader`. High CPU usually means the KPL is not able to make remote calls consistently. This can occur if it's being throttled, or if a network issue is increasing latency. The KPL has issues with throttling whether it's from Kinesis or CloudWatch.
Additionally, the log output will contain messages showing the flush triggers, which can indicate issues. If the records or bytes fields are high, that could indicate the KPL is generating a large number of requests to Kinesis which, if there is latency, could cause the KPL to backlog.
The issue does not seem to be related to throttling... Where does the KPL native process buffer records?
The KPL buffers all records in memory while waiting to dispatch them. If you have GDB installed, it's possible to get a stack trace of the native process using this command:

```
gstack <pid>
```

If that doesn't work, you might be able to use gdb directly:

```
gdb -p <pid> -ex 'thread apply all bt' --batch
```
If there are a large number of threads, that would be an indicator that something is wrong. If you're using the thread pooling option the KPL may have a large number of threads, but they should normally be blocked in something like `std::condition_variable::wait`. If you see a number of threads that are active, check whether they're active in calls to Kinesis or CloudWatch.
Maybe I can share some experience. We had a long history with KPL.
In older versions, we used to see 100% CPU when the outstanding record count in the KPL surged due to a latency spike, under-configured outbound connections, or under-provisioned CPU power.
Lately we ran into this again with 0.12.8 when we increased a stream's shard count from 24 to 48 but were still using the default 24 connections and 24 threads on r4.large (2-core) instances.
For a different stream that has 144 shards, our m4.xl or m4.4xl instances with 48 or 96 connections can publish largely fine.
Btw, we use the thread pool model and have set `maxBufferTime` to 1 second instead of the default 100ms.
Still, I think the 100% CPU is not a good place to be. Please investigate.
I imagine one way to reproduce this is to set up the KPL with a small number of connections publishing to a stream with many more shards, under high traffic, on a smaller instance.
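For context, the knobs mentioned above map onto the Java KPL configuration roughly as in this sketch (the values are illustrative assumptions, not the exact settings from the report above):

```java
import com.amazonaws.services.kinesis.producer.KinesisProducer;
import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration.ThreadingModel;

public class ProducerSetupSketch {
    public static KinesisProducer create() {
        KinesisProducerConfiguration config = new KinesisProducerConfiguration()
                // Thread pool model instead of one thread per request.
                .setThreadingModel(ThreadingModel.POOLED)
                .setThreadPoolSize(48)
                // Outbound connections to Kinesis; keeping these at the default
                // while doubling the shard count is the situation described above.
                .setMaxConnections(48)
                // Buffer up to 1 second instead of the default 100 ms.
                .setRecordMaxBufferedTime(1000)
                // Assumption: set to the region your stream lives in.
                .setRegion("us-east-1");
        return new KinesisProducer(config);
    }
}
```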
@pfifer Was this issue already solved by https://github.com/awslabs/amazon-kinesis-producer/pull/193? I'm just wondering why the status of this issue was not updated.
@chang-chao that is one source of high CPU. #193 may have solved the problem for a lot of people, but letting the inflight message count get really high can still cause the KPL to use a lot of CPU.
@pfifer We're also experiencing problems with high CPU loads from KPL.
> but letting the inflight message count get really high

How high is »really high«?
> If you're using the thread pooling option the KPL may have a large number of threads

If I set the `ThreadPoolSize` to 5, is it still possible to have a higher number of threads?
It's hard to say; that is what the processing_stats the KPL emits tries to indicate. As the KPL gets more and more backed up, the average time to transmit records increases. Once it passes an internal threshold the KPL will begin to warn about the backlog. You can use this average time to tune how many records you send before backing off. Essentially, once you're at the edge it doesn't take much to fall over: an increase in throttling or latency can cause the KPL to rapidly back up. The thread pooling model can mitigate the CPU usage, but at the cost of increasing the delay.
Which version of the KPL, and what OS are you using? What would also be useful for us is the stack trace from the native process, using gstack or gdb as I posted above. Even better is a performance capture using `perf` on Linux; there are equivalents on macOS and Windows.
@pfifer Thanks for the quick response! We're currently on Java KPL 0.12.5 and on an openjdk:8-jre-slim docker image. `RecordTtl` is set to max so we don't lose any records, and we use `getOutstandingRecordsCount()` for backpressuring: if the outstanding records count is above a certain threshold (10,000 records, which is equivalent to 2MB), we stop sending more records and wait until it drops below the threshold again (see https://github.com/apache/flink/pull/6021). Limiting the `ThreadPoolSize` from 10 (default) to 2 does not seem to have any effect.
I'll try tomorrow whether the problem still persists with 0.12.9, just wanted to know whether 10,000 is already considered »really high«.
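For illustration, the backpressure scheme described above looks roughly like the following sketch (the threshold and sleep interval are assumptions, not recommendations):

```java
import com.amazonaws.services.kinesis.producer.KinesisProducer;
import java.nio.ByteBuffer;

public class BackpressureSketch {
    private static final int MAX_OUTSTANDING = 10_000; // assumed threshold, ~2 MB at 200 B/record

    public static void send(KinesisProducer producer, String stream,
                            String partitionKey, byte[] payload) throws InterruptedException {
        // Block the caller while the KPL's internal buffer holds too many records.
        while (producer.getOutstandingRecordsCount() > MAX_OUTSTANDING) {
            Thread.sleep(100);
        }
        producer.addUserRecord(stream, partitionKey, ByteBuffer.wrap(payload));
    }
}
```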
I really wouldn't recommend setting the `ThreadPoolSize` below 32-ish. The threads are primarily IO bound, not CPU bound. A bottleneck at sending could also increase the number of records waiting for processing, which could in turn increase memory usage and scanning.
0.12.9 changed some of the internal locking which, when under heavy load, could cause a lot of contention.
I would recommend not setting the `RecordTtl` to max, but instead allowing it to expire so your application can see why it expired. After you get the expired response you can re-enqueue the record.
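A minimal sketch of that approach, assuming a finite `RecordTtl` and Guava's `Futures` (which the KPL already depends on); the re-enqueue strategy here is deliberately naive:

```java
import com.amazonaws.services.kinesis.producer.Attempt;
import com.amazonaws.services.kinesis.producer.KinesisProducer;
import com.amazonaws.services.kinesis.producer.UserRecordFailedException;
import com.amazonaws.services.kinesis.producer.UserRecordResult;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.MoreExecutors;
import java.nio.ByteBuffer;

public class RetryOnExpirySketch {
    public static void sendWithRetry(KinesisProducer producer, String stream,
                                     String partitionKey, byte[] payload) {
        Futures.addCallback(
                producer.addUserRecord(stream, partitionKey, ByteBuffer.wrap(payload)),
                new FutureCallback<UserRecordResult>() {
                    @Override
                    public void onSuccess(UserRecordResult result) {
                        // Record delivered; nothing to do.
                    }

                    @Override
                    public void onFailure(Throwable t) {
                        if (t instanceof UserRecordFailedException) {
                            UserRecordResult result = ((UserRecordFailedException) t).getResult();
                            // Inspect why the attempts failed (throttling, timeouts, ...)
                            for (Attempt attempt : result.getAttempts()) {
                                System.err.println(attempt.getErrorCode() + ": " + attempt.getErrorMessage());
                            }
                            // ... then re-enqueue the record from the application side.
                            sendWithRetry(producer, stream, partitionKey, payload);
                        }
                    }
                },
                MoreExecutors.directExecutor());
    }
}
```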
> The threads are primarily IO bound, not CPU bound.
That's what I thought, hence I'm wondering why CPU is currently our biggest problem ;-)
I'm testing the backpressuring currently, so the limit is clearly the internal Kinesis limit of 1MB per second per shard, as intended. So the number of threads should not be the bottleneck when writing to two shards with two threads?
> I would recommend not setting the `RecordTtl` to max, but instead allowing it to expire so your application can see why it expired.
I don't see how it changes the behavior whether the KPL retries indefinitely or our application does? In particular since the »why it expired« is clearly »because of rate limiting«?
The limit of 10,000 in-flight records before backing off was chosen for a record size of 200 bytes, so that 1MB of records per shard can be in-flight at any time (10,000 × 200 B = 2 MB across our two shards). Is this a sensible number, or am I doing something wrong here?
BTW, 0.12.9 does not change behavior substantially, nor does increasing the thread pool size from 2 to 32.
> You can use this average time to tune how many records you send before backing off. Essentially, once you're at the edge it doesn't take much to fall over: an increase in throttling or latency can cause the KPL to rapidly back up. The thread pooling model can mitigate the CPU usage, but at the cost of increasing the delay.
Delay is not a concern in our use case, only throughput is, which is currently limited by CPU. I had a look at the `BufferingTime` metric, which averages at 500ms. Is that too much? What should I do to keep this time lower while maintaining maximum throughput?
> That's what I thought, hence I'm wondering why CPU is currently our biggest problem ;-)
Before 0.12.9 many of the locks were based on spin locks which spun on the CPU.
> I don't see how it changes the behavior whether the KPL retries indefinitely or our application does? In particular since the »why it expired« is clearly »because of rate limiting«?
Part of the reason for pulling the requests out of the KPL is to figure out why they are failing. The KPL tries to prioritize earlier records before later records, which can increase the amount of processing time the KPL must spend handling failed records. This might also be a cause of the high CPU.
> Delay is not a concern in our use case, only throughput is, which is currently limited by CPU. I had a look at the `BufferingTime` metric, which averages at 500ms. Is that too much? What should I do to keep this time lower while maintaining maximum throughput?
In this case what I'm interested in is how long the actual send to Kinesis takes. Right now that is only available from the processing_statistics_logger. This time includes how long the request was waiting before it was actually sent by the http library, and how long the actual transmission took.
A second question: Are you sending to Kinesis in the same region your application is running in?
@pfifer
Maybe a bit more generic: I'm trying to send records to Kinesis using the Java KPL, and I want to lose no records and to achieve maximum throughput (delay is not a concern).
My current approach is to check `getOutstandingRecordsCount()` each time before inserting a record, and if it goes above 10,000 (or 1,000, it does not matter), I wait for some time (100...500 milliseconds) until the outstanding records count is below the limit again before sending the next record. The `RateLimit` setting is at 100. Apparently, this is the wrong way to do it.
How should I use the Java KPL, in particular, how should I back off correctly while maintaining maximum throughput? What should I do to find a good producer configuration to achieve this?
> A second question: Are you sending to Kinesis in the same region your application is running in?
Yes.
> processing_statistics_logger

Typical entry under high load (backpressuring by keeping `outstandingRecordsCount` below 10,000; CPU at 100%):
```
[processing_statistics_logger.cc:109] Stage 1 Triggers: { stream: 'xxx', manual: 0, count: 0, size: 137, matches: 0, timed: 469, UserRecords: 76818, KinesisRecords: 606 }
[processing_statistics_logger.cc:112] Stage 2 Triggers: { stream: 'xxx', manual: 0, count: 0, size: 0, matches: 0, timed: 472, KinesisRecords: 604, PutRecords: 472 }
[processing_statistics_logger.cc:127] (xxx) Average Processing Time: 17.104034 ms
```
Typical entry under low load (at ~20% of the shard capacity, CPU normal):
```
[processing_statistics_logger.cc:109] Stage 1 Triggers: { stream: 'xxx', manual: 0, count: 0, size: 0, matches: 0, timed: 487, UserRecords: 10381, KinesisRecords: 487 }
[processing_statistics_logger.cc:112] Stage 2 Triggers: { stream: 'xxx', manual: 0, count: 0, size: 0, matches: 0, timed: 328, KinesisRecords: 487, PutRecords: 328 }
[processing_statistics_logger.cc:127] (xxx) Average Processing Time: 15.113846 ms
```
@fmthoma That indicates the CPU isn't being consumed by actually sending the records. It would be really great if you could capture some performance data from the KPL:
1. `top -b -H -d 1 -n 20 -p <kinesis_producer PID>` — save the output from top to a file.
2. `gstack <kinesis_producer PID>` — save the stack traces to a file. If `gstack` doesn't work you might be able to use gdb directly: `gdb -p <pid> -ex 'thread apply all bt' --batch`
These two together give us which thread is using the most CPU, and some stack traces from that thread.
@pfifer We ran some performance tests yesterday and found out the following:
- In the first scenario most records are flushed due to `RecordMaxBufferedTime`, while in the second scenario most records reach `AggregationMaxSize`. This is also reflected in the `PutRecords.Records` metric, which is twice as high when writing with more producers.
- We see no difference between a `ThreadPoolSize` of 2 and 128, but the `perf` traces show that a substantial amount of time is spent polling.
- In the `perf` data, about 20% of all samples are found in `aws::metrics::Metric::put`, most of that in turn in the callstack of `aws::kinesis::core::Retrier::emit_metrics`. That's somewhat unexpected. A lot of CPU time was also spent in `aws::kinesis::core::Reducer<aws::kinesis::core::UserRecord, aws::kinesis::core::KinesisRecord>::add`, with 15% of all samples.
- We created a Gist with the `top` and `gdb` traces, I hope that helps.
EDIT: Forgot to say, these tests were run on 0.12.9.
@pfifer Any news?
We ran some more tests. It seems like the CPU load increases when `outstandingRecordsCount` goes above 100...200 records. This is rather bad for us, as our user records are about 200 bytes each, so we'd like to aggregate at least 100 of them into one Kinesis record to use the full size of a PUT unit. But also at lower throughput, the CPU load seems unnecessarily high. In particular, the KPL native process, which should be mostly IO-bound, produces about 90% of the CPU load, while the Java process that does all the CPU-heavy lifting in our application only produces 10% of the actual load.
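For reference, the aggregation sizing being reasoned about here corresponds to knobs in `KinesisProducerConfiguration`; a rough sketch with illustrative values (assumptions, not the poster's actual settings):

```java
import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;

public class AggregationConfigSketch {
    public static KinesisProducerConfiguration build() {
        return new KinesisProducerConfiguration()
                // Pack roughly 100+ user records of ~200 bytes into one Kinesis record:
                // cap the aggregate at ~25 KB (one PUT payload unit) and ~200 user records.
                .setAggregationMaxSize(25 * 1024)
                .setAggregationMaxCount(200)
                // Allow enough buffering time for an aggregate to fill up
                // before a timed flush sends it half-empty.
                .setRecordMaxBufferedTime(1000);
    }
}
```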
We have a very similar scenario to yours, @fmthoma, where we can't lose any records but can tolerate lateness.
We're also using kpl 0.12.9 and are seeing 100% CPU under high load. Were you able to find any configuration that helped mitigate the issue?
@kiftio Yes and no.
As for the KPL, unfortunately we did not find a solution. We found one issue with our partition keys: they should be short and sparse (few distinct values), because they are transmitted in a table along with the payload. Using long random partition keys, as we did in the beginning, adds significant overhead; now we use 2-byte random numbers, which is sufficient to balance records over 100+ shards, and the overhead is negligible. We also found out that running multiple producers in parallel at low throughput consumes less CPU (cumulatively) than running one producer at high load, but the difference is not that significant.
However, I can recommend using the Kinesis Aggregation Library together with the plain AWS Kinesis HTTP client instead. CPU consumption is not an issue, you can use it at arbitrary parallelism, and it's synchronous (so backpressuring is trivial). Overall, we spent much less time on making this combination work than on trying to figure out how to achieve backpressuring with the KPL, let alone on debugging why CPU limits our throughput. You have to be aware of awslabs/kinesis-aggregation#11, but that's easy to work around once you know it.
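A rough sketch of that combination, assuming the `RecordAggregator` API from awslabs/kinesis-aggregation and the AWS SDK v1 Kinesis client (stream name, key scheme, and flush handling are illustrative):

```java
import com.amazonaws.kinesis.agg.AggRecord;
import com.amazonaws.kinesis.agg.RecordAggregator;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
import java.util.concurrent.ThreadLocalRandom;

public class AggregationClientSketch {
    private final AmazonKinesis kinesis = AmazonKinesisClientBuilder.defaultClient();
    private final RecordAggregator aggregator = new RecordAggregator();
    private final String streamName = "my-stream"; // assumption

    // Short random partition key (see the note above on partition-key overhead).
    private static String shortPartitionKey() {
        return Integer.toString(ThreadLocalRandom.current().nextInt(1 << 16));
    }

    public void send(byte[] payload) throws Exception {
        // addUserRecord hands back a fully packed AggRecord once it is full.
        AggRecord full = aggregator.addUserRecord(shortPartitionKey(), payload);
        if (full != null) {
            // Synchronous PutRecord: the caller blocks here, so backpressure
            // falls out of the call itself.
            kinesis.putRecord(full.toPutRecordRequest(streamName));
        }
    }

    public void flush() throws Exception {
        AggRecord remainder = aggregator.clearAndGet();
        if (remainder != null && remainder.getNumUserRecords() > 0) {
            kinesis.putRecord(remainder.toPutRecordRequest(streamName));
        }
    }
}
```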
I checked for WriteProvisionedThroughputExceeded and there are zero occurrences in CloudWatch for that stream.
From c4.xlarge: