conduktor / kafka-security-manager

Manage your Kafka ACL at scale
https://hub.docker.com/r/simplesteph/kafka-security-manager
MIT License
361 stars 159 forks source link

Error while reading file from S3 bucket #77

Closed wajahat-nclouds closed 4 years ago

wajahat-nclouds commented 4 years ago

I am using com.github.simplesteph.ksm.source.S3SourceAcl source class to read CSV file from the bucket but I am getting the following exception

Exception

WARN Not all bytes were read from the S3ObjectInputStream, aborting HTTP connection. This is likely an error and may result in sub-optimal behavior. Request only the bytes you need via a ranged GET or drain the input stream after use. (com.amazonaws.services.s3.internal.S3AbortableInputStream)
kafka-security-manager_1  | [2020-03-03 19:57:46,948] ERROR unexpected exception (com.github.simplesteph.ksm.KafkaSecurityManager$)
kafka-security-manager_1  | java.util.concurrent.ExecutionException: java.io.IOException: Attempted read on closed stream.
kafka-security-manager_1  |     at java.util.concurrent.FutureTask.report(FutureTask.java:122)
kafka-security-manager_1  |     at java.util.concurrent.FutureTask.get(FutureTask.java:192)
kafka-security-manager_1  |     at com.github.simplesteph.ksm.KafkaSecurityManager$.delayedEndpoint$com$github$simplesteph$ksm$KafkaSecurityManager$1(KafkaSecurityManager.scala:79)
kafka-security-manager_1  |     at com.github.simplesteph.ksm.KafkaSecurityManager$delayedInit$body.apply(KafkaSecurityManager.scala:18)
kafka-security-manager_1  |     at scala.Function0.apply$mcV$sp(Function0.scala:39)
kafka-security-manager_1  |     at scala.Function0.apply$mcV$sp$(Function0.scala:39)
kafka-security-manager_1  |     at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:17)
kafka-security-manager_1  |     at scala.App.$anonfun$main$1$adapted(App.scala:80)
kafka-security-manager_1  |     at scala.collection.immutable.List.foreach(List.scala:392)
kafka-security-manager_1  |     at scala.App.main(App.scala:80)
kafka-security-manager_1  |     at scala.App.main$(App.scala:78)
kafka-security-manager_1  |     at com.github.simplesteph.ksm.KafkaSecurityManager$.main(KafkaSecurityManager.scala:18)
kafka-security-manager_1  |     at com.github.simplesteph.ksm.KafkaSecurityManager.main(KafkaSecurityManager.scala)
kafka-security-manager_1  | Caused by: java.io.IOException: Attempted read on closed stream.
kafka-security-manager_1  |     at org.apache.http.conn.EofSensorInputStream.isReadAllowed(EofSensorInputStream.java:107)
kafka-security-manager_1  |     at org.apache.http.conn.EofSensorInputStream.read(EofSensorInputStream.java:133)
kafka-security-manager_1  |     at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:82)
kafka-security-manager_1  |     at com.amazonaws.event.ProgressInputStream.read(ProgressInputStream.java:180)
kafka-security-manager_1  |     at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:82)
kafka-security-manager_1  |     at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:82)
kafka-security-manager_1  |     at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:82)
kafka-security-manager_1  |     at com.amazonaws.event.ProgressInputStream.read(ProgressInputStream.java:180)
kafka-security-manager_1  |     at java.security.DigestInputStream.read(DigestInputStream.java:161)
kafka-security-manager_1  |     at com.amazonaws.services.s3.internal.DigestValidationInputStream.read(DigestValidationInputStream.java:59)
kafka-security-manager_1  |     at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:82)
kafka-security-manager_1  |     at com.amazonaws.services.s3.internal.S3AbortableInputStream.read(S3AbortableInputStream.java:125)
kafka-security-manager_1  |     at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:82)
kafka-security-manager_1  |     at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
kafka-security-manager_1  |     at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
kafka-security-manager_1  |     at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
kafka-security-manager_1  |     at java.io.InputStreamReader.read(InputStreamReader.java:184)
kafka-security-manager_1  |     at java.io.BufferedReader.read1(BufferedReader.java:210)
kafka-security-manager_1  |     at java.io.BufferedReader.read(BufferedReader.java:286)
kafka-security-manager_1  |     at java.io.BufferedReader.fill(BufferedReader.java:161)
kafka-security-manager_1  |     at java.io.BufferedReader.read(BufferedReader.java:182)
kafka-security-manager_1  |     at com.github.tototoshi.csv.ReaderLineReader.readLineWithTerminator(ReaderLineReader.java:21)
kafka-security-manager_1  |     at com.github.tototoshi.csv.CSVReader.parseNext$1(CSVReader.scala:33)
kafka-security-manager_1  |     at com.github.tototoshi.csv.CSVReader.readNext(CSVReader.scala:51)
kafka-security-manager_1  |     at com.github.tototoshi.csv.CSVReader.allWithOrderedHeaders(CSVReader.scala:101)
kafka-security-manager_1  |     at com.github.tototoshi.csv.CSVReader.allWithHeaders(CSVReader.scala:97)
kafka-security-manager_1  |     at com.github.simplesteph.ksm.parser.CsvAclParser.aclsFromReader(CsvAclParser.scala:79)
kafka-security-manager_1  |     at com.github.simplesteph.ksm.AclSynchronizer.run(AclSynchronizer.scala:98)
kafka-security-manager_1  |     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
kafka-security-manager_1  |     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
kafka-security-manager_1  |     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
kafka-security-manager_1  |     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
kafka-security-manager_1  |     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
kafka-security-manager_1  |     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
kafka-security-manager_1  |     at java.lang.Thread.run(Thread.java:748)

Configuration

      SOURCE_CLASS: "com.github.simplesteph.ksm.source.S3SourceAcl"
      SOURCE_S3_REGION: "us-east-1"
      SOURCE_S3_BUCKETNAME: "bucket-name"
      SOURCE_S3_OBJECTKEY: "acls.csv"
      AWS_ACCESS_KEY_ID: ${AWS_ACCESS_KEY_ID}
      AWS_SECRET_ACCESS_KEY: ${AWS_SECRET_ACCESS_KEY}
      AWS_DEFAULT_REGION: ${AWS_DEFAULT_REGION}
simplesteph commented 4 years ago

Sorry I didn’t develop this one source. I’m accepting a PR if you can get to the bottom of this. Thanks !

On Tue, Mar 3 2020 at 8:55 PM, wajahat-nclouds < notifications@github.com > wrote:

I am using com.github.simplesteph.ksm.source.S3SourceAcl source class to read CSV file from the bucket but I am getting the following exception

Exception

WARN Not all bytes were read from the S3ObjectInputStream, aborting HTTP connection. This is likely an error and may result in sub-optimal behavior. Request only the bytes you need via a ranged GET or drain the input stream after use. (com.amazonaws.services.s3.internal.S3AbortableInputStream) kafka-security-manager_1 | [2020-03-03 19:57:46,948] ERROR unexpected exception (com.github.simplesteph.ksm.KafkaSecurityManager$) kafka-security-manager_1 | java.util.concurrent.ExecutionException: java.io.IOException: Attempted read on closed stream. kafka-security-manager_1 | at java.util.concurrent.FutureTask.report(FutureTask.java:122) kafka-security-manager_1 | at java.util.concurrent.FutureTask.get(FutureTask.java:192) kafka-security-manager_1 | at com.github.simplesteph.ksm.KafkaSecurityManager$.delayedEndpoint$com$github$simplesteph$ksm$KafkaSecurityManager$1(KafkaSecurityManager.scala:79) kafka-security-manager_1 | at com.github.simplesteph.ksm.KafkaSecurityManager$delayedInit$body.apply(KafkaSecurityManager.scala:18) kafka-security-manager_1 | at scala.Function0.apply$mcV$sp(Function0.scala:39) kafka-security-manager_1 | at scala.Function0.apply$mcV$sp$(Function0.scala:39) kafka-security-manager_1 | at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:17) kafka-security-manager_1 | at scala.App.$anonfun$main$1$adapted(App.scala:80) kafka-security-manager_1 | at scala.collection.immutable.List.foreach(List.scala:392) kafka-security-manager_1 | at scala.App.main(App.scala:80) kafka-security-manager_1 | at scala.App.main$(App.scala:78) kafka-security-manager_1 | at com.github.simplesteph.ksm.KafkaSecurityManager$.main(KafkaSecurityManager.scala:18) kafka-security-manager_1 | at com.github.simplesteph.ksm.KafkaSecurityManager.main(KafkaSecurityManager.scala) kafka-security-manager_1 | Caused by: java.io.IOException: Attempted read on closed stream. kafka-security-manager_1 | at org.apache.http.conn.EofSensorInputStream.isReadAllowed(EofSensorInputStream.java:107) kafka-security-manager_1 | at org.apache.http.conn.EofSensorInputStream.read(EofSensorInputStream.java:133) kafka-security-manager_1 | at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:82) kafka-security-manager_1 | at com.amazonaws.event.ProgressInputStream.read(ProgressInputStream.java:180) kafka-security-manager_1 | at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:82) kafka-security-manager_1 | at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:82) kafka-security-manager_1 | at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:82) kafka-security-manager_1 | at com.amazonaws.event.ProgressInputStream.read(ProgressInputStream.java:180) kafka-security-manager_1 | at java.security.DigestInputStream.read(DigestInputStream.java:161) kafka-security-manager_1 | at com.amazonaws.services.s3.internal.DigestValidationInputStream.read(DigestValidationInputStream.java:59) kafka-security-manager_1 | at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:82) kafka-security-manager_1 | at com.amazonaws.services.s3.internal.S3AbortableInputStream.read(S3AbortableInputStream.java:125) kafka-security-manager_1 | at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:82) kafka-security-manager_1 | at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284) kafka-security-manager_1 | at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326) kafka-security-manager_1 | at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178) kafka-security-manager_1 | at java.io.InputStreamReader.read(InputStreamReader.java:184) kafka-security-manager_1 | at java.io.BufferedReader.read1(BufferedReader.java:210) kafka-security-manager_1 | at java.io.BufferedReader.read(BufferedReader.java:286) kafka-security-manager_1 | at java.io.BufferedReader.fill(BufferedReader.java:161) kafka-security-manager_1 | at java.io.BufferedReader.read(BufferedReader.java:182) kafka-security-manager_1 | at com.github.tototoshi.csv.ReaderLineReader.readLineWithTerminator(ReaderLineReader.java:21) kafka-security-manager_1 | at com.github.tototoshi.csv.CSVReader.parseNext$1(CSVReader.scala:33) kafka-security-manager_1 | at com.github.tototoshi.csv.CSVReader.readNext(CSVReader.scala:51) kafka-security-manager_1 | at com.github.tototoshi.csv.CSVReader.allWithOrderedHeaders(CSVReader.scala:101) kafka-security-manager_1 | at com.github.tototoshi.csv.CSVReader.allWithHeaders(CSVReader.scala:97) kafka-security-manager_1 | at com.github.simplesteph.ksm.parser.CsvAclParser.aclsFromReader(CsvAclParser.scala:79) kafka-security-manager_1 | at com.github.simplesteph.ksm.AclSynchronizer.run(AclSynchronizer.scala:98) kafka-security-manager_1 | at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) kafka-security-manager_1 | at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) kafka-security-manager_1 | at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) kafka-security-manager_1 | at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) kafka-security-manager_1 | at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) kafka-security-manager_1 | at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) kafka-security-manager_1 | at java.lang.Thread.run(Thread.java:748)

Configuration

SOURCE_CLASS: "com.github.simplesteph.ksm.source.S3SourceAcl" SOURCE_S3_REGION: "us-east-1" SOURCE_S3_BUCKETNAME: "bucket-name" SOURCE_S3_OBJECTKEY: "acls.csv" AWS_ACCESS_KEY_ID: ${AWS_ACCESS_KEY_ID} AWS_SECRET_ACCESS_KEY: ${AWS_SECRET_ACCESS_KEY} AWS_DEFAULT_REGION: ${AWS_DEFAULT_REGION}

— You are receiving this because you are subscribed to this thread. Reply to this email directly, view it on GitHub ( https://github.com/simplesteph/kafka-security-manager/issues/77?email_source=notifications&email_token=AE7CW2KVI2WBLQJ5DQIZD6LRFVVDXA5CNFSM4LAS3L72YY3PNVWWK3TUL52HS4DFUVEXG43VMWVGG33NNVSW45C7NFSM4ISE26NQ ) , or unsubscribe ( https://github.com/notifications/unsubscribe-auth/AE7CW2MQFOX47ZI35L4NDLLRFVVDXANCNFSM4LAS3L7Q ).

wajahat-nclouds commented 4 years ago

@silverbadge Please look into this issue.

kgupta26 commented 4 years ago

The error is happening because the bucket is being closed before the stream (reader) is read. I'll make a PR soon

kgupta26 commented 4 years ago

Opened a PR for this. https://github.com/simplesteph/kafka-security-manager/pull/87