dpkp / kafka-python

Python client for Apache Kafka
http://kafka-python.readthedocs.io/
Apache License 2.0

AWS lambda cannot send message to Kafka running on EC2 instance #2116

Open vnmrbu opened 4 years ago

vnmrbu commented 4 years ago

Dear everybody, I have Kafka running on an AWS EC2 instance at the private IP address 'XX.XX.XX.XX'. The instance id is 'ABCDEF' and the VPC id is 'vpc-YYYYYYY'. On it I have created the topic 'new-topic'. I have tested publishing a message with the console producer and reading it back with the console consumer, and both work (against LOCALHOST:9092). Now I want to publish messages from a Lambda function to Kafka on this EC2 instance. In the AWS Lambda function (in Python) I wrote this code:

from kafka import KafkaProducer

def lambda_handler(event, context):
    # create the Kafka producer
    kafka_producer = KafkaProducer(bootstrap_servers=['XX.XX.XX.XX:9092'],
                                   api_version=(0, 11))
    # send a message to Kafka; send() is asynchronous, so flush()
    # before the handler returns or the record may never leave the buffer
    kafka_producer.send('new-topic', b'hello from python lambda')
    kafka_producer.flush()

I've added this Lambda function to the VPC 'vpc-YYYYYYY' where the EC2 instance is running, and configured the Role, Policy... But when I run a test of the Lambda function, the only response is this error message:

Response:
{
  "errorMessage": "..... Task timed out after 10.01 seconds"
}

I don't know why. Can anyone help me? Thank you so much!

jeffwidman commented 3 years ago

There's nothing we can really go on here; you'd need to provide a lot more info. It's most likely a mismatch between your config and your environment.

nbtk123 commented 3 years ago

It smells like an AWS security group issue.

Make sure that the same security group that is attached to the EC2 instance is also attached to the Lambda, and then create an inbound rule for the Kafka ports in that security group.

If the Lambda is in a different security group, make sure that the EC2 security group has an inbound rule allowing traffic from the Lambda security group.
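The second setup described above (Lambda and EC2 in separate security groups) can be sketched with boto3's `authorize_security_group_ingress` call. This is only an illustration; `kafka_ingress_permission` is a hypothetical helper, and the security group ids are placeholders:

```python
def kafka_ingress_permission(lambda_sg_id, port=9092):
    """Build the IpPermissions entry that allows the Lambda's security
    group (lambda_sg_id, a placeholder id) to reach the Kafka port."""
    return {
        "IpProtocol": "tcp",
        "FromPort": port,
        "ToPort": port,
        "UserIdGroupPairs": [{"GroupId": lambda_sg_id}],
    }

# Usage sketch (requires boto3 and your real security group ids):
# import boto3
# ec2 = boto3.client("ec2")
# ec2.authorize_security_group_ingress(
#     GroupId="sg-ec2-kafka",   # the EC2/MSK security group
#     IpPermissions=[kafka_ingress_permission("sg-lambda")])
```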

et304383 commented 3 years ago

I'm having the same issue. Both Lambda and MSK are in the same VPC, within the same subnets. The security group rules are allowing traffic. I've verified from within the Python code that the port is open:

    import socket

    # check TCP reachability of the broker
    a_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    location = (broker, port)
    result_of_check = a_socket.connect_ex(location)

    if result_of_check == 0:
        print("Port is open")
    else:
        print("Port is not open")

    a_socket.close()

The output prints the port is open. I'm using port 9092.

I've disabled all auth and set the MSK cluster to use plain text. I get the same behaviour: no output at all. The code simply hangs on the first attempt to send a message and then times out.

The same code works locally connecting to a Kafka cluster running in Docker. All I change is the broker value.
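One way to turn the silent hang described above into an actionable error is to block on the future that `producer.send()` returns; `future.get()` re-raises broker-side errors instead of letting the handler time out quietly. A minimal sketch; `send_sync` is a hypothetical helper, not part of kafka-python:

```python
def send_sync(producer, topic, value, timeout=10):
    """Send one record and block until the broker acknowledges it.

    future.get() re-raises any KafkaError (unreachable advertised
    listener, invalid replication factor, ...) rather than failing
    silently, which makes the root cause visible in the Lambda logs.
    """
    future = producer.send(topic, value)
    return future.get(timeout=timeout)

# Usage sketch (requires kafka-python and a reachable broker):
# from kafka import KafkaProducer
# producer = KafkaProducer(bootstrap_servers=["XX.XX.XX.XX:9092"])
# metadata = send_sync(producer, "new-topic", b"hello")
# print(metadata.topic, metadata.partition, metadata.offset)
```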

jeffwidman commented 3 years ago

Can you capture the traffic with tcpdump and then decode it into Kafka protocol messages in Wireshark?

That will let you see exactly what is sent to/from AWS and should make the debugging a lot easier.

None of the maintainers have a way to easily repro, so if you can figure out the problem we can probably take a look at fixing it, but until then there's not much we can do.

et304383 commented 3 years ago

I was able to get an error after switching to confluent-kafka. Originally my MSK cluster was set NOT to auto-create topics, and then I saw the next error, which was an invalid replication factor (set to 3 when I only have 2 brokers).

The defaults for configuration in AWS MSK are definitely problematic.

jeffwidman commented 3 years ago

Hmm... sounds like kafka-python should be catching/handling/raising those errors. Again, if there's any way to get a tcpdump of what Kafka sends back over the wire that'd be super helpful.

Ankithashetty-ai commented 2 weeks ago

Even I am facing the same issue. I have created Amazon msk cluster of version 2.8.1 and I have created topic for this cluster on an ec2 instance using Kafka version 2.8.1.i am trying to send message to Kafka topic from s3 triggered lambda function but I am unable to do it because of kafka- python version compatibility issue. How to fix this?