awslabs / amazon-kinesis-producer

Amazon Kinesis Producer Library
Apache License 2.0

addUserRecord call throws DaemonException #39

Closed heikkiv closed 5 years ago

heikkiv commented 8 years ago

Sometimes calling addUserRecord starts to throw:

com.amazonaws.services.kinesis.producer.DaemonException: The child process has been shutdown and can no longer accept messages.
    at com.amazonaws.services.kinesis.producer.Daemon.add(Daemon.java:171) ~[amazon-kinesis-producer-0.10.2.jar:na]
    at com.amazonaws.services.kinesis.producer.KinesisProducer.addUserRecord(KinesisProducer.java:467) ~[amazon-kinesis-producer-0.10.2.jar:na]
    at com.amazonaws.services.kinesis.producer.KinesisProducer.addUserRecord(KinesisProducer.java:338) ~[amazon-kinesis-producer-0.10.2.jar:na]

The KPL does not seem to recover from this. All further calls to addUserRecord also fail. Restarting the KPL Java process fixes the situation.

This seems to happen when the Kinesis stream is throttling requests, so my guess is that the native process can't write to the stream quickly enough and runs out of memory. If that's the case, I would expect the native process to start discarding older data and, of course, the KPL to recover to a working state if the native process dies.
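One way an application could approximate the recovery described above is to replace the producer once it reports that its child process is gone. The following is a minimal sketch with stand-in types, not KPL code: `RecordSink` and `RestartingSink` are hypothetical names, and `IllegalStateException` stands in for `DaemonException`. With the real library, the equivalent steps would be to call `destroy()` on the dead `KinesisProducer` and construct a new one.

```java
import java.util.function.Supplier;

/** Hypothetical stand-in for the producer calls this sketch needs. */
interface RecordSink {
    void add(String record);   // assumed to throw IllegalStateException once the child process is gone
    void close();
}

/** Wraps a sink and swaps in a fresh instance when an add fails fatally. */
final class RestartingSink {
    private final Supplier<RecordSink> factory;
    private RecordSink current;
    private int restarts = 0;

    RestartingSink(Supplier<RecordSink> factory) {
        this.factory = factory;
        this.current = factory.get();
    }

    synchronized void add(String record) {
        try {
            current.add(record);
        } catch (IllegalStateException dead) {
            // The old instance is unusable: release it, build a new one, retry once.
            current.close();
            current = factory.get();
            restarts++;
            current.add(record);
        }
    }

    synchronized int restarts() {
        return restarts;
    }
}
```

Records that were still buffered in the dead child process are presumably lost, so a wrapper like this complements retry logic rather than replacing it. If the second attempt also fails, the exception propagates to the caller.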

antgustech commented 5 years ago

Same thing here. With 12.11, no errors are thrown, but no logs are created in my S3 bucket either:

        KinesisProducerConfiguration config = new KinesisProducerConfiguration()
                .setRecordMaxBufferedTime(3000)
                .setMaxConnections(5)
                .setRequestTimeout(60000)
                .setRegion("eu-west-1");

        final KinesisProducer kinesis = new KinesisProducer(config);

        System.out.println(kinesis.toString());
        ByteBuffer data = ByteBuffer.wrap(("Test main java").getBytes());
        kinesis.addUserRecord("myStream", "appStarts", data);
        System.out.println("Done");

But if I instead wait for callbacks as below, I get the following errors:

        KinesisProducerConfiguration config = new KinesisProducerConfiguration()
                .setRecordMaxBufferedTime(3000)
                .setMaxConnections(5)
                .setRequestTimeout(60000)
                .setRegion("eu-west-1");

         KinesisProducer kinesis = new KinesisProducer(config);

         Thread.sleep(2000);
         FutureCallback<UserRecordResult> myCallback = new FutureCallback<UserRecordResult>() {
             @Override
             public void onFailure(Throwable t) {
                 System.out.println("Failed: " + t.toString());
                 System.out.println(t.getStackTrace().toString());
                 t.printStackTrace();
             }

             @Override
             public void onSuccess(UserRecordResult result) {
                 System.out.println("Success: " + result.toString());
             }
         };

         for (int i = 0; i < 10; ++i) {
             ByteBuffer data = ByteBuffer.wrap("myData".getBytes("UTF-8"));
             ListenableFuture<UserRecordResult> f = kinesis.addUserRecord("myStream", "myPartitionKey", data);
             // If the Future is complete by the time we call addCallback, the callback will be invoked immediately.
             Futures.addCallback(f, myCallback);
         }

         for (int i = 0; i < 5; i++) {
             try {
                 Thread.sleep(10000); //So I can wait and see the callbacks.
             } catch (InterruptedException e) {
                 e.printStackTrace();
             }

         }

Failed: com.amazonaws.services.kinesis.producer.UserRecordFailedException
[Ljava.lang.StackTraceElement;@71d70b5e
com.amazonaws.services.kinesis.producer.UserRecordFailedException
    at com.amazonaws.services.kinesis.producer.KinesisProducer$MessageHandler.onPutRecordResult(KinesisProducer.java:197)
    at com.amazonaws.services.kinesis.producer.KinesisProducer$MessageHandler.access$000(KinesisProducer.java:131)
    at com.amazonaws.services.kinesis.producer.KinesisProducer$MessageHandler$1.run(KinesisProducer.java:138)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)

I have tried older versions such as 10, but they cannot connect at all; I get connection errors.

cgpassante commented 5 years ago

I can reliably reproduce the error with a very short code snippet. Two points to note: the first request throws the exception, but subsequent requests work. This behavior happens in a Docker container running various opensdk images (amazoncorretto-8, alpine-opensdk-8, etc.). It does not happen outside of Docker on my laptop running Java HotSpot.

cgpassante commented 5 years ago

@Asamkhata071 https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-upgrades.html

MaximilianoFelice commented 5 years ago

I'm trying a really simple example and finding this issue with Scala 2.13.0, Java 1.8 and KPL 0.12.11:

object KinesisTest extends App {
  val kinesis = new KinesisProducer()

  val userRecordsFutures = (1 to 2) map { idx =>
    kinesis.addUserRecord("payments-stream-test", "partitionName", ByteBuffer.wrap("test".getBytes("UTF-8"))).get()
    println(s"Published idx ${idx}")
  }
}

Exception in thread "main" com.amazonaws.services.kinesis.producer.DaemonException: The child process has been shutdown and can no longer accept messages.
    at com.amazonaws.services.kinesis.producer.Daemon.add(Daemon.java:176)
    at com.amazonaws.services.kinesis.producer.KinesisProducer.addUserRecord(KinesisProducer.java:536)
    at com.amazonaws.services.kinesis.producer.KinesisProducer.addUserRecord(KinesisProducer.java:349)
    at com.example.KinesisTest$.$anonfun$userRecordsFutures$1(KinesisTest.scala:12)
    at scala.runtime.java8.JFunction1$mcVI$sp.apply(JFunction1$mcVI$sp.scala:18)
    at scala.collection.immutable.Range.map(Range.scala:59)
    at com.example.KinesisTest$.delayedEndpoint$com$example$KinesisTest$1(KinesisTest.scala:11)
    at com.example.KinesisTest$delayedInit$body.apply(KinesisTest.scala:8)
    at scala.Function0.apply$mcV$sp(Function0.scala:39)
    at scala.Function0.apply$mcV$sp$(Function0.scala:39)
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:17)
    at scala.App.$anonfun$main$1(App.scala:75)
    at scala.App.$anonfun$main$1$adapted(App.scala:75)
    at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:576)
    at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:574)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:904)
    at scala.App.main(App.scala:75)
    at scala.App.main$(App.scala:73)
    at com.example.KinesisTest$.main(KinesisTest.scala:8)
    at com.example.KinesisTest.main(KinesisTest.scala)

Could anyone find a fix for this issue? This issue seems to be entirely reproducible and many people seem to have faced it. Is there any official roadmap for this tool to be fixed?

ChengzhiZhao commented 5 years ago

We are facing a similar issue on KPL 0.12.11. I am following this thread in case anyone has ideas on how to work around it. We have tried setting up a DLQ for it. Thanks!

cgpassante commented 5 years ago

I faced this issue and Amazon tech support helped me debug it. I don't know Scala, but the Java version of the API returns a future from the addUserRecord method. By inspecting that returned object, you can learn why the add failed. In my case, it was because I forgot to add credentials to my container, which prevented the KPL daemon from connecting to the queue. That can cause this exception to be thrown when you attempt to add a record.
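The inspection described above can be sketched as follows. This is a self-contained illustration using only the JDK, so `AttemptInfo` and `RecordFailedException` are hypothetical stand-ins. In the KPL itself, the corresponding detail is reached through `UserRecordFailedException.getResult().getAttempts()`, where each attempt exposes `getErrorCode()` and `getErrorMessage()`.

```java
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

/** Hypothetical stand-in for one delivery attempt (error code plus message). */
final class AttemptInfo {
    final String errorCode;
    final String errorMessage;

    AttemptInfo(String errorCode, String errorMessage) {
        this.errorCode = errorCode;
        this.errorMessage = errorMessage;
    }
}

/** Hypothetical stand-in for an exception that carries the attempt history. */
final class RecordFailedException extends Exception {
    final List<AttemptInfo> attempts;

    RecordFailedException(List<AttemptInfo> attempts) {
        this.attempts = attempts;
    }
}

final class FailureInspector {
    /** Waits for the future; returns "OK" or a one-line description of why it failed. */
    static String describe(Future<?> future) {
        try {
            future.get();
            return "OK";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();   // preserve the interrupt for the caller
            return "interrupted while waiting";
        } catch (ExecutionException e) {
            // get() wraps the real failure; the useful detail lives in the cause.
            Throwable cause = e.getCause();
            if (cause instanceof RecordFailedException) {
                List<AttemptInfo> attempts = ((RecordFailedException) cause).attempts;
                if (!attempts.isEmpty()) {
                    AttemptInfo last = attempts.get(attempts.size() - 1);
                    return last.errorCode + ": " + last.errorMessage;
                }
            }
            return String.valueOf(cause);
        }
    }
}
```

Reporting the last attempt is a design choice: earlier attempts are often retries of the same underlying problem, and the final one is what caused the record to be given up on.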

Cory-Bradshaw commented 5 years ago

Hello,

Thank you everyone for sharing your experience and learning with the community. For an example of how to implement this, see the KPL sample application in this repository, specifically this line. (This is a test application, so it simply shuts down after displaying the underlying failure.)

This is a general failure condition that occurs when there is any unresolvable configuration problem with the KPL. Usually it happens for one of the following reasons:

  1. Credentials could not be found
  2. Lack of permissions for resources
  3. Stream does not exist
  4. Targeting wrong region (leading to Stream does not exist)

If you are experiencing this problem and can confirm that it is not due to a configuration or access problem, please re-open the issue and provide more details on your configuration and on any reproduction steps that are consistently successful, including details about stream creation, IAM users/roles/permissions, container/EC2 instance, etc.

For additional assistance, you may also open a customer support ticket through the AWS console to receive more specific support.
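For the first cause in the list above, a quick pre-flight check inside the container can show whether any credentials are visible to the process at all. The sketch below is a heuristic under stated assumptions, not part of the KPL: `CredentialPreflight` is a hypothetical helper that only looks at the standard AWS environment variables and the shared credentials file. It cannot see EC2 instance profile credentials, so an empty result is a hint and not proof.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class CredentialPreflight {
    /** Returns the credential sources that appear to be configured (heuristic only). */
    static List<String> visibleSources(Map<String, String> env, Path homeDir) {
        List<String> found = new ArrayList<>();
        // Static keys passed through the environment, e.g. with `docker run -e ...`.
        if (env.containsKey("AWS_ACCESS_KEY_ID") && env.containsKey("AWS_SECRET_ACCESS_KEY")) {
            found.add("environment variables");
        }
        // Set by ECS when a task role is attached to the container.
        if (env.containsKey("AWS_CONTAINER_CREDENTIALS_RELATIVE_URI")) {
            found.add("container credentials endpoint");
        }
        // The shared credentials file, which is usually absent inside a container.
        if (Files.isReadable(homeDir.resolve(".aws").resolve("credentials"))) {
            found.add("shared credentials file");
        }
        return found;
    }
}
```

A typical call would be `CredentialPreflight.visibleSources(System.getenv(), Paths.get(System.getProperty("user.home")))` at startup, logging a warning when the list is empty.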

namedgraph commented 5 years ago

I followed the "Barebones Producer Code" and got this exception when calling kinesis.addUserRecord(). I tried adding callback after that, but the code does not reach it due to the exception.

The same setup worked using KCL (1.x). I will be trying KCL 2.x now.

Cory-Bradshaw commented 5 years ago

@namedgraph Try removing the loop from the sample code and write a single record with the callback above. If this doesn't work, please respond with more details. (Full code, IAM permissions used, etc).

I'm not sure why you are concerned about the KCL version here. There aren't any version dependencies between the KPL and the KCL.

wikier commented 5 years ago

Any update on this? I've just run into similar issues...

Is the official recommendation to move out of KPL and adopt the SDK directly?

Cory-Bradshaw commented 5 years ago

Repeating from above:

Hello,

Thank you everyone for sharing your experience and learning with the community. For an example of how to implement this, see the KPL sample application in this repository, specifically this line. (This is a test application, so it simply shuts down after displaying the underlying failure.)

This is a general failure condition that occurs when there is any unresolvable configuration problem with the KPL. Usually it happens for one of the following reasons:

  1. Credentials could not be found
  2. Lack of permissions for resources
  3. Stream does not exist
  4. Targeting wrong region (leading to Stream does not exist)

If you are experiencing this problem and can confirm that it is not due to a configuration or access problem, please re-open the issue and provide more details on your configuration and on any reproduction steps that are consistently successful, including details about stream creation, IAM users/roles/permissions, container/EC2 instance, etc.

For additional assistance, you may also open a customer support ticket through the AWS console to receive more specific support.

@wikier ,

Additionally, this can happen if the KPL process gets overwhelmed because the calling application does not apply backpressure effectively. I highly recommend reading this blog post to understand some of the considerations for how to configure and use the KPL:

https://aws.amazon.com/blogs/big-data/implementing-efficient-and-reliable-producers-with-the-amazon-kinesis-producer-library/
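As a rough illustration of what backpressure means here, the sketch below caps the number of records that have been submitted but not yet completed. It is a JDK-only example under stated assumptions: `BoundedSubmitter` is a hypothetical name, and it uses `CompletableFuture` where the KPL returns a Guava `ListenableFuture`. With the KPL itself, a similar effect can be had by checking `getOutstandingRecordsCount()` before each `addUserRecord` call and pausing while it is above a threshold.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.function.Function;

/** Caps the number of records that are in flight at any one time. */
final class BoundedSubmitter<T, R> {
    private final Semaphore permits;
    private final Function<T, CompletableFuture<R>> send;

    BoundedSubmitter(int maxOutstanding, Function<T, CompletableFuture<R>> send) {
        this.permits = new Semaphore(maxOutstanding);
        this.send = send;
    }

    /** Returns null when the cap is reached so the caller can slow down, buffer, or drop. */
    CompletableFuture<R> trySubmit(T record) {
        if (!permits.tryAcquire()) {
            return null;
        }
        try {
            CompletableFuture<R> future = send.apply(record);
            // Free the slot on success or failure, whichever comes first.
            future.whenComplete((result, error) -> permits.release());
            return future;
        } catch (RuntimeException e) {
            permits.release();   // the send never started, so give the slot back
            throw e;
        }
    }

    int availableSlots() {
        return permits.availablePermits();
    }
}
```

Returning null instead of blocking leaves the policy to the caller: a request handler might reject the record, while a batch job might sleep and retry.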

srinihacks commented 5 years ago

We are facing this issue when we write a high volume of data to the stream. I don't think it is related to the configuration and access issues listed above:

  1. Credentials could not be found
  2. Lack of permissions for resources
  3. Stream does not exist
  4. Targeting the wrong region (leading to Stream does not exist)

We have also implemented backpressure, specifically by flushing the records when the number of outstanding records reaches the maximum threshold configured by the application. Whenever this error occurs, CPU usage goes high.

There is already an AWS support ticket open to address this issue.

fmthoma commented 5 years ago

@srinihacks The high CPU problem is also known, see #187. It's the reason why we moved away from KPL in the end and towards Kinesis Aggregation Library + Kinesis Client. This did simplify a lot of things for us.

mkemaldurmus commented 2 years ago

ERROR c.a.s.k.producer.KinesisProducer - Error in child process
java.lang.RuntimeException: Child process exited with code 130
        at com.amazonaws.services.kinesis.producer.Daemon.fatalError(Daemon.java:533)
        at com.amazonaws.services.kinesis.producer.Daemon.fatalError(Daemon.java:509)
        at com.amazonaws.services.kinesis.producer.Daemon.startChildProcess(Daemon.java:487)
        at com.amazonaws.services.kinesis.producer.Daemon.access$100(Daemon.java:63)
        at com.amazonaws.services.kinesis.producer.Daemon$1.run(Daemon.java:133)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
15:39:49.105 [kpl-daemon-0000] ERROR c.a.s.k.producer.KinesisProducer - Error in child process
java.lang.RuntimeException: Child process exited with code 130
        at com.amazonaws.services.kinesis.producer.Daemon.fatalError(Daemon.java:533)
        at com.amazonaws.services.kinesis.producer.Daemon.fatalError(Daemon.java:509)
        at com.amazonaws.services.kinesis.producer.Daemon.startChildProcess(Daemon.java:487)
        at com.amazonaws.services.kinesis.producer.Daemon.access$100(Daemon.java:63)
        at com.amazonaws.services.kinesis.producer.Daemon$1.run(Daemon.java:133)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)

I am using an M1 MacBook with Java 8 for M1. Can you offer any advice?
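When reading an exit code such as the one in the log above, it may help to know that on Unix-like systems a status above 128 conventionally means the process was terminated by a signal whose number is the status minus 128. The helper below is a small sketch of that convention only (`ExitCodes` is a hypothetical name). Under it, 130 would map to signal 2, which is SIGINT on Linux and macOS. Nothing in this thread confirms that this is what happened here, and a process can also return such a code deliberately.

```java
final class ExitCodes {
    /** Returns the signal number implied by a shell-style exit code, or -1 if none is implied. */
    static int impliedSignal(int exitCode) {
        // By convention, 128 + n reports termination by signal n.
        return (exitCode > 128 && exitCode < 256) ? exitCode - 128 : -1;
    }
}
```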