awslabs / kinesis-kafka-connector

kinesis-kafka-connector is a connector based on Kafka Connect that publishes messages to Amazon Kinesis streams or Amazon Kinesis Firehose.
Apache License 2.0

Compatibility with MSK Connect #64

Closed RuiPinto93 closed 2 years ago

RuiPinto93 commented 2 years ago

Hi,

When I create a kinesis-kafka connector using MSK Connect, it always fails when trying to reach Amazon Kinesis Firehose.

When I use the same .JAR and the same configuration files but run Kafka Connect from an EC2 instance (inside the same VPC as the MSK cluster), it succeeds.

Is the current connector compatible with MSK Connect?

bdesert commented 2 years ago

This connector is compatible with MSK Connect, so let's make sure the configuration and the environment have been set up properly. Have you created a Kinesis endpoint in your VPC? If not, please create one; you can also configure the endpoint in the properties. If you already have one, or adding the endpoint hasn't helped, could you please share the exception you are getting in the logs?

RuiPinto93 commented 2 years ago

Yes, I have created an endpoint for the Kinesis Firehose service.

What I find strange is that when running on the EC2 instance (inside the VPC), the connector communicates with Firehose without any issue (the DNS name resolves to 172.31.X.X).

However, when the connector is created through MSK Connect, the DNS name seems to resolve to a different (public) IP range, and the connection therefore fails, as shown in the message below:

com.amazonaws.SdkClientException: Unable to execute HTTP request: Connect to firehose.eu-west-1.amazonaws.com:443 [firehose.eu-west-1.amazonaws.com/52.95.121.56] failed: connect timed out
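One way to see which path a client will take is to resolve the Firehose hostname from a host in the same subnets and check whether the returned addresses are private. With an interface VPC endpoint that has private DNS enabled, the default service hostname typically resolves to private addresses in the VPC range (such as the 172.31.X.X seen from EC2), while a public address such as 52.95.121.56 suggests traffic is going to the public endpoint. The Python sketch below is a minimal, hypothetical check (`resolve_ipv4` and `all_private` are illustrative names, not part of the connector); it only resolves the name and does not test connectivity.

```python
import ipaddress
import socket


def resolve_ipv4(host: str, port: int = 443) -> list[str]:
    """Return the unique IPv4 addresses that `host` resolves to (hypothetical helper)."""
    infos = socket.getaddrinfo(host, port, family=socket.AF_INET, type=socket.SOCK_STREAM)
    # Each entry is (family, type, proto, canonname, sockaddr); for IPv4, sockaddr is (ip, port).
    return sorted({info[4][0] for info in infos})


def all_private(addresses: list[str]) -> bool:
    """True if there is at least one address and every one is in a private range (e.g. 172.16.0.0/12)."""
    return bool(addresses) and all(ipaddress.ip_address(a).is_private for a in addresses)
```

For example, `all_private(resolve_ipv4("firehose.eu-west-1.amazonaws.com"))` run from an instance in the MSK subnets should return `True` if the name resolves to the VPC endpoint's private addresses.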

bdesert commented 2 years ago

Can you please check whether you have the most recent updates? Please see #60; is that related? If you are running the most recent code and still facing the same issue, please provide your configuration and I'll try to reproduce it on my end. DO NOT INCLUDE AWS CREDENTIALS in your config.

bdesert commented 2 years ago

Can you also confirm that you have a private endpoint for Firehose in your VPC covering all the private subnets where your MSK cluster runs (if you use MSK), or the explicitly provided subnets (if you use non-MSK Kafka)? Also, try specifying the region explicitly in the properties.

This is the config that works for me:

connector.class=com.amazon.kinesis.kafka.FirehoseSinkConnector
batchSizeInBytes=3670016
tasks.max=1
topics=sampletopic2
kinesisEndpoint=firehose.us-west-2.amazonaws.com
batch=true
deliveryStream=kafka-2-kdf
value.converter=org.apache.kafka.connect.storage.StringConverter
region=us-west-2
batchSize=10
key.converter=org.apache.kafka.connect.storage.StringConverter
RuiPinto93 commented 2 years ago

Yes, I have the latest code version.

Yes, the Private Endpoint that I have created for Firehose has both of the subnets where my MSK cluster is located.

name="Connector name"
connector.class=com.amazon.kinesis.kafka.FirehoseSinkConnector
tasks.max=1
topics= topicname
region=eu-west-1
batch=true
batchSize=500
batchSizeInBytes=3670016
deliveryStream= deliveryname
bdesert commented 2 years ago

Can you please define the endpoint explicitly, as in my example above:

kinesisEndpoint=firehose.eu-west-1.amazonaws.com

Update: changed to eu-west-1 as per the message with exception above.
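Since the fix here is an explicit `kinesisEndpoint` whose region matches `region`, a small pre-flight check can parse the connector properties and compare the two before the config is submitted. The Python sketch below is a hypothetical helper, not part of the connector: it handles only simple `key=value` lines (no line continuations or escapes) and assumes the `firehose.<region>.amazonaws.com` hostname form used in this thread.

```python
def parse_properties(text: str) -> dict[str, str]:
    """Parse simple key=value lines; skips blank lines and #/! comments."""
    props: dict[str, str] = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line[0] in "#!":
            continue
        key, sep, value = line.partition("=")
        if sep:  # ignore lines without "="
            props[key.strip()] = value.strip()
    return props


def endpoint_problems(props: dict[str, str]) -> list[str]:
    """Return human-readable problems with the region/kinesisEndpoint settings."""
    problems: list[str] = []
    endpoint = props.get("kinesisEndpoint")
    region = props.get("region")
    if not region:
        problems.append("region is not set")
    if not endpoint:
        problems.append("kinesisEndpoint is not set")
    elif region:
        # Assumed hostname form: firehose.<region>.amazonaws.com
        parts = endpoint.split(".")
        if len(parts) < 2 or parts[1] != region:
            problems.append(f"kinesisEndpoint {endpoint!r} does not match region {region!r}")
    return problems
```

Applied to the configuration posted earlier in this thread without `kinesisEndpoint`, this would report `kinesisEndpoint is not set`.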

bdesert commented 2 years ago

@RuiPinto93, checking in on the issue: can you please confirm whether the suggestion above worked for you?

hafizmujadidKhalid commented 2 years ago

@bdesert, I am also facing this issue, even though the Firehose endpoint has been created for all the subnets.

bdesert commented 2 years ago

@hafizmujadidKhalid, could you please post your config (with sensitive info removed)? I suspect you need to add the endpoint information to the connector configuration. Please also see the updates on #65 for an example of a working configuration (and a few comments above for my own working config example).

hafizmujadidKhalid commented 2 years ago

@bdesert, here are my configs:

connector.class=com.amazon.kinesis.kafka.FirehoseSinkConnector
tasks.max=2
topics=kafka-demo
kinesisEndpoint=firehose.eu-central-1.amazonaws.com
batch=true
deliveryStream=kafka-s3-demo
value.converter=org.apache.kafka.connect.storage.StringConverter
region=eu-central-1
batchSize=5
key.converter=org.apache.kafka.connect.storage.StringConverter

bdesert commented 2 years ago

@hafizmujadidKhalid, your config looks good. Can you please make sure the config value you paste into the UI (all lines) doesn't contain DOS/Windows end-of-line characters (\r)? Replace the \r\n line endings with \n. Let me know if that helps.
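If the properties were written in a Windows editor, the \r characters can be stripped before pasting. The Python sketch below is a hypothetical helper (the function names are illustrative, not part of the connector or of MSK Connect); it converts both CRLF and lone CR line endings to LF, and `normalize_file` rewrites a local properties file in place.

```python
from pathlib import Path


def has_carriage_returns(text: str) -> bool:
    """True if the text contains any carriage return (e.g. Windows CRLF line endings)."""
    return "\r" in text


def to_unix_line_endings(text: str) -> str:
    """Convert CRLF first, then any remaining lone CR, to LF."""
    return text.replace("\r\n", "\n").replace("\r", "\n")


def normalize_file(path: str) -> bool:
    """Rewrite `path` with LF line endings; return True if the file changed."""
    p = Path(path)
    # Work on raw bytes so Python's newline translation cannot hide the \r characters.
    original = p.read_bytes().decode("utf-8")
    fixed = to_unix_line_endings(original)
    if fixed != original:
        p.write_bytes(fixed.encode("utf-8"))
    return fixed != original
```

For example, `normalize_file("connector.properties")` (a hypothetical file name) returns `True` if it removed any \r, after which the file contents can be pasted into the MSK Connect UI.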

hafizmujadidKhalid commented 2 years ago

@bdesert, thanks, it seems to be working.

RuiPinto93 commented 2 years ago

This is no longer an issue.

Thanks for the support!

bdesert commented 2 years ago

Closing this issue. Summary of resolution:

  1. If deployed using MSK Connect, a Kinesis endpoint needs to be created in the same VPC and be accessible from all the subnets that MSK/MSK Connect is deployed on.
  2. This endpoint should be defined in the connector properties (kinesisEndpoint).
  3. Connector properties MUST NOT contain \r as a line delimiter. This happens when properties are copied from a text editor on a Windows machine and the line endings aren't converted to Unix format.

If you run into more problems deploying in MSK Connect, please feel free to re-open this issue.