dpkp / kafka-python

Python client for Apache Kafka
http://kafka-python.readthedocs.io/
Apache License 2.0

Getting ValueError: Invalid file object: None in check_version when connecting #2450

Open dgoldenberg-ias opened 1 day ago

dgoldenberg-ias commented 1 day ago

Environment

kafka-python = "^2.0.2"
python = Python 3.10.15

We're using AWS MSK - Kafka version 3.5.1.

Code

import traceback

from kafka import KafkaAdminClient

BOOTSTRAP_SERVERS = (
    "srv1.us-east-1.amazonaws.com:9092"
    ",srv2.us-east-1.amazonaws.com:9092"
    ",srv3.us-east-1.amazonaws.com:9092"
)

def run_util() -> None:
    admin_client = None
    try:
        # Create an admin client
        print(f">> Connecting to: '{BOOTSTRAP_SERVERS}'...")
        # api_version=(3, 5, 1)
        admin_client = KafkaAdminClient(bootstrap_servers=BOOTSTRAP_SERVERS)

        # Retrieve and print the list of topics
        topics = admin_client.list_topics()
        print("Kafka Topics:")
        for topic in topics:
            print(f"- {topic}")

    except Exception as e:
        print(f"An error occurred while creating the admin client: {e}")
        traceback.print_exc()
    finally:
        # Close the admin client if it was created
        if admin_client:
            try:
                admin_client.close()
            except Exception as close_e:
                print(f"Error while closing the admin client: {close_e}")
                traceback.print_exc()

if __name__ == "__main__":
    run_util()

Observations

When I run this code locally, I get the error below. Curiously, the same code runs without the error from a Databricks notebook, where the Python version is 3.10.12.

Error stack trace

An error occurred while creating the admin client: Invalid file object: None
Traceback (most recent call last):
  File "/msk_proj/with_kafka_python/msk_kp_util_list_topics.py", line 49, in run_util
    admin_client = KafkaAdminClient(bootstrap_servers=BOOTSTRAP_SERVERS)
  File "/.venv310/lib/python3.10/site-packages/kafka/admin/client.py", line 208, in __init__
    self._client = KafkaClient(metrics=self._metrics,
  File "/.venv310/lib/python3.10/site-packages/kafka/client_async.py", line 244, in __init__
    self.config['api_version'] = self.check_version(timeout=check_timeout)
  File "/.venv310/lib/python3.10/site-packages/kafka/client_async.py", line 909, in check_version
    version = conn.check_version(timeout=remaining, strict=strict, topics=list(self.config['bootstrap_topics_filter']))
  File "/.venv310/lib/python3.10/site-packages/kafka/conn.py", line 1254, in check_version
    selector.register(self._sock, selectors.EVENT_READ)
  File "/opt/homebrew/Cellar/python@3.10/3.10.15/Frameworks/Python.framework/Versions/3.10/lib/python3.10/selectors.py", line 518, in register
    key = super().register(fileobj, events, data)
  File "/opt/homebrew/Cellar/python@3.10/3.10.15/Frameworks/Python.framework/Versions/3.10/lib/python3.10/selectors.py", line 239, in register
    key = SelectorKey(fileobj, self._fileobj_lookup(fileobj), events, data)
  File "/opt/homebrew/Cellar/python@3.10/3.10.15/Frameworks/Python.framework/Versions/3.10/lib/python3.10/selectors.py", line 226, in _fileobj_lookup
    return _fileobj_to_fd(fileobj)
  File "/opt/homebrew/Cellar/python@3.10/3.10.15/Frameworks/Python.framework/Versions/3.10/lib/python3.10/selectors.py", line 39, in _fileobj_to_fd
    raise ValueError("Invalid file object: "
ValueError: Invalid file object: None
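
The last frames of the trace show that selector.register() was handed None instead of a socket. That part can be reproduced without Kafka at all. Below is a minimal sketch using only the standard library; `sock = None` is a stand-in (my assumption) for the state of `self._sock` at the time of the call, and it says nothing about why the socket is gone.

```python
import selectors

selector = selectors.DefaultSelector()
try:
    # Stand-in for self._sock when no socket is attached to the connection
    sock = None
    selector.register(sock, selectors.EVENT_READ)
except ValueError as exc:
    # The selectors module rejects objects that have no file descriptor
    error_message = str(exc)
    print(error_message)  # Invalid file object: None
finally:
    selector.close()
```

So the ValueError looks like a symptom of the socket being absent by the time check_version tries to read from it, rather than the root failure.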

Specifying the Kafka API version explicitly

Passing api_version to the constructor:

admin_client = KafkaAdminClient(bootstrap_servers=BOOTSTRAP_SERVERS, api_version=(3, 5, 1))

Same error.

Network?

Seems OK.

telnet srv1.us-east-1.amazonaws.com 9092
Trying 10.34.11.23...
Connected to srv1.amazonaws.com.
Escape character is '^]'.

Basic socket operations

I used the code below to verify that a plain TCP socket connection works.

import socket

sock = None
try:
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.connect(('srv1.us-east-1.amazonaws.com', 9092))
    print("Connection successful!")
except Exception as e:
    print(f"Socket connection failed: {e}")
finally:
    # Only close the socket if it was actually created
    if sock is not None:
        sock.close()
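
The check above only covers srv1. A possible extension that tries every entry of the bootstrap list is sketched below. The helper names `can_connect` and `check_bootstrap_servers` are hypothetical, and a successful TCP connect only shows that the port is reachable, not that the broker will accept Kafka requests on it.

```python
import socket


def can_connect(host: str, port: int, timeout: float = 5.0) -> bool:
    """Return True if a TCP connection to host:port can be opened."""
    try:
        # create_connection resolves the host; the with-block closes the socket
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers DNS failures, refused connections, and timeouts
        return False


def check_bootstrap_servers(servers: str) -> dict:
    """Map each 'host:port' entry of a comma-separated list to its TCP status."""
    results = {}
    for entry in servers.split(","):
        entry = entry.strip()
        # Split on the last colon so the port is always the final component
        host, _, port = entry.rpartition(":")
        results[entry] = can_connect(host, int(port))
    return results
```

Calling check_bootstrap_servers(BOOTSTRAP_SERVERS) with the string from the script above would return a dict with one True/False entry per broker.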

The exception occurs here: https://github.com/dpkp/kafka-python/blob/5bb126bf20bbb5baeb4e9afc48008dbe411631bc/kafka/conn.py#L1254 in the check_version method.

        for version, request in test_cases:
            if not self.connect_blocking(timeout_at - time.time()):
                reset_override_configs()
                raise Errors.NodeNotReadyError()
            f = self.send(request)
            # HACK: sleeping to wait for socket to send bytes
            time.sleep(0.1)
            # when broker receives an unrecognized request API
            # it abruptly closes our socket.
            # so we attempt to send a second request immediately
            # that we believe it will definitely recognize (metadata)
            # the attempt to write to a disconnected socket should
            # immediately fail and allow us to infer that the prior
            # request was unrecognized
            mr = self.send(MetadataRequest[0](topics))

            selector = self.config['selector']()
            selector.register(self._sock, selectors.EVENT_READ)

Any idea what causes this, or is there a workaround? Thanks.

dgoldenberg-ias commented 1 day ago

Enabled logging and increased ulimit to 1024 locally.
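
For reference, logging for kafka-python can be turned on roughly as follows. This is a minimal sketch with the standard logging module; the format string and the DEBUG level are my assumptions, not necessarily the exact settings used here.

```python
import logging
import sys

# Send all log records to stderr, prefixed with the level and logger name
logging.basicConfig(
    stream=sys.stderr,
    level=logging.INFO,
    format="%(levelname)s %(name)s - %(message)s",
)

# kafka-python modules log under the "kafka" namespace (e.g. "kafka.conn"),
# so raising this one logger to DEBUG also covers the connection code
logging.getLogger("kafka").setLevel(logging.DEBUG)
```

This needs to run before the KafkaAdminClient is created so that the connection attempts are captured.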

dgoldenberg-ias commented 1 day ago

What I'm seeing in the log:

INFO - <BrokerConnection node_id=bootstrap-0 host=srv3.us-east-1.amazonaws.com:9092 <connecting> [IPv4 ('XX.XX.X.XXX', 9092)]>: connecting to srv3.us-east-1.amazonaws.com:9092 [('XX.XX.X.XXX', 9092) IPv4]

INFO - Probing node bootstrap-0 broker version
INFO - <BrokerConnection node_id=bootstrap-0 host=srv3.us-east-1.amazonaws.com:9092 <connecting> [IPv4 ('XX.XX.X.XXX', 9092)]>: Connection complete.

ERROR - Error sending request data to <BrokerConnection node_id=bootstrap-0 host=srv3.us-east-1.amazonaws.com:9092 <connected> [IPv4 ('XX.XX.X.XXX', 9092)]>
Traceback (most recent call last):
  File "/msk-proj/.venv310/lib/python3.10/site-packages/kafka/conn.py", line 998, in send_pending_requests
    total_bytes = self._send_bytes_blocking(data)
  File "/msk-proj/.venv310/lib/python3.10/site-packages/kafka/conn.py", line 601, in _send_bytes_blocking
    sent_bytes = self._sock.send(data[total_sent:])
BrokenPipeError: [Errno 32] Broken pipe

It seems to me the issue comes down to this code:

(https://github.com/dpkp/kafka-python/blob/5bb126bf20bbb5baeb4e9afc48008dbe411631bc/kafka/conn.py#L1242)

            if not self.connect_blocking(timeout_at - time.time()):
                reset_override_configs()
                raise Errors.NodeNotReadyError()
            f = self.send(request)
            # HACK: sleeping to wait for socket to send bytes
            time.sleep(0.1)
            # when broker receives an unrecognized request API
            # it abruptly closes our socket.
            # so we attempt to send a second request immediately
            # that we believe it will definitely recognize (metadata)
            # the attempt to write to a disconnected socket should
            # immediately fail and allow us to infer that the prior
            # request was unrecognized
            mr = self.send(MetadataRequest[0](topics))

Increasing this sleep to 1 or even 10 seconds does not help in my local setup. Any ideas on how to fix this or work around it?