grepplabs / kafka-proxy

Proxy connections to Kafka cluster. Connect through SOCKS Proxy, HTTP Proxy or to cluster running in Kubernetes.
Apache License 2.0
501 stars, 87 forks

Cannot connect to AWS Kafka from on-prem server with SASL_SSL enabled #139

Status: Open. gaganyaan2 opened this issue 1 year ago

gaganyaan2 commented 1 year ago

I'm trying to use kafka-proxy to connect to AWS Kafka from my on-prem local machine with SASL_SSL authentication enabled.

What's working:

What's NOT working:

When I run the proxy with the options below:

[root@ip-10-20-109-135 ~]# kafka-proxy server --bootstrap-server-mapping "b-2.poccluster2.XYZ.c4.kafka.eu-west-1.amazonaws.com:9096,10.20.109.135:3001" \
                     --bootstrap-server-mapping "b-1.poccluster2.XYZ.c4.kafka.eu-west-1.amazonaws.com:9096,10.20.109.135:3002" \
                     --sasl-enable \
                     --sasl-username "alice" \
                     --sasl-password "alice-secret" \
                     --sasl-method "SCRAM-SHA-512" \
                     --tls-enable \
                     --tls-insecure-skip-verify \
                     --log-level debug
INFO[2023-07-28T06:42:11Z] Starting kafka-proxy version 0.3.6
INFO[2023-07-28T06:42:11Z] Bootstrap server b-2.poccluster2.XYZ.c4.kafka.eu-west-1.amazonaws.com:9096 advertised as 10.20.109.135:3001
INFO[2023-07-28T06:42:11Z] Bootstrap server b-1.poccluster2.XYZ.c4.kafka.eu-west-1.amazonaws.com:9096 advertised as 10.20.109.135:3002
INFO[2023-07-28T06:42:11Z] Listening on 10.20.109.135:3001 (10.20.109.135:3001) for remote b-2.poccluster2.XYZ.c4.kafka.eu-west-1.amazonaws.com:9096
INFO[2023-07-28T06:42:11Z] Listening on 10.20.109.135:3002 (10.20.109.135:3002) for remote b-1.poccluster2.XYZ.c4.kafka.eu-west-1.amazonaws.com:9096
INFO[2023-07-28T06:42:11Z] Ready for new connections
INFO[2023-07-28T06:42:19Z] New connection for b-2.poccluster2.XYZ.c4.kafka.eu-west-1.amazonaws.com:9096
DEBU[2023-07-28T06:42:19Z] SASLSCRAM: Doing handshake. Mechanism: SCRAM-SHA-512
DEBU[2023-07-28T06:42:19Z] Successful SASL handshake. Available mechanisms: [SCRAM-SHA-512]
DEBU[2023-07-28T06:42:19Z] Commencing scram loop
DEBU[2023-07-28T06:42:19Z] SASL SCRAM authentication succeeded
DEBU[2023-07-28T06:42:19Z] Kafka request key 9217, version 1, length 369295617
INFO[2023-07-28T06:42:19Z] Reading data from local connection on 10.20.109.135:3001 from 100.MASK.MASK.MASK:62486 (b-2.poccluster2.XYZ.c4.kafka.eu-west-1.amazonaws.com:9096) had error: api key 9217 is invalid
INFO[2023-07-28T06:42:20Z] New connection for b-2.poccluster2.XYZ.c4.kafka.eu-west-1.amazonaws.com:9096
DEBU[2023-07-28T06:42:20Z] SASLSCRAM: Doing handshake. Mechanism: SCRAM-SHA-512
DEBU[2023-07-28T06:42:20Z] Successful SASL handshake. Available mechanisms: [SCRAM-SHA-512]
DEBU[2023-07-28T06:42:20Z] Commencing scram loop
DEBU[2023-07-28T06:42:20Z] SASL SCRAM authentication succeeded
DEBU[2023-07-28T06:42:20Z] Kafka request key 9217, version 1, length 369295617
INFO[2023-07-28T06:42:20Z] Reading data from local connection on 10.20.109.135:3001 from 100.MASK.MASK.MASK:62488 (b-2.poccluster2.XYZ.c4.kafka.eu-west-1.amazonaws.com:9096) had error: api key 9217 is invalid
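The "Kafka request key 9217, version 1, length 369295617" line is a useful clue. A Kafka request starts with a big-endian int32 size followed by an int16 API key and an int16 API version, so re-encoding the logged numbers recovers the first eight bytes the proxy received from the client. The minimal sketch below (Python standard library only; the constant names are illustrative) does that and then reads the same bytes as the header of a TLS record:

```python
import struct

# Values copied from the kafka-proxy debug log:
#   "Kafka request key 9217, version 1, length 369295617"
LOGGED_LENGTH = 369295617
LOGGED_API_KEY = 9217
LOGGED_API_VERSION = 1

# Kafka framing: int32 size, then int16 api_key and int16 api_version,
# all big-endian. Packing the logged values gives back the raw bytes.
raw = struct.pack(">ihh", LOGGED_LENGTH, LOGGED_API_KEY, LOGGED_API_VERSION)

# Interpret the same bytes as the start of a TLS record instead.
content_type = raw[0]                             # 0x16 = TLS handshake record
record_version = int.from_bytes(raw[1:3], "big")  # 0x0301 = legacy record version
record_length = int.from_bytes(raw[3:5], "big")   # length of the TLS record body
handshake_type = raw[5]                           # 0x01 = ClientHello

print(raw.hex())  # 1603010124010001
print(content_type == 0x16, hex(record_version), record_length, handshake_type == 0x01)
# True 0x301 292 True
```

The bytes decode as a TLS handshake record (0x16, version 0x0301, 292 bytes long) carrying a ClientHello (0x01). This suggests the client is starting a TLS handshake on a proxy listener that parses incoming bytes as plaintext Kafka, which is consistent with the client-side librdkafka error reported further down.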

Then I run a Python client to connect to Kafka:

consumer.py

from confluent_kafka import Consumer, KafkaError, KafkaException
import sys
import time
import argparse
parser = argparse.ArgumentParser(description='kafka consumer parameters')
parser.add_argument('--server', type=str,
                    help='kafka server with port eg. kafka:9092.')

parser.add_argument('--consumer_group', type=str,
                    help='consumer_group name.')

parser.add_argument('--topic', type=str,
                    help='kafka topic name.')

parser.add_argument('--messages', type=int, default=None,
                    help='number of messages to consume before idling (default: no limit).')

parser.add_argument('--sleep', type=float, default=0.0,
                    help='seconds to sleep after each consumed message (default: 0).')

args = parser.parse_args()

kafka_server = args.server
kafka_consumer_group = args.consumer_group
kafka_topic = args.topic
kafka_messages = args.messages
kafka_sleep = args.sleep

conf = {'bootstrap.servers': kafka_server,
        'group.id': kafka_consumer_group,
        'security.protocol': 'SASL_SSL',
        'sasl.mechanisms': 'SCRAM-SHA-512',
        'sasl.username': 'alice',
        'sasl.password': 'alice-secret',
        'ssl.endpoint.identification.algorithm': " ",
        'enable.ssl.certificate.verification': "false",
        # 'enable.auto.commit': True,
        'auto.offset.reset': 'earliest'}

consumer = Consumer(conf)

def msg_process(msg):
    print(f"key: {msg.key()}, value: {msg.value()}")

running = True

def kafka_consumer(consumer, kafka_topic):
    try:
        print("Consuming from kafka_topic: ",str(kafka_topic))
        consumer.subscribe([kafka_topic])
        count = 0
        while running:
            msg = consumer.poll(timeout=1.0)
            if msg is None: continue

            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    sys.stderr.write('%% %s [%d] reached end at offset %d\n' %
                                     (msg.topic(), msg.partition(), msg.offset()))
                else:
                    # Any other error is fatal for this simple consumer
                    raise KafkaException(msg.error())
            else:
                print("message processing:")
                msg_process(msg)
                time.sleep(kafka_sleep)

                count = count + 1
                print("count :",count)
                if count == kafka_messages:
                    while True:
                        print("infinite sleep...")
                        time.sleep(60)

    finally:
        print("finally block")
        consumer.close()

#call kafka_consumer function
kafka_consumer(consumer,kafka_topic)

This is what I'm running on my on-prem machine.

on-prem@user:/$ python3 consumer.py --server X.X.X.X:3001  --consumer_group group1 --topic test --messages 100 --sleep 1
Consuming from kafka_topic:  test
%3|1690526538.799|FAIL|rdkafka#consumer-1| [thrd:sasl_ssl://X.X.X.X::3001/bootstrap]: sasl_ssl://X.X.X.X::3001/bootstrap: SSL handshake failed: Disconnected: connecting to a PLAINTEXT broker listener? (after 174ms in state SSL_HANDSHAKE)
%3|1690526539.776|FAIL|rdkafka#consumer-1| [thrd:sasl_ssl://X.X.X.X::3001/bootstrap]: sasl_ssl://X.X.X.X::3001/bootstrap: SSL handshake failed: Disconnected: connecting to a PLAINTEXT broker listener? (after 151ms in state SSL_HANDSHAKE, 1 identical error(s) suppressed)
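The librdkafka hint "connecting to a PLAINTEXT broker listener?" can be checked independently of Kafka. A minimal sketch using only Python's `socket` and `ssl` modules attempts a bare TLS handshake against a host and port; `speaks_tls` is a hypothetical helper name, and certificate checks are switched off purely for diagnosis, mirroring the consumer config above. A `False` result only means the handshake did not complete, which covers a plaintext listener but also a refused or timed-out connection, so treat it as a hint rather than proof.

```python
import socket
import ssl


def speaks_tls(host: str, port: int, timeout: float = 5.0) -> bool:
    """Return True if a TLS handshake with host:port completes.

    Hypothetical diagnostic helper: False means the handshake failed for
    any reason (plaintext listener, refused connection, timeout, ...).
    """
    ctx = ssl.create_default_context()
    # Diagnosis only: disable hostname and certificate verification.
    # check_hostname must be turned off before setting CERT_NONE.
    ctx.check_hostname = False
    ctx.verify_mode = ssl.CERT_NONE
    try:
        with socket.create_connection((host, port), timeout=timeout) as sock:
            # wrap_socket performs the handshake immediately by default.
            with ctx.wrap_socket(sock, server_hostname=host) as tls:
                return tls.version() is not None
    except OSError:
        # ssl.SSLError, ConnectionError and socket.timeout all subclass OSError.
        return False
```

For example, `speaks_tls("10.20.109.135", 3001)` checks the proxy listener from the command above; running the same check against a broker on port 9096 from a host that can reach it gives a point of comparison.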

I'm using Kafka version 2.8.1 and kafka-proxy version 0.3.6.

The issue seems similar to https://github.com/grepplabs/kafka-proxy/issues/28. I tried adding the options below, but with no luck:

'ssl.endpoint.identification.algorithm': " ",
'enable.ssl.certificate.verification': "false",

Please help.

dnltech2020 commented 1 year ago

Are you sure you enabled SCRAM authentication on your MSK cluster?

SCRAM can only be enabled on new MSK builds; you cannot enable SCRAM after the cluster has already been built.

dnltech2020 commented 1 year ago

I also noticed that your Python script shows your Kafka broker on port 9092. If your broker uses port 9096 for SCRAM, you would need to change the line below in your script to 9096:

help='kafka server with port eg. kafka:9092.')