mmolimar / kafka-connect-fs

Kafka Connect FileSystem Connector
Apache License 2.0

SQS notifications not triggering #85

Open monsterdeeravi opened 3 years ago

monsterdeeravi commented 3 years ago

A file is uploaded to a folder in the S3 bucket s3://bucket-name/folder1/folder2/

The SQS queue gets a notification, but, the connector doesn't do anything.

connector.class=com.github.mmolimar.kafka.connect.fs.FsSourceConnector
policy.s3_event_notifications.poll=30000
errors.log.include.messages=true
policy.regexp=.*
policy.class=com.github.mmolimar.kafka.connect.fs.policy.SleepyPolicy
policy.sleepy.sleep=60000
policy.s3_event_notifications.event_regex=.*
file_reader.class=com.github.mmolimar.kafka.connect.fs.file.reader.XmlFileReader
policy.s3_event_notifications.queue=https://sqs.us-east-1.amazonaws.com/xxx/SQS-queue-name
fs.uris=s3a://bucket-name/
policy.s3_event_notifications.delete_messages=true
name=test-fs-s3
topic=test-fs-s3
policy.s3_event_notifications.max_messages=10
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.json.JsonConverter

Kafka Connect runs on an EC2 instance, and its IAM role has been granted access to the SQS queue.

The setup itself works: the SQS Source connector (https://www.confluent.io/hub/confluentinc/kafka-connect-sqs) succeeds and delivers the notifications into a Kafka topic.

Could you please let me know if there is anything off with the settings defined above?

Thank you.

mmolimar commented 3 years ago

Hi @monsterdeeravi

You're setting policy.class=com.github.mmolimar.kafka.connect.fs.policy.SleepyPolicy, but to process files via event notifications you have to use: policy.class=com.github.mmolimar.kafka.connect.fs.policy.S3EventNotificationsPolicy.

Let me know if that works.

monsterdeeravi commented 3 years ago

Thanks @mmolimar. Your suggestion got the connector past the previous state. However, it now fails with the error: org.apache.kafka.connect.errors.ConnectException: A problem has occurred reading configuration: com.amazonaws.services.sqs.model.QueueDoesNotExistException: The specified queue does not exist for this wsdl version. (Service: AmazonSQS; Status Code: 400; Error Code: AWS.SimpleQueueService.NonExistentQueue)

The updated connector config now is:

connector.class=com.github.mmolimar.kafka.connect.fs.FsSourceConnector
policy.s3_event_notifications.poll=30000
errors.log.include.messages=true
policy.class=com.github.mmolimar.kafka.connect.fs.policy.S3EventNotificationsPolicy
policy.s3_event_notifications.event_regex=.*
file_reader.class=com.github.mmolimar.kafka.connect.fs.file.reader.XmlFileReader
policy.s3_event_notifications.queue=https://sqs.us-east-1.amazonaws.com/000/SQS-queue-name
fs.uris=s3a://bucket-name/folder1/folder2/
policy.s3_event_notifications.delete_messages=false
name=test-fs-s3-1
topic=test-fs-s3-1
policy.s3_event_notifications.max_messages=10
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.json.JsonConverter

I rechecked the queue URL; it matches what is in the SQS console as well as what was applied to the SQS Source Connector.

Is there some other setting that needs to be included?

Thank you.

mmolimar commented 3 years ago

I think policy.s3_event_notifications.queue should be just the queue name, SQS-queue-name, rather than the full queue URL.
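For reference, a minimal sketch of the corrected setting (SQS-queue-name is the same placeholder used in the configs above):

```properties
# Bare queue name, not the full https://sqs.<region>.amazonaws.com/<account>/... URL
policy.s3_event_notifications.queue=SQS-queue-name
```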

monsterdeeravi commented 3 years ago

Thanks. That change let the connector run. Unfortunately, it doesn't respond to queue events.

Does the fs.uris property do anything in this case? The connector complains if I change the scheme to s3 instead of s3a, but looking at the code, it seems to take everything from the notification message and automatically prepend the s3a:// prefix.

Could it be that AWS S3 has issues being accessed via s3a instead of s3?

mmolimar commented 3 years ago

You should use s3a. More info here.
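The behavior described above (the policy taking the bucket and key from the notification message and building an s3a:// URI) can be sketched roughly as follows. This is a hypothetical illustration, not the connector's actual code, and the sample payload is a trimmed, made-up S3 event notification:

```python
import json

# Trimmed, hypothetical S3 event notification payload as it might arrive via SQS.
sample_notification = json.dumps({
    "Records": [{
        "eventName": "ObjectCreated:Put",
        "s3": {
            "bucket": {"name": "bucket-name"},
            "object": {"key": "folder1/folder2/file.xml"},
        },
    }]
})


def to_s3a_uris(message_body: str) -> list:
    """Extract bucket/key pairs from an S3 event message and prefix s3a://."""
    event = json.loads(message_body)
    return [
        "s3a://{}/{}".format(rec["s3"]["bucket"]["name"],
                             rec["s3"]["object"]["key"])
        for rec in event.get("Records", [])
    ]


print(to_s3a_uris(sample_notification))
# ['s3a://bucket-name/folder1/folder2/file.xml']
```

This matches the observation that fs.uris is not what drives file discovery here: the path to read comes from the notification itself, with the s3a scheme prepended.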