qubole / streamx

kafka-connect-s3: Ingest data from Kafka to object stores (S3)
Apache License 2.0

OutOfMemory error from S3 transfer manager #30

Open dhamilton opened 7 years ago

dhamilton commented 7 years ago

Hi, we've been seeing issues where the number of threads opened by the plugin grows without bound, and the connector workers all eventually die with stack traces pointing to a failure to create threads in the S3 transfer manager. We do see data being uploaded to S3, but I'm wondering if this might be a case where we are consuming faster than we can upload. Has anyone else observed this in the past, and is there a known workaround? Thanks.

[2017-01-16 18:30:15,554] ERROR Task pixel_events_s3-3 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:142)
java.lang.OutOfMemoryError: unable to create new native thread
    at java.lang.Thread.start0(Native Method)
    at java.lang.Thread.start(Thread.java:713)
    at java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:949)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1360)
    at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:132)
    at com.amazonaws.services.s3.transfer.internal.UploadMonitor.<init>(UploadMonitor.java:129)
    at com.amazonaws.services.s3.transfer.TransferManager.upload(TransferManager.java:449)
    at com.amazonaws.services.s3.transfer.TransferManager.upload(TransferManager.java:382)
    at org.apache.hadoop.fs.s3a.S3AOutputStream.close(S3AOutputStream.java:127)
    at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
    at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
    at java.io.FilterOutputStream.close(FilterOutputStream.java:160)
    at java.io.FilterOutputStream.close(FilterOutputStream.java:160)
    at org.apache.avro.file.DataFileWriter.close(DataFileWriter.java:376)
    at io.confluent.connect.hdfs.avro.AvroRecordWriterProvider$1.close(AvroRecordWriterProvider.java:69)
    at io.confluent.connect.hdfs.TopicPartitionWriter.closeTempFile(TopicPartitionWriter.java:514)
    at io.confluent.connect.hdfs.TopicPartitionWriter.close(TopicPartitionWriter.java:319)
    at io.confluent.connect.hdfs.DataWriter.close(DataWriter.java:299)
    at io.confluent.connect.hdfs.HdfsSinkTask.close(HdfsSinkTask.java:110)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:301)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(WorkerSinkTask.java:432)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:146)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)
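
For anyone wanting to confirm the same symptom, below is a minimal, illustrative sketch of watching the worker JVM's thread counts over JMX; it is not part of the connector, and the JMX host, port, and sampling interval are assumptions for the example.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import javax.management.MBeanServerConnection;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

// Samples a Connect worker's thread counts over JMX. Assumes the worker JVM was
// started with remote JMX enabled (e.g. -Dcom.sun.management.jmxremote.port=9999,
// authentication/SSL disabled for the test); host, port and interval are placeholders.
public class WorkerThreadWatcher {
    public static void main(String[] args) throws Exception {
        JMXServiceURL url =
            new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi");
        JMXConnector connector = JMXConnectorFactory.connect(url);
        try {
            MBeanServerConnection server = connector.getMBeanServerConnection();
            ThreadMXBean threads = ManagementFactory.newPlatformMXBeanProxy(
                server, ManagementFactory.THREAD_MXBEAN_NAME, ThreadMXBean.class);
            // Log counts once a minute for an hour: a totalStarted figure that keeps
            // climbing while the live count never drops back is the signature of a leak.
            for (int i = 0; i < 60; i++) {
                System.out.printf("live=%d peak=%d totalStarted=%d%n",
                    threads.getThreadCount(),
                    threads.getPeakThreadCount(),
                    threads.getTotalStartedThreadCount());
                Thread.sleep(60_000L);
            }
        } finally {
            connector.close();
        }
    }
}
```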

PraveenSeluka commented 7 years ago

Hi @dhamilton, thanks for reporting it. This seems to be caused by a bug in s3a where it does not clean up threads; it keeps creating worker threads and eventually runs out. We just found this issue and are working on a fix (which is to upgrade the AWS SDK).

Meanwhile, you could try using NativeS3FileSystem instead of S3A. I will have the fix ready soon so you can try it out.
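
For reference, here is a minimal sketch of the Hadoop-side settings involved in switching from S3A to NativeS3FileSystem. The property names are the standard Hadoop ones; the keys and the location of this config (core-site.xml under the connector's Hadoop conf dir, or your deployment's equivalent) are assumptions to adapt to your setup.

```xml
<!-- core-site.xml sketch: use NativeS3FileSystem (s3n://) instead of S3A (s3a://).
     Credentials below are placeholders; supply them however your deployment normally does. -->
<configuration>
  <property>
    <name>fs.s3n.impl</name>
    <value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value>
  </property>
  <property>
    <name>fs.s3n.awsAccessKeyId</name>
    <value>YOUR_ACCESS_KEY</value>
  </property>
  <property>
    <name>fs.s3n.awsSecretAccessKey</name>
    <value>YOUR_SECRET_KEY</value>
  </property>
</configuration>
```

The connector's store URL would then use the s3n:// scheme rather than s3a://.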

Thanks Praveen

dhamilton commented 7 years ago

Awesome, thank you so much for the quick reply, we'll try the NativeS3FileSystem for now.

Also, just so you're aware, we also hit the following issue in the schema registry client: https://github.com/confluentinc/schema-registry/pull/363, which blocked us from syncing multiple topics with the same schema. We rebuilt your plugin using Confluent version 3.0.1 and Kafka version 0.10.0.1, and that update corrected the issue.

PraveenSeluka commented 7 years ago

@dhamilton Can you try it out now? I have pushed a commit that upgrades the AWS SDK (and Hadoop), which should fix the thread leak. Please let us know if this helps. Thanks for using StreamX.

Regarding confluent + kafka version upgrade, I will test it out

dhamilton commented 7 years ago

Sure, we'll test this out today and let you know what we find.

dhamilton commented 7 years ago

We're seeing the following issue when trying to resume the job. Have you seen this before?

[2017-01-17 20:21:35,124] ERROR Task pixel_events_s3-3 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:142)
java.lang.NullPointerException
    at io.confluent.connect.hdfs.HdfsSinkTask.close(HdfsSinkTask.java:110)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:301)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(WorkerSinkTask.java:432)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:146)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)
[2017-01-17 20:21:35,124] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:143)

dhamilton commented 7 years ago

Just to follow up: that exception does not occur when using NativeS3FileSystem. I've seen it in the past with S3FileSystem, outside the context of this issue.

PraveenSeluka commented 7 years ago

This usually points to some misconfiguration related to s3a; check the access/secret keys and the filesystem impl configs. The task tries to create S3Storage, fails when there is any Hadoop/S3 issue, and then drops into the closePartitions flow, so the NullPointerException there is not the real problem.
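
As a starting point, these are the standard Hadoop s3a properties worth double-checking when this NullPointerException shows up; the values are placeholders and the file they live in depends on how your deployment supplies Hadoop configuration.

```xml
<!-- core-site.xml sketch: the s3a filesystem impl and credentials the task needs
     when it creates S3Storage. Values are placeholders. -->
<configuration>
  <property>
    <name>fs.s3a.impl</name>
    <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
  </property>
  <property>
    <name>fs.s3a.access.key</name>
    <value>YOUR_ACCESS_KEY</value>
  </property>
  <property>
    <name>fs.s3a.secret.key</name>
    <value>YOUR_SECRET_KEY</value>
  </property>
</configuration>
```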

PraveenSeluka commented 7 years ago

@dhamilton I have been testing this and am hitting a number of issues with S3A. In my testing I don't see any NativeS3FileSystem issues, so I suggest you use that instead. (I will spend some more time checking S3A.)

dhamilton commented 7 years ago

Yes we've been having much better luck with NativeS3FileSystem so we'll stick with that for now. Thanks for your help!

phillipamann commented 7 years ago

Thanks for this discussion. I have hit this error as well.

phillipamann commented 7 years ago

I am unable to get NativeS3FileSystem working. Can you share the settings you used to get it to work? Update: I got this to work; I will see if I still hit OutOfMemory now.

PraveenSeluka commented 7 years ago

Here is the summary of this issue

Let me know if you find anything.

PraveenSeluka commented 7 years ago

I tried more to get this working.

There are two approaches:

  1. Fix thread leak in aws-java-sdk 1.7.4 version and publish another artifact
  2. Change S3AFileSystem to work with the newer signatures of AWS SDK

I suspect neither of these is straightforward. The AWS SDK upgrade in Hadoop is done, but the fix only becomes available in Hadoop 2.8 or 3.0 (not released yet); see https://issues.apache.org/jira/browse/HADOOP-12269. We will wait for it to become stable, then upgrade the Hadoop version, which will fix this issue.
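
As a rough illustration of what that eventual upgrade might look like once a Hadoop release containing HADOOP-12269 ships, here is a hypothetical pom.xml fragment; the version is a placeholder and has not been tested against this project.

```xml
<!-- Hypothetical dependency bump, pending a stable Hadoop 2.8+/3.x release. -->
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-aws</artifactId>
  <version>2.8.0</version> <!-- placeholder version expected to carry HADOOP-12269 -->
</dependency>
```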

Please use NativeS3FileSystem till then. Thanks