wbarnha / kafka-python-ng

Fork for Python client for Apache Kafka
https://wbarnha.github.io/kafka-python-ng/
Apache License 2.0

Bootstrap socket double unregister problem #180

Open dingxiong opened 1 month ago

dingxiong commented 1 month ago

Hi team,

In the latest release, 2.2.2, a bootstrap socket is occasionally unregistered from the selector twice. See the example logs below.

The most obvious problem is that the socket for node_id=bootstrap-1 is unregistered twice in a row, and the second call raises `ValueError: Invalid file descriptor: -1`. This is also related to https://github.com/wbarnha/kafka-python-ng/issues/177. We suspect that https://github.com/wbarnha/kafka-python-ng/pull/156 introduced the problem. The PR description does not include any test showing how it fixes the 100% CPU usage. Should we revert that PR?

A simple fix could be to wrap the call in a try/except block, as below:

--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -637,7 +637,10 @@ class KafkaClient:

         for key, events in ready:
             if key.fileobj.fileno() < 0:
-                self._selector.unregister(key.fileobj)
+                try:
+                    self._selector.unregister(key.fileobj)
+                except (KeyError, ValueError):
+                    pass

             if key.fileobj is self._wake_r:
                 self._clear_wake_fd()

I am happy to raise a PR if this makes sense to you.
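
The double unregister can be reproduced in isolation with only the standard library. The sketch below is not kafka-python-ng code: `safe_unregister` and `demo` are hypothetical names used for illustration, and a `socket.socketpair()` stands in for the broker connection. It assumes the failing socket has already been closed, so `fileno()` returns -1. In that state the first `unregister` succeeds because the selector falls back to searching its registered keys, while the second finds nothing and raises `ValueError` rather than `KeyError`.

```python
import selectors
import socket


def safe_unregister(selector, fileobj):
    """Unregister fileobj; return False if it was no longer registered.

    Hypothetical helper for illustration, not part of kafka-python-ng.
    """
    try:
        selector.unregister(fileobj)
        return True
    except (KeyError, ValueError):
        # KeyError: the descriptor is valid but not registered.
        # ValueError: the descriptor is closed (fileno() == -1) and
        # the selector holds no key for this object any more.
        return False


def demo():
    selector = selectors.DefaultSelector()
    left, right = socket.socketpair()
    selector.register(left, selectors.EVENT_READ)

    # Closing the socket makes fileno() return -1, but the selector
    # still holds a key for it.
    left.close()

    # First unregister: the key is found by object identity.
    first = safe_unregister(selector, left)

    # A second raw unregister fails the same way as in the traceback.
    try:
        selector.unregister(left)
        error = None
    except ValueError as exc:
        error = str(exc)

    # The guarded version swallows the failure instead.
    second = safe_unregister(selector, left)

    right.close()
    selector.close()
    return first, error, second


if __name__ == "__main__":
    print(demo())
```

Because the `if key.fileobj.fileno() < 0` branch is only reached when the descriptor is already invalid, catching `ValueError` is what matters there. Catching `KeyError` as well only covers the case of a still-valid descriptor that is no longer registered.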

Example error logs

"Running Kafka consumer for topics evergreen-production-1.admincoin.mutations with group id infra-consumer-group."
"Booting consumer lib.kafka.consumer.binlog_consumer.BinlogConsumer with pid: 1"
"Updating subscribed topics to: ('evergreen-production-1.admincoin.mutations',)"
"Consumer lib.kafka.consumer.binlog_consumer.BinlogConsumer starts running with pid: 1"
"<BrokerConnection node_id=bootstrap-0 host=b-1.production.l42atw.c2.kafka.us-east-2.amazonaws.com:9096 <connecting> [IPv4 ('172.31.65.61', 9096)]>: connecting to b-1.production.l42atw.c2.kafka.us-east-2.amazonaws.com:9096 [('172.31.65.61', 9096) IPv4]"
"<BrokerConnection node_id=bootstrap-0 host=b-1.production.l42atw.c2.kafka.us-east-2.amazonaws.com:9096 <connecting> [IPv4 ('172.31.65.61', 9096)]>: Loading system default SSL CAs from DefaultVerifyPaths(cafile=None, capath='/usr/lib/ssl/certs', openssl_cafile_env='SSL_CERT_FILE', openssl_cafile='/usr/lib/ssl/cert.pem', openssl_capath_env='SSL_CERT_DIR', openssl_capath='/usr/lib/ssl/certs')"
"<BrokerConnection node_id=bootstrap-2 host=b-3.production.l42atw.c2.kafka.us-east-2.amazonaws.com:9096 <connecting> [IPv4 ('172.31.56.191', 9096)]>: connecting to b-3.production.l42atw.c2.kafka.us-east-2.amazonaws.com:9096 [('172.31.56.191', 9096) IPv4]"
"<BrokerConnection node_id=bootstrap-2 host=b-3.production.l42atw.c2.kafka.us-east-2.amazonaws.com:9096 <connecting> [IPv4 ('172.31.56.191', 9096)]>: Loading system default SSL CAs from DefaultVerifyPaths(cafile=None, capath='/usr/lib/ssl/certs', openssl_cafile_env='SSL_CERT_FILE', openssl_cafile='/usr/lib/ssl/cert.pem', openssl_capath_env='SSL_CERT_DIR', openssl_capath='/usr/lib/ssl/certs')"
"<BrokerConnection node_id=bootstrap-0 host=b-1.production.l42atw.c2.kafka.us-east-2.amazonaws.com:9096 <authenticating> [IPv4 ('172.31.65.61', 9096)]>: Authenticated as msk-user via SCRAM-SHA-512"
"<BrokerConnection node_id=bootstrap-0 host=b-1.production.l42atw.c2.kafka.us-east-2.amazonaws.com:9096 <authenticating> [IPv4 ('172.31.65.61', 9096)]>: Connection complete."
"<BrokerConnection node_id=bootstrap-1 host=b-2.production.l42atw.c2.kafka.us-east-2.amazonaws.com:9096 <connecting> [IPv4 ('172.31.94.126', 9096)]>: connecting to b-2.production.l42atw.c2.kafka.us-east-2.amazonaws.com:9096 [('172.31.94.126', 9096) IPv4]"
"<BrokerConnection node_id=bootstrap-2 host=b-3.production.l42atw.c2.kafka.us-east-2.amazonaws.com:9096 <authenticating> [IPv4 ('172.31.56.191', 9096)]>: Connection complete."
"<BrokerConnection node_id=bootstrap-2 host=b-3.production.l42atw.c2.kafka.us-east-2.amazonaws.com:9096 <authenticating> [IPv4 ('172.31.56.191', 9096)]>: Authenticated as msk-user via SCRAM-SHA-512"
"<BrokerConnection node_id=1 host=b-1.production.l42atw.c2.kafka.us-east-2.amazonaws.com:9096 <connecting> [IPv4 ('172.31.65.61', 9096)]>: connecting to b-1.production.l42atw.c2.kafka.us-east-2.amazonaws.com:9096 [('172.31.65.61', 9096) IPv4]"
"<BrokerConnection node_id=bootstrap-1 host=b-2.production.l42atw.c2.kafka.us-east-2.amazonaws.com:9096 <connecting> [IPv4 ('172.31.94.126', 9096)]>: Loading system default SSL CAs from DefaultVerifyPaths(cafile=None, capath='/usr/lib/ssl/certs', openssl_cafile_env='SSL_CERT_FILE', openssl_cafile='/usr/lib/ssl/cert.pem', openssl_capath_env='SSL_CERT_DIR', openssl_capath='/usr/lib/ssl/certs')"
"<BrokerConnection node_id=1 host=b-1.production.l42atw.c2.kafka.us-east-2.amazonaws.com:9096 <connecting> [IPv4 ('172.31.65.61', 9096)]>: Loading system default SSL CAs from DefaultVerifyPaths(cafile=None, capath='/usr/lib/ssl/certs', openssl_cafile_env='SSL_CERT_FILE', openssl_cafile='/usr/lib/ssl/cert.pem', openssl_capath_env='SSL_CERT_DIR', openssl_capath='/usr/lib/ssl/certs')"
"<BrokerConnection node_id=3 host=b-3.production.l42atw.c2.kafka.us-east-2.amazonaws.com:9096 <connecting> [IPv4 ('172.31.56.191', 9096)]>: connecting to b-3.production.l42atw.c2.kafka.us-east-2.amazonaws.com:9096 [('172.31.56.191', 9096) IPv4]"
"<BrokerConnection node_id=3 host=b-3.production.l42atw.c2.kafka.us-east-2.amazonaws.com:9096 <connecting> [IPv4 ('172.31.56.191', 9096)]>: Loading system default SSL CAs from DefaultVerifyPaths(cafile=None, capath='/usr/lib/ssl/certs', openssl_cafile_env='SSL_CERT_FILE', openssl_cafile='/usr/lib/ssl/cert.pem', openssl_capath_env='SSL_CERT_DIR', openssl_capath='/usr/lib/ssl/certs')"
"<BrokerConnection node_id=1 host=b-1.production.l42atw.c2.kafka.us-east-2.amazonaws.com:9096 <authenticating> [IPv4 ('172.31.65.61', 9096)]>: Authenticated as msk-user via SCRAM-SHA-512"
"<BrokerConnection node_id=1 host=b-1.production.l42atw.c2.kafka.us-east-2.amazonaws.com:9096 <authenticating> [IPv4 ('172.31.65.61', 9096)]>: Connection complete."
"<BrokerConnection node_id=bootstrap-1 host=b-2.production.l42atw.c2.kafka.us-east-2.amazonaws.com:9096 <authenticating> [IPv4 ('172.31.94.126', 9096)]>: Closing connection. "
"<BrokerConnection node_id=bootstrap-0 host=b-1.production.l42atw.c2.kafka.us-east-2.amazonaws.com:9096 <connected> [IPv4 ('172.31.65.61', 9096)]>: Closing connection. "
"<BrokerConnection node_id=bootstrap-2 host=b-3.production.l42atw.c2.kafka.us-east-2.amazonaws.com:9096 <connected> [IPv4 ('172.31.56.191', 9096)]>: Closing connection. "
"<BrokerConnection node_id=2 host=b-2.production.l42atw.c2.kafka.us-east-2.amazonaws.com:9096 <connecting> [IPv4 ('172.31.94.126', 9096)]>: connecting to b-2.production.l42atw.c2.kafka.us-east-2.amazonaws.com:9096 [('172.31.94.126', 9096) IPv4]"
"<BrokerConnection node_id=2 host=b-2.production.l42atw.c2.kafka.us-east-2.amazonaws.com:9096 <connecting> [IPv4 ('172.31.94.126', 9096)]>: Loading system default SSL CAs from DefaultVerifyPaths(cafile=None, capath='/usr/lib/ssl/certs', openssl_cafile_env='SSL_CERT_FILE', openssl_cafile='/usr/lib/ssl/cert.pem', openssl_capath_env='SSL_CERT_DIR', openssl_capath='/usr/lib/ssl/certs')"
"<BrokerConnection node_id=bootstrap-1 host=b-2.production.l42atw.c2.kafka.us-east-2.amazonaws.com:9096 <connecting> [IPv4 ('172.31.94.126', 9096)]>: connecting to b-2.production.l42atw.c2.kafka.us-east-2.amazonaws.com:9096 [('172.31.94.126', 9096) IPv4]"
"<BrokerConnection node_id=3 host=b-3.production.l42atw.c2.kafka.us-east-2.amazonaws.com:9096 <authenticating> [IPv4 ('172.31.56.191', 9096)]>: Connection complete."
"<BrokerConnection node_id=3 host=b-3.production.l42atw.c2.kafka.us-east-2.amazonaws.com:9096 <authenticating> [IPv4 ('172.31.56.191', 9096)]>: Authenticated as msk-user via SCRAM-SHA-512"
"<BrokerConnection node_id=bootstrap-1 host=b-2.production.l42atw.c2.kafka.us-east-2.amazonaws.com:9096 <connecting> [IPv4 ('172.31.94.126', 9096)]>: Closing connection. "
"<BrokerConnection node_id=bootstrap-1 host=b-2.production.l42atw.c2.kafka.us-east-2.amazonaws.com:9096 <connecting> [IPv4 ('172.31.94.126', 9096)]>: connecting to b-2.production.l42atw.c2.kafka.us-east-2.amazonaws.com:9096 [('172.31.94.126', 9096) IPv4]"
"<BrokerConnection node_id=bootstrap-1 host=b-2.production.l42atw.c2.kafka.us-east-2.amazonaws.com:9096 <connecting> [IPv4 ('172.31.94.126', 9096)]>: Loading system default SSL CAs from DefaultVerifyPaths(cafile=None, capath='/usr/lib/ssl/certs', openssl_cafile_env='SSL_CERT_FILE', openssl_cafile='/usr/lib/ssl/cert.pem', openssl_capath_env='SSL_CERT_DIR', openssl_capath='/usr/lib/ssl/certs')"
"<BrokerConnection node_id=2 host=b-2.production.l42atw.c2.kafka.us-east-2.amazonaws.com:9096 <authenticating> [IPv4 ('172.31.94.126', 9096)]>: Authenticated as msk-user via SCRAM-SHA-512"
"<BrokerConnection node_id=2 host=b-2.production.l42atw.c2.kafka.us-east-2.amazonaws.com:9096 <authenticating> [IPv4 ('172.31.94.126', 9096)]>: Connection complete."
"<BrokerConnection node_id=bootstrap-1 host=b-2.production.l42atw.c2.kafka.us-east-2.amazonaws.com:9096 <handshake> [IPv4 ('172.31.94.126', 9096)]>: Closing connection. "
"Consumer error for group infra-consumer-group-INFRA-: Invalid file descriptor: -1"
Traceback (most recent call last):
  File "/app/lib/kafka/consumer/base_consumer.py", line 197, in run
    topic_to_msgs = self.kafka_consumer.poll(  # pyright: ignore
                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/kafka/consumer/group.py", line 663, in poll
    records = self._poll_once(remaining, max_records, update_offsets=update_offsets)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/kafka/consumer/group.py", line 685, in _poll_once
    self._coordinator.poll()
  File "/usr/local/lib/python3.11/site-packages/kafka/coordinator/consumer.py", line 274, in poll
    self.ensure_coordinator_ready()
  File "/usr/local/lib/python3.11/site-packages/kafka/coordinator/base.py", line 267, in ensure_coordinator_ready
    self._client.poll(future=future)
  File "/usr/local/lib/python3.11/site-packages/kafka/client_async.py", line 601, in poll
    self._poll(timeout / 1000)
  File "/usr/local/lib/python3.11/site-packages/kafka/client_async.py", line 640, in _poll
    self._selector.unregister(key.fileobj)
  File "/usr/local/lib/python3.11/selectors.py", line 366, in unregister
    key = super().unregister(fileobj)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/selectors.py", line 249, in unregister
    key = self._fd_to_key.pop(self._fileobj_lookup(fileobj))
                              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/selectors.py", line 225, in _fileobj_lookup
    return _fileobj_to_fd(fileobj)
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/selectors.py", line 42, in _fileobj_to_fd
    raise ValueError("Invalid file descriptor: {}".format(fd))
ValueError: Invalid file descriptor: -1

wbarnha commented 1 month ago

Thanks for the bug report! I'll look into it when I get the opportunity.

dingxiong commented 3 weeks ago

@wbarnha Sorry, I closed this issue by mistake.