awslabs / kinesis-kafka-connector

kinesis-kafka-connector is a connector based on Kafka Connect for publishing messages to Amazon Kinesis Streams or Amazon Kinesis Firehose.
Apache License 2.0

This connector is prone to data loss as well as OutOfMemoryError #2

Open behrangsa opened 7 years ago

behrangsa commented 7 years ago

Is the Kinesis Stream Sink Connector for demo purposes, or is it intended to be production-ready? From what I can see, it is far from ready for production use. I suggest adding a note at the top of the README warning users that the connectors are not yet ready for prime time. In particular, the current version is prone to data loss as well as OOM errors.

flush is async and that can cause data loss

The current code looks like this:

    @Override
    public void flush(Map<TopicPartition, OffsetAndMetadata> arg0) {
        // TODO Auto-generated method stub
        kinesisProducer.flush();
    }

It should use kinesisProducer.flushSync() instead; otherwise Kafka Connect cannot catch errors that occur during flushing and will incorrectly assume that flushing always succeeds. That said, kinesisProducer.flushSync() alone cannot guarantee correct offset flushing, because in the KPL addUserRecord is asynchronous and exceptions thrown from it are handled on a separate thread.
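For illustration, here is a minimal sketch of a blocking flush that also surfaces asynchronous KPL failures. It assumes the KPL's KinesisProducer; the SyncFlushSketch class, the firstException field, and the putRecord() helper are hypothetical names for this sketch, not the connector's actual code:

    import com.amazonaws.services.kinesis.producer.KinesisProducer;
    import com.amazonaws.services.kinesis.producer.UserRecordResult;
    import com.google.common.util.concurrent.FutureCallback;
    import com.google.common.util.concurrent.Futures;
    import com.google.common.util.concurrent.ListenableFuture;
    import com.google.common.util.concurrent.MoreExecutors;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.connect.errors.ConnectException;

    import java.nio.ByteBuffer;
    import java.util.Map;
    import java.util.concurrent.atomic.AtomicReference;

    // Sketch only: in the real connector these members would live in AmazonKinesisSinkTask.
    class SyncFlushSketch {
        private KinesisProducer kinesisProducer;
        // Carries the first asynchronous KPL failure back onto the Connect task thread.
        private final AtomicReference<Throwable> firstException = new AtomicReference<>();

        void putRecord(String stream, String partitionKey, ByteBuffer data) {
            ListenableFuture<UserRecordResult> future =
                    kinesisProducer.addUserRecord(stream, partitionKey, data);
            Futures.addCallback(future, new FutureCallback<UserRecordResult>() {
                public void onSuccess(UserRecordResult result) {
                    // Record reached Kinesis; nothing to do.
                }

                public void onFailure(Throwable t) {
                    // Remember the first failure so flush() can rethrow it.
                    firstException.compareAndSet(null, t);
                }
            }, MoreExecutors.directExecutor());
        }

        public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) {
            // Block until every outstanding record has been sent or has failed.
            kinesisProducer.flushSync();
            Throwable failure = firstException.getAndSet(null);
            if (failure != null) {
                // Failing the flush stops Kafka Connect from committing offsets
                // for records that never reached Kinesis.
                throw new ConnectException("Failed to deliver records to Kinesis", failure);
            }
        }
    }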

This connector is prone to OOM errors:

To reproduce OOM errors:

  1. Create a Kafka topic with a few million records, each about 1KB
  2. Create a Kinesis Stream (with 1, 2 or even 10 shards)
  3. Start the connector

Over time the number of outstanding records will grow until it blows up the worker. Incidentally, using kinesisProducer.flushSync() for flushing together with a short Kafka offset flush interval could prevent OOM errors.
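A crude way to bound memory is to apply backpressure in put() using the KPL's getOutstandingRecordsCount(). The sketch below illustrates the idea; the MAX_OUTSTANDING_RECORDS threshold, the streamName field, and the partitionKeyFor()/dataFor() helpers are illustrative assumptions, not part of the connector:

    // Sketch: backpressure in SinkTask.put(), assuming a KPL KinesisProducer field.
    // MAX_OUTSTANDING_RECORDS, streamName, partitionKeyFor() and dataFor() are illustrative.
    private static final int MAX_OUTSTANDING_RECORDS = 5000;

    @Override
    public void put(Collection<SinkRecord> records) {
        for (SinkRecord record : records) {
            // Wait until the KPL has drained its buffer before adding more, so the
            // number of outstanding records cannot grow without bound and exhaust the heap.
            while (kinesisProducer.getOutstandingRecordsCount() > MAX_OUTSTANDING_RECORDS) {
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new ConnectException("Interrupted while waiting for KPL buffer to drain", e);
                }
            }
            kinesisProducer.addUserRecord(streamName, partitionKeyFor(record), dataFor(record));
        }
    }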


To summarize, this connector has the following issues:

  1. It does not handle KPL exceptions correctly
  2. It does not implement the flush method correctly
  3. It could lead to OOM errors when the Kafka server has millions of unprocessed records that need to be exported to Kinesis
nehalmehta commented 7 years ago

@behrangsa we appreciate your feedback.

1) As you may have noticed, the Kafka Connect framework currently does not provide a mechanism to start a connector from a particular offset, so it always starts processing from the first available offset of the topic. If the Kafka retention period is set to 30 days, then when Kafka Connect starts it will try to push all 30 days' worth of messages through the connector to Kinesis Streams. Other customers have solved this in one of the following ways: remove older messages from Kafka if they have already been processed by consumers and are no longer important, so Kafka Connect starts from a more recent point; or increase the number of Kafka Connect instances and Kinesis Streams shards, let it work through the backlog, and then scale back down.

2) Thanks for recommending kinesisProducer.flushSync(); you have a valid concern that otherwise Kafka Connect will not catch potential errors. I will check whether it has a performance impact on normal processing and get back to you.

3) Can you please provide more information on the KPL exceptions?

Thanks, Nehal

jessecollier commented 7 years ago

@nehalmehta This is what we see quite often.

If we were to change it from async to sync as @behrangsa has suggested, do you foresee any issues? It seems to make a lot of sense to wait for KPL to produce to kinesis before moving forward in the kafka log.

jessecollier commented 7 years ago

I also found this in the documentation:

https://docs.confluent.io/current/connect/devguide.html

The flush() method is used during the offset commit process, which allows tasks to recover from failures and resume from a safe point such that no events will be missed. The method should push any outstanding data to the destination system and then block until the write has been acknowledged.

This suggests that flush() is meant to block rather than return asynchronously. I'll begin testing the change on our fork.

jessecollier commented 7 years ago

Just ran this with flushSync() and 1 worker with 15 connectors and each connector had 3 tasks. I am running in distributed mode.

I produced almost 2 million messages into Kafka and confirmed the consumer offset lag. The single worker seemed to process them with no issues.

I was not able to produce a single Java heap OOM.

I then started 2 more workers (while the 1st was running); they joined the cluster and began spinning up tasks. I could see this in the group join/leave messages.

After that, I began to see many, many Java Heap OOMs. It was quite bizarre. I could see the logs where the exception is caught: https://github.com/apache/kafka/blob/11afff09908035166febf9b75c410112693ff98c/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java#L171-L175 and also another NullPointerException here: https://github.com/apache/kafka/blob/11afff09908035166febf9b75c410112693ff98c/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java#L150-L151

You can see it in the logs:

[2017-10-19 20:43:09,766] ERROR Task kinesis-kafka-connector-group_events-1 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
java.lang.OutOfMemoryError: Java heap space
[2017-10-19 20:43:09,766] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
[2017-10-19 20:43:09,766] ERROR Task kinesis-kafka-connector-group_events-1 threw an uncaught and unrecoverable exception during shutdown (org.apache.kafka.connect.runtime.WorkerTask)
java.lang.NullPointerException
    at com.amazon.kinesis.kafka.AmazonKinesisSinkTask.stop(AmazonKinesisSinkTask.java:148)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.close(WorkerSinkTask.java:127)
    at org.apache.kafka.connect.runtime.WorkerTask.doClose(WorkerTask.java:121)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
    ...

The odd thing is that other tasks continued to work "uninterrupted".
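The secondary NullPointerException in stop() looks like the producer being null when the task is torn down after the OOM. A defensive guard along these lines would at least keep the shutdown path from throwing; this is a sketch assuming the existing kinesisProducer field, and it does not address the underlying OOM:

    @Override
    public void stop() {
        // Guard against stop() running when start() never completed (e.g. after an OOM)
        // or when the producer has already been destroyed.
        if (kinesisProducer != null) {
            kinesisProducer.destroy();
            kinesisProducer = null;
        }
    }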

nehalmehta commented 7 years ago

Hi @jessecollier,

Just using flushSync instead of flush might have some challenges. flushSync attempts a flush every 500 ms until all records have been flushed, so if Kinesis Streams is throttling because load has increased, it will keep retrying until every record has been successfully submitted. If that takes longer than 30 s (the default consumer session timeout for Kafka), the Kafka consumer will time out and may trigger a partition rebalance. Partition rebalancing can be expensive because it stops all other tasks from making progress, and multiple rebalances in quick succession will prevent new connectors from joining. If flushSync runs often and takes a long time, it may also reduce performance. That said, I agree there is a strong use case for flushSync, but first I would like to validate the configurations and understand the OOM.
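For context, the knobs involved on the Connect side look roughly like this; the values below are illustrative examples, not recommendations:

    # Worker configuration (values are examples only).
    # How often Kafka Connect calls flush() and commits offsets.
    offset.flush.interval.ms=10000
    # Headroom for a synchronous flush that may be throttled by Kinesis.
    consumer.session.timeout.ms=60000
    consumer.request.timeout.ms=70000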

I think the following changes alone should help with your OOM (a rough sketch follows the list):

1) Override the open() method of SinkTask and instantiate the KinesisProducer there instead of in start() (kinesisProducer = getKinesisProducer();)

2) Override the close() method of SinkTask and destroy the KinesisProducer there instead of in stop() (kinesisProducer.destroy();)
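A rough sketch of those two suggestions, assuming the existing kinesisProducer field and getKinesisProducer() helper from the current task code (the flushSync() call before destroy() is an extra precaution in this sketch):

    @Override
    public void open(Collection<TopicPartition> partitions) {
        // Create the producer when partitions are assigned to this task.
        if (kinesisProducer == null) {
            kinesisProducer = getKinesisProducer();
        }
    }

    @Override
    public void close(Collection<TopicPartition> partitions) {
        // Tear the producer down when partitions are revoked, so buffered records
        // and native resources are released on rebalance instead of accumulating.
        if (kinesisProducer != null) {
            kinesisProducer.flushSync();
            kinesisProducer.destroy();
            kinesisProducer = null;
        }
    }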

My test cluster currently does not have a heavy load, so I cannot re-create your issue. I will try to re-create it this week and test the above solution.

Thanks, Nehal

binary132 commented 5 years ago

Hey folks -- it looks like this is still an unresolved issue. Has sync support been added yet? Is there any plan for this? Without delivery guarantees, this isn't production-worthy.

skidder commented 4 years ago

It looks as though this may have been resolved in Dec 2017 by the addition of sync support via https://github.com/awslabs/kinesis-kafka-connector/commit/0ca4735d80f7fd7ad6f1991f34860e100758cbc7. Can anyone confirm?

bdesert commented 4 years ago

@jessecollier this is an old issue, and as @skidder mentioned, it looks like https://github.com/awslabs/kinesis-kafka-connector/commit/0ca4735d80f7fd7ad6f1991f34860e100758cbc7 addresses it. Have you tried more recent versions? Can we close this issue?