Parsely / pykafka

Apache Kafka client for Python; high-level & low-level consumer/producer, with great performance.
http://pykafka.readthedocs.org/
Apache License 2.0
1.12k stars 232 forks source link

kafka-test pykafka.exceptions.SocketDisconnectedError #1034

Closed mailtoshrb closed 3 years ago

mailtoshrb commented 3 years ago

I am trying to connect to kafka 2.3.1 servers from a test pod with container python:3

My code {code} from pykafka import KafkaClient, SslConfig import pytest import datetime import time import unittest import os from pyunitreport import HTMLTestRunner

class TestKafka(unittest.TestCase):

def test_kafka(self):
    config = SslConfig(cafile='/mnt/secrets/client-server.pki.ca',
                       certfile='/mnt/secrets/client-server.pki.crt',
                       keyfile='/mnt/secrets/client-server.pki.key')
    kafka_version = os.environ.get('KAFKA_VERSION', '2.3.1')
    endpoint = "kafka-ec2.service.local:6668"
    client = KafkaClient(hosts=endpoint, ssl_config=config,
                         broker_version=kafka_version)
    # print('List of topics are: ' + client.topics)
    topic = None
    with self.subTest(msg='Test kafka topics', endpoint=endpoint):
        assert set(['test']).issubset(
            set(list(t.decode('UTF-8') for t in client.topics.keys())))
    with self.subTest(msg='Create test kafka topic', endpoint=endpoint):
        topic = client.topics['test-demo-test']
    with self.subTest(msg='Produce Test message to kafka Topic', endpoint=endpoint):
        with topic.get_sync_producer(delivery_reports=False) as producer:
            producer.produce(bytes('The message sent on {}'.format(
                datetime.datetime.now()), 'UTF-8'))
    with self.subTest(msg='Consume Test message to kafka Topic', endpoint=endpoint):
        consumer = topic.get_simple_consumer(consumer_timeout_ms=5000)
        print(list((message.value).decode('UTF-8')
                   for message in consumer))

def test_two_kafka(self):
    kafka_version = os.environ.get('KAFKA_VERSION', '2.3.1')
    config = SslConfig(cafile='/mnt/secrets/client-server.pki.ca')
    client = KafkaClient(
        hosts="kafka.ec2.service.local:6668", ssl_config=config, broker_version=kafka_version)
    # client.topics
    topic = client.topics["testtopicdemo"]
# Write hello world to test topic
    print('Produce message to topic')
    with topic.get_sync_producer() as producer:
        producer.produce(b"Hello World demo test")
# Print all messages from test topic
    print('Consume message from topic')
    consumer = topic.get_simple_consumer(consumer_timeout_ms=1000)
    time.sleep(5)
    for message in consumer:
        # if message is not None:
        print(message.offset, " ", message.value)

if name == 'main': unittest.main( testRunner=HTMLTestRunner(output='./testreports'))

{code}

error i am getting is {code} kafka-test executing pytest for kafka │ │ kafka-test Running tests... │ │ kafka-test ---------------------------------------------------------------------- │ │ kafka-test test_kafka (main.TestKafka) ... ERROR (2.810243)s │ │ kafka-test test_two_kafka (main.TestKafka) ... ERROR (2.795517)s │ │ kafka-test ====================================================================== │ │ kafka-test ERROR [2.810243s]: b'test_kafka (main.TestKafka)' │ │ kafka-test ---------------------------------------------------------------------- │ │ kafka-test Traceback (most recent call last): │ │ kafka-test File "/code/test_kafka.py", line 18, in test_kafka │ │ kafka-test client = KafkaClient(hosts=endpoint, ssl_config=config, │ │ kafka-test File "/usr/local/lib/python3.9/site-packages/pykafka/client.py", line 133, in init │ │ kafka-test self.cluster = Cluster( │ │ kafka-test File "/usr/local/lib/python3.9/site-packages/pykafka/cluster.py", line 214, in init │ │ kafka-test self.fetch_api_versions() │ │ kafka-test File "/usr/local/lib/python3.9/site-packages/pykafka/cluster.py", line 500, in fetch_api_versions │ │ kafka-test raise SocketDisconnectedError() │ │ kafka-test pykafka.exceptions.SocketDisconnectedError │ │ kafka-test ====================================================================== │ │ kafka-test ERROR [2.795517s]: b'test_two_kafka (main.TestKafka)' │ │ kafka-test ---------------------------------------------------------------------- │ │ kafka-test Traceback (most recent call last): │ │ kafka-test File "/code/test_kafka.py", line 40, in test_two_kafka │ │ kafka-test client = KafkaClient( │ │ kafka-test File "/usr/local/lib/python3.9/site-packages/pykafka/client.py", line 133, in init │ │ kafka-test self.cluster = Cluster( │ │ kafka-test File "/usr/local/lib/python3.9/site-packages/pykafka/cluster.py", line 214, in init │ │ kafka-test self.fetch_api_versions() │ │ kafka-test File "/usr/local/lib/python3.9/site-packages/pykafka/cluster.py", line 500, in fetch_api_versions │ │ kafka-test raise SocketDisconnectedError() │ │ kafka-test pykafka.exceptions.SocketDisconnectedError │ │ kafka-test ---------------------------------------------------------------------- │ │ kafka-test Ran 2 tests in 5.606s │ │ kafka-test FAILED │ │ kafka-test (Errors=2) │ │ kafka-test Generating HTML reports... │ │ kafka-test Reports generated: /code/reports/./testreports/2021-03-03-14-32-12.html {code}

PyKafka version: bash-3.2$ pip3 show pykafka Name: pykafka Version: 2.8.0 Summary: Full-Featured Pure-Python Kafka Client Home-page: https://github.com/Parsely/pykafka Author: Keith Bourgoin and Emmett Butler Author-email: pykafka-user@googlegroups.com License: Apache License 2.0 Location: /Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/site-packages Requires: six, kazoo, tabulate

Kafka version:2.3.1

python: python 3.9

I am not getting what exactly wrong here with my ssl here? and How does ssl supposed to work with this?


│ kafka-test File "/usr/local/lib/python3.9/site-packages/pykafka/cluster.py", line 500, in fetch_api_versions │ │ kafka-test raise SocketDisconnectedError() │ │ kafka-test pykafka.exceptions.SocketDisconnectedError │


mailtoshrb commented 3 years ago

Ignore this SSL cert was wrong