awslabs / amazon-kinesis-producer

Amazon Kinesis Producer Library
Apache License 2.0

KPL native binaries consume all memory => crash #21

Open matej-prokop opened 9 years ago

matej-prokop commented 9 years ago

Last week we updated our Java application, which sends data to Kinesis, to use the KPL (0.10.0). The application has crashed 3 times since the update because it runs out of memory.

The memory is not exhausted by the Java application directly. It is consumed by an external process that the KPL spawns. It seems that the KPL uses a native library that does not manage memory properly.

image


Env: Debian 7; Java 8.u51; KPL version 0.10.0

matej-prokop commented 9 years ago

Our KPL configuration:

        config.setMetricsLevel("summary");
        // we limit ourselves to 90% of the throughput of one shard (to prevent throttling)
        config.setRateLimit(90);
        // if message is not transferred within 1 hour -> giving up
        config.setRecordTtl(1000 * 60 * 60);
        config.setRequestTimeout(30000);
        // we are not real time critical -> improve messages merging 
        config.setRecordMaxBufferedTime(1000);

We use the KPL to connect to just one stream (us-west-1 region) with only one shard. Messages are passed to the KPL using the method

public ListenableFuture<UserRecordResult> addUserRecord(String stream, String partitionKey, ByteBuffer data)

As partitionKey we use approx. 10 distinct strings based on the data source. I am including a screenshot from CloudWatch to give you an idea of our throughput (which is in fact quite small):

screen shot 2015-08-19 at 15 11 41

matej-prokop commented 9 years ago

Uploading a chart showing the amount of consumed memory over time. The three spikes (at the very beginning, on 18.8 and on 19.8) are the points in time when the system ran out of memory and the OS killed our process. Docker then restarted the container and the application started again. You can see that the amount of consumed memory didn't grow steadily; instead, the growth started "randomly" at some point in time.

image

kevincdeng commented 9 years ago

Hi Matej,

Thanks for the information. I see you have the KPL CloudWatch metrics enabled. Can you contact me via email (on my profile) with your AWS account ID? This will allow me to look at those metrics. Thanks!

andrepintorj commented 9 years ago

This seems somehow related to issue #12

matej-prokop commented 9 years ago

Hi Kevin,

I have sent you an email with my AWS account ID.

matej-prokop commented 9 years ago

I have updated to KPL 0.10.1; nevertheless, this issue persists.

kevincdeng commented 9 years ago

Hi Matej,

Are you able to see the KPL metrics in CloudWatch? I'm not able to find them from my side. I'm interested in correlating some of the metrics there with the graph you posted earlier. It will also be helpful if you can set the MetricsLevel config to "detailed" to get more metrics.

Also, is your app running in EC2?

matej-prokop commented 9 years ago

We are not running in EC2.

I am sorry, but I am forced to stop using KPL 0.10.x for now. This is an uncomfortable situation for me. The KPL crashes every day now and I lose some production data with every crash because the entire node is restarted.

kevincdeng commented 9 years ago

Does your code use getOutstandingRecordsCount() to check for back pressure? KinesisProducer has an unbounded queue, so if you enqueue records faster than they can be sent then memory usage will increase without bound.

matej-prokop commented 9 years ago

Well, it should report getOutstandingRecordsCount() to our monitoring system. Let me check that.

kevincdeng commented 9 years ago

The KPL emits a CloudWatch metric called UserRecordsPending, but you'll need MetricsLevel set to "detailed".

matej-prokop commented 9 years ago

Attaching a screenshot showing a chart of the getOutstandingRecordsCount() result over time. 1 on the Y axis is 1000 pending records. I selected the same time period as in the chart of consumed memory I posted 5 days ago.

You can see that the number of pending messages increased around the time of each out-of-memory crash. Again, three crashes are visible. Nevertheless, even at the spikes there are only 500 to 2000 records pending. The node that is crashing has 2GB of memory.

matej-prokop commented 9 years ago

stats_server_buffer

kevincdeng commented 9 years ago

Did your application produce records at a higher rate than usual during those times?

The graph appears to only have hourly data, do you have higher resolution data in and around the spikes?

One thing you can try is to add some code to back off from calling addUserRecord() when the value of getOutstandingRecordsCount() is high. Can you briefly describe your data source? E.g. is it putting records in response to incoming requests? Or maybe tailing a file?
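A minimal sketch of the back-off suggested above, under stated assumptions: `OutstandingRecordsGate`, its limit and its polling interval are hypothetical names chosen for illustration and are not part of the KPL. The `IntSupplier` stands in for `getOutstandingRecordsCount()`, so the example compiles without the KPL on the classpath.

```java
import java.util.function.IntSupplier;

/** Makes a producer thread wait while too many records are outstanding. */
final class OutstandingRecordsGate {
    // Assumption: in a real application this would be
    // kinesisProducer::getOutstandingRecordsCount.
    private final IntSupplier outstanding;
    private final int maxOutstanding;
    private final long pollMillis;

    OutstandingRecordsGate(IntSupplier outstanding, int maxOutstanding, long pollMillis) {
        if (maxOutstanding <= 0) {
            throw new IllegalArgumentException("maxOutstanding must be positive");
        }
        this.outstanding = outstanding;
        this.maxOutstanding = maxOutstanding;
        this.pollMillis = pollMillis;
    }

    /**
     * Polls until the outstanding count drops below the limit.
     * Returns true when there is capacity, false if the thread was interrupted
     * (the interrupt flag is restored in that case).
     */
    boolean awaitCapacity() {
        while (outstanding.getAsInt() >= maxOutstanding) {
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }
}
```

The caller would invoke `awaitCapacity()` before each `addUserRecord()` call. Polling is the simplest option; it trades a little latency for not having to hook into record completion.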

matej-prokop commented 9 years ago

We have data sampled every minute, but only for the last few days. Therefore, I am uploading 2 more screenshots. They are from the 23rd of August. Our server crashed because of an out-of-memory error at 21:13 on the 23rd of August.

OutstandingRecordsCount: stats_server_kinesis_3

Consumed memory chart: stats_server_memory

I don't think that the memory is full of messages waiting to be transferred by the KPL. In this case, there were only about 100 messages pending.

matej-prokop commented 9 years ago

Basically, we put records into Kinesis in response to incoming requests to our cloud service. I am not aware of increased activity around the spikes. Moreover, the 23rd of August was a Sunday and we have lower traffic over the weekend.

andrepintorj commented 9 years ago

Hi Kevin, we are in the same situation here as @mythge, is there any other way to generate debug data? We rolled back our production to 0.9.0 since it seems to be the only stable release of KPL so far.

kevincdeng commented 9 years ago

Thanks Matej for the data, that's very useful. Since there appears to be no buildup of records, an internal memory leak is the natural suspect.

A heap profile will be useful for finding it. I will post detailed instructions in a bit, but the basic idea is to use valgrind's massif to profile the native process. The output will allow me to determine what is leaking and eating up the heap.

matej-prokop commented 9 years ago

OK. If you want me to profile the heap usage of the KPL's external process on our server, please provide me with detailed instructions. We use the "standard" native binaries shipped within the jar file. The server where it runs is standard Debian 7.

kevincdeng commented 9 years ago

Here are the instructions:

Please use version 0.10.1 of the KPL for this.

Install valgrind (Debian/Ubuntu):

sudo apt-get update && sudo apt-get install -y valgrind

(If you're on other Linux distros, use the apt-get equivalents, or you can compile valgrind from source.)

Launch your Java application and wait until the KPL is initialized, then terminate the application. This is to extract the native binaries into the temp folder.

Unless you've explicitly configured the temp folder in the KPL's config, you should see the native binary in /tmp:

ls -l /tmp/amazon-kinesis-producer-native-binaries/
total 47840
-rwxr--r-- 1 admin admin 48985382 Aug 26 19:04 kinesis_producer_f6bc9a5c956e00bfce09f27d30976c1a75d884e5

Create a script to wrap valgrind:

pushd /tmp/amazon-kinesis-producer-native-binaries
printf '#!/bin/sh\nvalgrind --tool=massif /tmp/amazon-kinesis-producer-native-binaries/kinesis_producer_f6bc9a5c956e00bfce09f27d30976c1a75d884e5 "$@"\n' > heap_profiler.sh
chmod +x heap_profiler.sh
popd

Modify your java application to add an extra KPL config to make it use the script:

KinesisProducerConfiguration config = new KinesisProducerConfiguration()
        .setNativeExecutable("/tmp/amazon-kinesis-producer-native-binaries/heap_profiler.sh");

Run your application again. You should see a log message indicating that you are using the script:

[com.amazonaws.services.kinesis.producer.sample.SampleProducer.main()] WARN com.amazonaws.services.kinesis.producer.KinesisProducer - Using non-default native binary at /tmp/amazon-kinesis-producer-native-binaries/heap_profiler.sh

When the application terminates, you should notice a file called massif.out.{process id} in the directory from which you started the application. This is the heap profile.

Leave the application running for as long as necessary, but you need to terminate it at some point for the profile to be written out. Share the massif.out file with me (either via email or some other means) so I can look at it.

Note that valgrind greatly reduces the performance of the process being profiled, so you may have to reduce the amount of load that's hitting the application.

Thanks for taking the time to do this, we really appreciate your help!

matej-prokop commented 9 years ago

Thanks. I will try to do it.

Nevertheless, your statement "Leave the application running for as long as necessary" is a bit tricky :) Don't you think so? I guess that it won't write any profile if it is stopped abnormally (killed by the OS). On the other hand, it would be ideal to write out the profile as close as possible to the crash. Looking at historical data, the app can normally run for hours. For how long should I leave my application running with the heap profiler enabled? Please don't tell me that I should actively monitor the node's memory usage and stop the application once it gets close to an out-of-memory error ;-)

pkopac commented 9 years ago

Hi! I work for the same company as Matej.

Do you know how we could do this, but force massif to print out data at set intervals? I am afraid restarting of the docker container will just kill everything in it and the tool won't have the opportunity to write to hdd.

We want to try it, but optimally just once, so we lose a bit of data just once. Programming a redundancy mechanism would take too much time.

Thanks

statusfailed commented 8 years ago

Not sure if this will help, but I've had a very similar problem with a different binary. I'd built it against a newer version of glibc than was running in production. Once run against the older version of glibc, it would just allocate memory until the machine crashed. That said, it happened much faster than your issue (typically using ~ 2GB within 5 minutes).

Does running your code against a newer version of glibc help?

kevincdeng commented 8 years ago

@statusfailed could you post which versions those were?

statusfailed commented 8 years ago

@kevincdeng I'm sorry, this was at a job I've left, so I can't check. However, I believe I was building on an updated Xubuntu 14.04 and running on an up-to-date Debian Squeeze. I'm not sure what versions of glibc that would be, but it was around May/June 2015. Apologies for the vagueness!

ihorkhavkin commented 8 years ago

Any update on this one?

We are using the KPL as part of a Kinesis output plugin in Logstash and noticed a similar issue. Would it make sense to rebuild with KPL 0.9 and compare?

bdarfler commented 8 years ago

Downgrading was recommended to us by an AWS employee. Not sure if that is the official recommendation but I would certainly give it a try.

ihorkhavkin commented 8 years ago

After downgrading to 0.9.0 it works for me fine. I will update if there is any sign of instability with 0.9.0, but looks like 0.10.[01] have stability issues.

samcday commented 8 years ago

This issue is affecting us as well. Downgrading to 0.9 is not particularly easy for us, since we're relying on being able to specify a credentials provider (we're using STS and IAM roles).

Is there any ETA to fix the 0.10 version series?

kevincdeng commented 8 years ago

We are still trying to root cause this, so unfortunately I don't have an ETA right now. I will let you guys know once we have some results.

kevincdeng commented 8 years ago

@samcday could you give us some information about your setup when you saw the problem? This'll help us with repro.

  1. OS: name, version, kernel (uname -a), glibc (/lib/libc.so.6)
  2. Were you on EC2? Which instance type?
  3. Were you using docker? Which version?
  4. How much data (MB/s) were you putting? How many records per second?

samcday commented 8 years ago

@kevincdeng heya!

  1. Running on Ubuntu 14.04.3. Linux ip-10-120-137-89 3.13.0-71-generic #114-Ubuntu SMP Tue Dec 1 02:34:22 UTC 2015 x86_64 x86_64 x86_64 GNU/Linux. glibc: ldd (Ubuntu EGLIBC 2.19-0ubuntu6.6) 2.19
  2. Yes, running on EC2. Actually I was running on 4 t2.medium instances at the time I encountered this issue but I've since moved to c4.large.
  3. Yes, using Docker using the java:8 Docker image and running in Docker 1.8.3
  4. Attached some Cloudwatch metrics from KPL. If I'm reading the grafs correctly (I've been known to screw this up in a pretty spectacular and embarrassing fashion in the past) then at times we were pushing around 14k records per second at a throughput of 136Mbit/s.

image image

kevincdeng commented 8 years ago

Thanks @samcday. One more question: how long did it run for before it crashed?

samcday commented 8 years ago

That's a tricky one :) The service that runs KPL is in a health-checked autoscaling group. The instances were crashing quite frequently and repetitively, but still doing their best to soldier on as we cycled instances out over and over.

I mentioned in our AWS support case that we added a backpressure implementation that would force a flush on the producer once every 60 seconds. I've realised that if we're writing 10k+ records a second we're effectively asking KPL to buffer up to 600,000 records a minute, which is rather a lot.

Our workload actually looks like this: Kinesis stream --> KCL --> RecordProcessor --> KPL --> Another Kinesis stream. Given that we get pretty big batches from KCL, I've just pushed a new build that forces a flushSync() every batch. We'll see if that combined with the move to c4.large instances helps matters.
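The arithmetic above generalises: a time-based flush buffers roughly rate × interval records (10,000 records/s × 60 s = 600,000), whereas a count-based trigger caps the buffer regardless of rate. A minimal sketch of the latter, assuming a hypothetical `CountBasedFlusher` class (not part of the KPL) whose `Runnable` stands in for something like `producer::flushSync`:

```java
/** Triggers a flush after a fixed number of records instead of a fixed time. */
final class CountBasedFlusher {
    private final int flushEvery;
    // Assumption: in a real application this would wrap the producer's flushSync().
    private final Runnable flush;
    private int sinceLastFlush;

    CountBasedFlusher(int flushEvery, Runnable flush) {
        if (flushEvery <= 0) {
            throw new IllegalArgumentException("flushEvery must be positive");
        }
        this.flushEvery = flushEvery;
        this.flush = flush;
    }

    /** Call once per record handed to the producer. Not thread-safe. */
    void recordAdded() {
        sinceLastFlush++;
        if (sinceLastFlush >= flushEvery) {
            sinceLastFlush = 0;
            flush.run();
        }
    }

    /** Upper bound on records buffered between two time-based flushes. */
    static long bufferedBetweenTimedFlushes(long recordsPerSecond, long intervalSeconds) {
        return Math.multiplyExact(recordsPerSecond, intervalSeconds);
    }
}
```

With `flushEvery` set to the typical KCL batch size this behaves much like flushing once per batch, but it also bounds the buffer when batches are unusually large.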

kevincdeng commented 8 years ago

I recommend using a count based limiter rather than time if your data rate tends to vary.

This is code I'm currently using to try to repro the issue, the main idea is the blocking queue holding tokens.

import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;

import com.amazonaws.services.kinesis.producer.KinesisProducer;
import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
import com.amazonaws.services.kinesis.producer.UserRecordFailedException;
import com.amazonaws.services.kinesis.producer.UserRecordResult;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.gson.GsonBuilder;

import lombok.AllArgsConstructor;
import lombok.SneakyThrows;
import lombok.val;
import lombok.extern.log4j.Log4j;

@Log4j
public class Test {
    @AllArgsConstructor
    private static class Callback implements FutureCallback<UserRecordResult> {
        Runnable func;

        @Override
        public void onFailure(Throwable t) {
            func.run();
            if (t instanceof UserRecordFailedException) {
                handleResult(((UserRecordFailedException) t).getResult());
            } else {
                log.error("Unexpected error", t);
            }
        }

        @Override
        public void onSuccess(UserRecordResult r) {
            func.run();
            handleResult(r);
        }

        private void handleResult(UserRecordResult r) {
            if (!r.isSuccessful()) {
                log.warn("Record failed:\n" + json(r));
            }
        }
    }

    @SneakyThrows
    public static void main(String[] args) {
        val kpl = new KinesisProducer(new KinesisProducerConfiguration()
            .setMetricsLevel("detailed"));

        val start = System.nanoTime();
        val lastTime = new AtomicLong(start);
        val lastCount = new AtomicLong(0);
        val counter = new AtomicLong(0);
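        // Each token is permission for one in-flight record, so the queue
        // capacity bounds the number of outstanding records.
        val tokens = new ArrayBlockingQueue<Boolean>(30_000);
        while (tokens.remainingCapacity() > 0) {
            tokens.offer(true);
        }
        // Every completed record (success or failure) returns its token.
        val cb = new Callback(() -> {
            tokens.offer(true);
            counter.incrementAndGet();
        });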

        val exec = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
            .setDaemon(true)
            .setNameFormat("callback-%d")
            .setUncaughtExceptionHandler((t, e) -> {
                log.error(String.format("Uncaught exception in thread %s (id %d)",
                    t.getName(), t.getId()), e);
            })
            .build());

        exec.submit(() -> {
            while(true) {
                sleep(Duration.ofSeconds(5));
                long dt = System.nanoTime() - lastTime.getAndSet(System.nanoTime());
                long c = counter.get();
                long lc = lastCount.getAndSet(c);
                long rate = Math.round(1e9 * (c - lc) / dt);
                long overallRate = Math.round(1e9 * c / (System.nanoTime() - start));
                log.info(String.format(
                    "%,d records put or failed; (%,d RPS tumbling window, %,d RPS overall)",
                    c, rate, overallRate));
            }
         });

        while (true) {
            // Blocks once all tokens are held by in-flight records.
            tokens.take();
            byte[] data = UUID.randomUUID().toString().getBytes();
            val key = UUID.randomUUID().toString();
            val f = kpl.addUserRecord("s3", key, ByteBuffer.wrap(data));
            Futures.addCallback(f, cb, exec);
        }
    }

    @SneakyThrows
    private static void sleep(Duration d) {
        Thread.sleep(d.toMillis());
    }

    private static String json(Object o) {
        return new GsonBuilder().setPrettyPrinting().create().toJson(o);
    }
}

kevincdeng commented 8 years ago

After some experimentation I've discovered that in low CPU environments (e.g. t2.medium and c4.large), the KPL's network IO thread can become starved. This leads to data not being sent, which then leads to connections being dropped by the server.

As more records are enqueued into the KPL, the problem becomes worse and exacerbates itself as more and more retries are needed for the requests that failed from the dropped connections.

I ran the code above on a c4.large instance (2 cores), and saw many broken pipe and connection reset errors, as well as gaps in the network egress (periods with 0 egress).

Lowering the maximum number of outstanding records (the capacity of the ArrayBlockingQueue) to 2,000 (down from 30,000) fixes the problem without lowering the throughput, and also reduces the CPU usage.

On a larger instance (c4.2xlarge), a limit of 30,000 worked fine, but consumed much more CPU. The throughput was, however, also many times higher.
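The cap on outstanding records can also be expressed with a semaphore instead of a token queue. This is a sketch under stated assumptions, not the code used in the experiment above: `InFlightLimiter` is a hypothetical name, and `CompletableFuture` stands in for the Guava `ListenableFuture` that `addUserRecord()` returns so that the example needs only the JDK. With the KPL, the permit would be released from the callback registered via `Futures.addCallback`, as in the test program earlier.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

/** Bounds the number of sends that may be outstanding at any one time. */
final class InFlightLimiter {
    private final Semaphore permits;

    InFlightLimiter(int maxInFlight) {
        this.permits = new Semaphore(maxInFlight);
    }

    /**
     * Blocks until a slot is free, starts the send, and frees the slot
     * when the send completes, whether it succeeded or failed.
     */
    <T> CompletableFuture<T> submit(Supplier<CompletableFuture<T>> send) {
        permits.acquireUninterruptibly();
        CompletableFuture<T> future;
        try {
            future = send.get();
        } catch (RuntimeException e) {
            permits.release(); // the send never started, so give the slot back
            throw e;
        }
        future.whenComplete((result, error) -> permits.release());
        return future;
    }

    /** Number of additional sends that could start right now. */
    int available() {
        return permits.availablePermits();
    }
}
```

Constructing the limiter with 2,000 permits would mirror the lower limit described above; the right value depends on instance size and has to be found by measurement.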

samcday commented 8 years ago

Interesting find @kevincdeng.

We've noticed pretty flaky connectivity to Kinesis too in both KCL and KPL. Lots of connection resets and read timeouts. It was happening a lot on the t2 family, but it's still happening to us on c3.large instances (albeit not so often). I'm going to try moving our workload to m3.xlarges and see how things look from there.

BTW we've been running with the aggressive flushSync() on every KCL batch for a few hours now and it's pretty stable. Still looks like memory usage is creeping up though. I've attached a screenshot from our Datadog dashboard.

image

kevincdeng commented 8 years ago

I've got my test code from earlier running in a java:8 docker image on a t2.medium instance on ubuntu 14.04. I'm monitoring the memory consumption. I'll post an update tomorrow.

ubuntu@ip-172-16-1-153:~$ date
Fri Dec  4 02:22:29 UTC 2015
ubuntu@ip-172-16-1-153:~$ sudo docker stats --no-stream pensive_galileo
CONTAINER           CPU %               MEM USAGE/LIMIT     MEM %               NET I/O
pensive_galileo     57.40%              771.6 MB/4.144 GB   18.62%              24.47 MB/1.661 GB

kevincdeng commented 8 years ago

@samcday How are your metrics looking now? Did the memory consumption continue to increase?

samcday commented 8 years ago

We had to keep experimenting with the right instance mix to get things to stabilise (ended up with 5x c3.large), but now things look okay.

image

With KPL flushing to a downstream Kinesis stream though, we've noticed we occasionally block for anywhere up to 2-3 minutes (you can see the spikes in "Freshness" in the graf above). The spikes are annoying but not the end of the world. Maybe I should raise a separate issue? The spikes in processing delay correlate pretty directly to spikes in flushing data through KPL to the downstream Kinesis.

image

kevincdeng commented 8 years ago

Is the last graph for average? Do you have graphs for max and sample count too?

samcday commented 8 years ago

Wellllll. That metric is coming from our app via a Java statsd client --> statsd server --> Datadog. So all sorts of weird scaling and sampling is happening all over the place :P

There is a sample count and max though, but not sure how much I trust 'em.

image

alexandreyc commented 8 years ago

Hello,

Is this issue being fixed? We're also experiencing some out-of-memory problems with the KPL.

Thanks,

quiqua commented 7 years ago

We are using the KPL v0.12.1 in an AWS Lambda environment and it seems to run out of available memory after a certain amount of time or number of requests.

I suspect the extraction of binaries to a temp folder each time the Lambda function gets invoked decreases the available memory and causes java.lang.OutOfMemoryError: Java heap space.

pfifer commented 7 years ago

@quiqua Do you have any limitation of the number of records you queue up to send to the KPL? It sort of sounds like a queue is filling up in the Java heap.

The memory issues discussed in this thread are in the KPL native process, which has an entirely separate memory space.
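One way to tell the two cases apart is to log the JVM heap usage next to the process IDs of the JVM's child processes, whose memory can then be inspected with OS tools. This is only a sketch: `MemoryReport` is a hypothetical helper, `ProcessHandle` requires Java 9 or later, and it is an assumption that the KPL native process shows up as a direct child of the JVM.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryUsage;
import java.util.List;
import java.util.stream.Collectors;

/** Reports JVM heap usage and the PIDs of child processes. */
final class MemoryReport {
    /** Bytes of Java heap currently in use. */
    static long heapUsedBytes() {
        MemoryUsage heap = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
        return heap.getUsed();
    }

    /** Maximum Java heap in bytes, or -1 if the JVM does not define one. */
    static long heapMaxBytes() {
        return ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax();
    }

    /** PIDs of direct child processes; their memory is outside the Java heap. */
    static List<Long> childPids() {
        return ProcessHandle.current().children()
                .map(ProcessHandle::pid)
                .collect(Collectors.toList());
    }
}
```

If `heapUsedBytes()` approaches `heapMaxBytes()` before the failure, the problem is on the Java side; if the heap stays flat while a child process grows, it is the native process.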

SoujanyaRama commented 6 years ago

I am having a similar issue. Is there a workaround or fix? I am using version 0.12.6 of the KPL.