apache / camel-kafka-connector

Camel Kafka Connector allows you to use all Camel components as Kafka Connect connectors
https://camel.apache.org
Apache License 2.0

From S3 to Kafka: when using the idempotent filter, is the S3 file content still fetched? #314

Closed akhileshacc closed 4 years ago

akhileshacc commented 4 years ago

Hi,

I am using Camel's AWS and Kafka components.

from("aws-s3://checkoutk1?autocloseBody=true&deleteAfterRead=false&args=....")
    // Use the S3 ETag as the idempotency key, backed by a file-based repository
    .idempotentConsumer(header("CamelAwsS3ETag"),
        FileIdempotentRepository.fileIdempotentRepository(new File("target/checkoutk1.data"), 250, 512000))
    .to("kafka:test1?brokers=localhost:9092");

I have added one text file to S3. It works well and adds the file content to Kafka only once. But judging by the logs below, the S3 object seems to be fetched on every poll and only filtered out before being added to Kafka.

Am I correct in understanding this behaviour?

If yes, is there a workaround so that the file is not fetched?

Logs

If you check CamelAwsS3ContentLength for the text file, it is always CamelAwsS3ContentLength=88.

2020-07-03 14:41:13,793 [s3.Basic.main()] INFO  AppInfoParser                  - Kafka version: 2.5.0
2020-07-03 14:41:13,793 [s3.Basic.main()] INFO  AppInfoParser                  - Kafka commitId: 66563e712b0b9f84
2020-07-03 14:41:13,793 [s3.Basic.main()] INFO  AppInfoParser                  - Kafka startTimeMs: 1593767473790
2020-07-03 14:41:13,806 [s3.Basic.main()] INFO  InternalRouteStartupManager    - Route: route1 started and consuming from: aws-s3://checkoutk1
2020-07-03 14:41:13,806 [s3.Basic.main()] INFO  AbstractCamelContext           - Total 1 routes, of which 1 are started
2020-07-03 14:41:13,807 [s3.Basic.main()] INFO  AbstractCamelContext           - Apache Camel 3.5.0-SNAPSHOT (camel-1) started in 1.174 seconds
2020-07-03 14:41:14,037 [ad | producer-1] INFO  Metadata                       - [Producer clientId=producer-1] Cluster ID: 2NaFZgsTQg-4L3vQzwmlkQ
2020-07-03 14:41:15,004 [s3://checkoutk1] INFO  consuming                      - Consumer Fired!
2020-07-03 14:41:15,005 [s3://checkoutk1] INFO  route1                         - Replay Message Sent to file:s3out data/
2020-07-03 14:41:15,012 [s3://checkoutk1] INFO  route1                         - {CamelAwsS3BucketName=checkoutk1, CamelAwsS3ContentControl=null, CamelAwsS3ContentDisposition=null, CamelAwsS3ContentEncoding=null, CamelAwsS3ContentLength=0, CamelAwsS3ContentMD5=null, CamelAwsS3ContentType=application/x-directory, CamelAwsS3ETag=d41d8cd98f00b204e9800998ecf8427e, CamelAwsS3ExpirationTime=null, CamelAwsS3Headers={Accept-Ranges=bytes, Content-Length=0, Content-Type=application/x-directory, ETag=d41d8cd98f00b204e9800998ecf8427e, Last-Modified=Thu Jul 02 16:30:15 IST 2020}, CamelAwsS3Key=data/, CamelAwsS3LastModified=Thu Jul 02 16:30:15 IST 2020, CamelAwsS3ReplicationStatus=null, CamelAwsS3ServerSideEncryption=null, CamelAwsS3StorageClass=null, CamelAwsS3UserMetadata={}, CamelAwsS3VersionId=null}
2020-07-03 14:41:15,015 [s3://checkoutk1] INFO  route1                         - This is new message
2020-07-03 14:41:15,050 [s3://checkoutk1] INFO  consuming                      - Consumer Fired!
2020-07-03 14:41:15,050 [s3://checkoutk1] INFO  route1                         - Replay Message Sent to file:s3out data/text2.txt
2020-07-03 14:41:15,051 [s3://checkoutk1] INFO  route1                         - {CamelAwsS3BucketName=checkoutk1, CamelAwsS3ContentControl=null, CamelAwsS3ContentDisposition=null, CamelAwsS3ContentEncoding=null, CamelAwsS3ContentLength=88, CamelAwsS3ContentMD5=null, CamelAwsS3ContentType=text/plain, CamelAwsS3ETag=f638e3b03d75d5d1d11984585c23bb8d, CamelAwsS3ExpirationTime=null, CamelAwsS3Headers={Accept-Ranges=bytes, Content-Length=88, Content-Type=text/plain, ETag=f638e3b03d75d5d1d11984585c23bb8d, Last-Modified=Fri Jul 03 14:28:25 IST 2020}, CamelAwsS3Key=data/text2.txt, CamelAwsS3LastModified=Fri Jul 03 14:28:25 IST 2020, CamelAwsS3ReplicationStatus=null, CamelAwsS3ServerSideEncryption=null, CamelAwsS3StorageClass=null, CamelAwsS3UserMetadata={}, CamelAwsS3VersionId=null}
2020-07-03 14:41:15,052 [s3://checkoutk1] INFO  route1                         - This is new message
2020-07-03 14:41:15,068 [Producer[test1]] INFO  route1                         - {CamelAwsS3BucketName=checkoutk1, CamelAwsS3ContentControl=null, CamelAwsS3ContentDisposition=null, CamelAwsS3ContentEncoding=null, CamelAwsS3ContentLength=0, CamelAwsS3ContentMD5=null, CamelAwsS3ContentType=application/x-directory, CamelAwsS3ETag=d41d8cd98f00b204e9800998ecf8427e, CamelAwsS3ExpirationTime=null, CamelAwsS3Headers={Accept-Ranges=bytes, Content-Length=0, Content-Type=application/x-directory, ETag=d41d8cd98f00b204e9800998ecf8427e, Last-Modified=Thu Jul 02 16:30:15 IST 2020}, CamelAwsS3Key=data/, CamelAwsS3LastModified=Thu Jul 02 16:30:15 IST 2020, CamelAwsS3ReplicationStatus=null, CamelAwsS3ServerSideEncryption=null, CamelAwsS3StorageClass=null, CamelAwsS3UserMetadata={}, CamelAwsS3VersionId=null, org.apache.kafka.clients.producer.RecordMetadata=[test1-0@93]}
2020-07-03 14:41:15,069 [Producer[test1]] INFO  route1                         - {CamelAwsS3BucketName=checkoutk1, CamelAwsS3ContentControl=null, CamelAwsS3ContentDisposition=null, CamelAwsS3ContentEncoding=null, CamelAwsS3ContentLength=88, CamelAwsS3ContentMD5=null, CamelAwsS3ContentType=text/plain, CamelAwsS3ETag=f638e3b03d75d5d1d11984585c23bb8d, CamelAwsS3ExpirationTime=null, CamelAwsS3Headers={Accept-Ranges=bytes, Content-Length=88, Content-Type=text/plain, ETag=f638e3b03d75d5d1d11984585c23bb8d, Last-Modified=Fri Jul 03 14:28:25 IST 2020}, CamelAwsS3Key=data/text2.txt, CamelAwsS3LastModified=Fri Jul 03 14:28:25 IST 2020, CamelAwsS3ReplicationStatus=null, CamelAwsS3ServerSideEncryption=null, CamelAwsS3StorageClass=null, CamelAwsS3UserMetadata={}, CamelAwsS3VersionId=null, org.apache.kafka.clients.producer.RecordMetadata=[test1-0@94]}
2020-07-03 14:41:20,208 [s3://checkoutk1] INFO  consuming                      - Consumer Fired!
2020-07-03 14:41:20,209 [s3://checkoutk1] INFO  route1                         - Replay Message Sent to file:s3out data/
2020-07-03 14:41:20,209 [s3://checkoutk1] INFO  route1 - {CamelAwsS3BucketName=checkoutk1, CamelAwsS3ContentControl=null, CamelAwsS3ContentDisposition=null, CamelAwsS3ContentEncoding=null, CamelAwsS3ContentLength=0, CamelAwsS3ContentMD5=null, CamelAwsS3ContentType=application/x-directory, CamelAwsS3ETag=d41d8cd98f00b204e9800998ecf8427e, CamelAwsS3ExpirationTime=null, CamelAwsS3Headers={Accept-Ranges=bytes, Content-Length=0, Content-Type=application/x-directory, ETag=d41d8cd98f00b204e9800998ecf8427e, Last-Modified=Thu Jul 02 16:30:15 IST 2020}, CamelAwsS3Key=data/, CamelAwsS3LastModified=Thu Jul 02 16:30:15 IST 2020, CamelAwsS3ReplicationStatus=null, CamelAwsS3ServerSideEncryption=null, CamelAwsS3StorageClass=null, CamelAwsS3UserMetadata={}, CamelAwsS3VersionId=null}
2020-07-03 14:41:20,210 [s3://checkoutk1] INFO  consuming                      - Consumer Fired!
2020-07-03 14:41:20,210 [s3://checkoutk1] INFO  route1                         - Replay Message Sent to file:s3out data/text2.txt
2020-07-03 14:41:20,210 [s3://checkoutk1] INFO  route1                         - {CamelAwsS3BucketName=checkoutk1, CamelAwsS3ContentControl=null, CamelAwsS3ContentDisposition=null, CamelAwsS3ContentEncoding=null, CamelAwsS3ContentLength=88, CamelAwsS3ContentMD5=null, CamelAwsS3ContentType=text/plain, CamelAwsS3ETag=f638e3b03d75d5d1d11984585c23bb8d, CamelAwsS3ExpirationTime=null, CamelAwsS3Headers={Accept-Ranges=bytes, Content-Length=88, Content-Type=text/plain, ETag=f638e3b03d75d5d1d11984585c23bb8d, Last-Modified=Fri Jul 03 14:28:25 IST 2020}, CamelAwsS3Key=data/text2.txt, CamelAwsS3LastModified=Fri Jul 03 14:28:25 IST 2020, CamelAwsS3ReplicationStatus=null, CamelAwsS3ServerSideEncryption=null, CamelAwsS3StorageClass=null, CamelAwsS3UserMetadata={}, CamelAwsS3VersionId=null}
Jul 03, 2020 2:41:20 PM com.amazonaws.services.s3.internal.S3AbortableInputStream close
WARNING: Not all bytes were read from the S3ObjectInputStream, aborting HTTP connection. This is likely an error and may result in sub-optimal behavior. Request only the bytes you need via a ranged GET or drain the input stream after use.
2020-07-03 14:41:25,519 [s3://checkoutk1] INFO  consuming                      - Consumer Fired!
2020-07-03 14:41:25,519 [s3://checkoutk1] INFO  route1                         - Replay Message Sent to file:s3out data/
2020-07-03 14:41:25,520 [s3://checkoutk1] INFO  route1                         - {CamelAwsS3BucketName=checkoutk1, CamelAwsS3ContentControl=null, CamelAwsS3ContentDisposition=null, CamelAwsS3ContentEncoding=null, CamelAwsS3ContentLength=0, CamelAwsS3ContentMD5=null, CamelAwsS3ContentType=application/x-directory, CamelAwsS3ETag=d41d8cd98f00b204e9800998ecf8427e, CamelAwsS3ExpirationTime=null, CamelAwsS3Headers={Accept-Ranges=bytes, Content-Length=0, Content-Type=application/x-directory, ETag=d41d8cd98f00b204e9800998ecf8427e, Last-Modified=Thu Jul 02 16:30:15 IST 2020}, CamelAwsS3Key=data/, CamelAwsS3LastModified=Thu Jul 02 16:30:15 IST 2020, CamelAwsS3ReplicationStatus=null, CamelAwsS3ServerSideEncryption=null, CamelAwsS3StorageClass=null, CamelAwsS3UserMetadata={}, CamelAwsS3VersionId=null}
2020-07-03 14:41:25,520 [s3://checkoutk1] INFO  consuming                      - Consumer Fired!
2020-07-03 14:41:25,520 [s3://checkoutk1] INFO  route1                         - Replay Message Sent to file:s3out data/text2.txt
2020-07-03 14:41:25,521 [s3://checkoutk1] INFO  route1                         - {CamelAwsS3BucketName=checkoutk1, CamelAwsS3ContentControl=null, CamelAwsS3ContentDisposition=null, CamelAwsS3ContentEncoding=null, CamelAwsS3ContentLength=88, CamelAwsS3ContentMD5=null, CamelAwsS3ContentType=text/plain, CamelAwsS3ETag=f638e3b03d75d5d1d11984585c23bb8d, CamelAwsS3ExpirationTime=null, CamelAwsS3Headers={Accept-Ranges=bytes, Content-Length=88, Content-Type=text/plain, ETag=f638e3b03d75d5d1d11984585c23bb8d, Last-Modified=Fri Jul 03 14:28:25 IST 2020}, CamelAwsS3Key=data/text2.txt, CamelAwsS3LastModified=Fri Jul 03 14:28:25 IST 2020, CamelAwsS3ReplicationStatus=null, CamelAwsS3ServerSideEncryption=null, CamelAwsS3StorageClass=null, CamelAwsS3UserMetadata={}, CamelAwsS3VersionId=null}
Jul 03, 2020 2:41:25 PM com.amazonaws.services.s3.internal.S3AbortableInputStream close
WARNING: Not all bytes were read from the S3ObjectInputStream, aborting HTTP connection. This is likely an error and may result in sub-optimal behavior. Request only the bytes you need via a ranged GET or drain the input stream after use.
2020-07-03 14:41:30,770 [s3://checkoutk1] INFO  consuming                      - Consumer Fired!
2020-07-03 14:41:30,771 [s3://checkoutk1] INFO  route1                         - Replay Message Sent to file:s3out data/
2020-07-03 14:41:30,771 [s3://checkoutk1] INFO  route1                         - {CamelAwsS3BucketName=checkoutk1, CamelAwsS3ContentControl=null, CamelAwsS3ContentDisposition=null, CamelAwsS3ContentEncoding=null, CamelAwsS3ContentLength=0, CamelAwsS3ContentMD5=null, CamelAwsS3ContentType=application/x-directory, CamelAwsS3ETag=d41d8cd98f00b204e9800998ecf8427e, CamelAwsS3ExpirationTime=null, CamelAwsS3Headers={Accept-Ranges=bytes, Content-Length=0, Content-Type=application/x-directory, ETag=d41d8cd98f00b204e9800998ecf8427e, Last-Modified=Thu Jul 02 16:30:15 IST 2020}, CamelAwsS3Key=data/, CamelAwsS3LastModified=Thu Jul 02 16:30:15 IST 2020, CamelAwsS3ReplicationStatus=null, CamelAwsS3ServerSideEncryption=null, CamelAwsS3StorageClass=null, CamelAwsS3UserMetadata={}, CamelAwsS3VersionId=null}   

...
...
It keeps going like this.

I didn't send this to the mailing list, since I got no reply last time.

oscerd commented 4 years ago

Again, this is not the right place to ask this. Ask on the Camel users mailing list. This is the camel-kafka-connector subproject, and it uses a different approach. Wait for a response on the ML; users or devs may be busy, like you.

akhileshacc commented 4 years ago

Sure thanks.

oscerd commented 4 years ago

Anyway, it's normal. If you don't want the file to be polled each time, you can enable deleteAfterRead. That's the normal behavior: the file is fetched first and only then checked by the idempotent filter.
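
To make the fetch-then-check order concrete, here is a minimal plain-Java sketch of the behavior described above. It does not use Camel or the AWS SDK. The class `FetchThenFilterSketch` and its fields and methods are hypothetical names invented for illustration, and the in-memory map is only a stand-in for the bucket. The two keys and ETags are the ones from the logs in this issue.

```java
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical simulation: the consumer retrieves every object on each poll,
// and the idempotent check only happens afterwards, inside the route.
public class FetchThenFilterSketch {
    private final Map<String, String> bucket = new LinkedHashMap<>(); // key -> ETag
    private final Set<String> seenETags = new HashSet<>();            // idempotent repository stand-in
    private final boolean deleteAfterRead;
    int fetchCount = 0;   // how many times an object was retrieved
    int publishCount = 0; // how many times an object passed the filter

    FetchThenFilterSketch(boolean deleteAfterRead) {
        this.deleteAfterRead = deleteAfterRead;
    }

    void putObject(String key, String eTag) {
        bucket.put(key, eTag);
    }

    // One poll cycle: fetch first, then filter on the ETag.
    void pollOnce() {
        Iterator<Map.Entry<String, String>> it = bucket.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, String> object = it.next();
            fetchCount++; // the object is retrieved before the route sees it
            if (seenETags.add(object.getValue())) {
                publishCount++; // first time this ETag is seen: forward it
            }
            if (deleteAfterRead) {
                it.remove(); // the object is gone, so later polls find nothing
            }
        }
    }

    static FetchThenFilterSketch run(boolean deleteAfterRead, int polls) {
        FetchThenFilterSketch sketch = new FetchThenFilterSketch(deleteAfterRead);
        sketch.putObject("data/", "d41d8cd98f00b204e9800998ecf8427e");
        sketch.putObject("data/text2.txt", "f638e3b03d75d5d1d11984585c23bb8d");
        for (int i = 0; i < polls; i++) {
            sketch.pollOnce();
        }
        return sketch;
    }

    public static void main(String[] args) {
        FetchThenFilterSketch keep = run(false, 4);
        System.out.println("deleteAfterRead=false: fetches=" + keep.fetchCount
                + ", published=" + keep.publishCount);
        FetchThenFilterSketch delete = run(true, 4);
        System.out.println("deleteAfterRead=true: fetches=" + delete.fetchCount
                + ", published=" + delete.publishCount);
    }
}
```

With deleteAfterRead=false, the fetch count in this sketch grows with every poll while the published count stays at two, which matches the repeating log entries above. With deleteAfterRead=true, each object is fetched once and then removed, so later polls have nothing to retrieve. Note that in the real component this means the object is deleted from the bucket.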