dpkp / kafka-python

Python client for Apache Kafka
http://kafka-python.readthedocs.io/
Apache License 2.0

Client spams with warnings "Unable to send to wakeup socket" endlessly #2073

Open · Prometheus3375 opened this issue 4 years ago

Prometheus3375 commented 4 years ago

kafka-python version: 2.0.1
Python version: 3.7.7

This issue is a duplicate of #1842; that issue is closed and I cannot reopen it, so I opened a new one.

When I close a spider, kafka-python starts spamming the "Unable to send to wakeup socket" warning and does not stop. See the attached spider.log file.

I went into the sources and added one line that "fixes" this issue. Here is the original code.

# Source: kafka/client_async.py
# Class: KafkaClient

    def wakeup(self):
        with self._wake_lock:
            try:
                self._wake_w.sendall(b'x')
            except socket.timeout:
                log.warning('Timeout to send to wakeup socket!')
                raise Errors.KafkaTimeoutError()
            except socket.error:
                # Note: the exception is swallowed here; only a warning is logged.
                log.warning('Unable to send to wakeup socket!')

Here is the fixed version.

    def wakeup(self):
        with self._wake_lock:
            try:
                self._wake_w.sendall(b'x')
            except socket.timeout:
                log.warning('Timeout to send to wakeup socket!')
                raise Errors.KafkaTimeoutError()
            except socket.error as e:
                log.warning('Unable to send to wakeup socket!')
                # Added line: re-raise instead of swallowing the exception.
                raise e

I do not know what causes the problem, or why re-raising the exception stops the spam.

joaomcarlos commented 4 years ago

I am seeing the same behavior with nameko_kafka, which uses this library internally.

Miggets7 commented 4 years ago

Having the same issue with the same library version and Python version, in a Docker container using python:3.7-slim-stretch.

peilingkuang commented 4 years ago

I have the same behavior in 2.0.1.

tuofang commented 4 years ago

Having the same issue with the same kafka-python version, 2.0.1.

lennstof commented 4 years ago

Also having the same issue with kafka-python 2.0.1. I don't know what causes it. After a reboot it stops for a while, but it seems to get triggered when receiving data.

ninakka commented 3 years ago

Is there any progress on this issue? I am also hitting it. I am using 1.4.7, where #1842 says it is fixed.

From @dpkp:

> I believe this is fixed in the latest release, 1.4.7. Please reopen if the issue persists.

@Prometheus3375 proposed a fix above. Can it be reviewed and picked up?

FANGOD commented 3 years ago

This problem occasionally occurs when closing a Kafka connection. Is something wrong?

forgoty commented 3 years ago

Confirmed. The issue persists when trying to close the consumer connection to Kafka inside a Docker container.

mhworth commented 3 years ago

Same here--this happens as soon as I try to gracefully shut down a consumer. As it stands, I have to forcefully kill the process to get it to stop.

vlaskinvlad commented 3 years ago

Same.

> Same here--this happens as soon as I try to gracefully shut down a consumer. As it stands, I have to forcefully kill the process to get it to stop.

+1

NitroCao commented 3 years ago

I found another way to solve this problem without modifying the source code of kafka-python.

    def run(self) -> Iterator[Event]:
        self._consumer = KafkaConsumer(
            self._kafka_topic,
            bootstrap_servers=self._kafka_server,
            value_deserializer=self.data_deserializer,
            # Stop blocking iteration after 16 s without messages, so the
            # while loop below gets a chance to check the stop flag.
            consumer_timeout_ms=16000)

        while not self._stop_receiver:
            try:
                for msg in self._consumer:
                    if isinstance(msg.value, Event):
                        yield msg.value
            except StopIteration:
                continue

    def close(self):
        if self._consumer is not None:
            # Signal run() to stop, give it a moment to leave the loop,
            # then tear the consumer down.
            self._stop_receiver = True
            time.sleep(1)
            self._consumer.unsubscribe()
            self._consumer.close()
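
To show how this pattern can be driven, here is a minimal usage sketch. `Receiver` and `handle` are hypothetical names for the class holding run()/close() and a per-event callback; they are illustrations, not part of the code above.

    import threading

    receiver = Receiver()  # hypothetical class containing run() and close()

    def consume():
        for event in receiver.run():
            handle(event)  # hypothetical per-event callback

    thread = threading.Thread(target=consume, daemon=True)
    thread.start()
    # ... later, during shutdown:
    receiver.close()  # sets the stop flag; run() exits after the next timeout
    thread.join()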

glorinli commented 1 year ago

> I found another way to solve this problem without modifying the source code of kafka-python. (Quoting the solution above; code omitted here.)

It works; it seems consumer_timeout_ms is the key point: with it set, iteration stops when no message arrives within the timeout, so the loop can check the stop flag and exit.

aiven-anton commented 1 year ago

I consistently ran into this when accidentally using a closed consumer; the underlying socket exception was "Bad file descriptor", in case someone runs into the same issue. The error was on my part, but a sanity check with a nicer error message could have saved me an hour or so of debugging.
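
As a rough illustration of the kind of sanity check I mean (this wrapper is hypothetical, not part of kafka-python's API), a thin proxy could fail loudly on use-after-close instead of surfacing a bare "Bad file descriptor":

    from kafka import KafkaConsumer

    class GuardedConsumer:
        """Hypothetical wrapper that fails loudly on use-after-close."""

        def __init__(self, *topics, **config):
            self._consumer = KafkaConsumer(*topics, **config)
            self._closed = False

        def __iter__(self):
            if self._closed:
                raise RuntimeError(
                    'This consumer has been closed; create a new one instead.')
            return iter(self._consumer)

        def close(self):
            if not self._closed:
                self._consumer.close()
                self._closed = True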

adityaraj-28 commented 1 year ago

@aiven-anton What was the fix for you? I am getting the same "Bad file descriptor".

aiven-anton commented 1 year ago

@adityaraj-28

We have a custom context manager for setting up and closing a consumer, and I was using the consumer after it had been closed. The fix is simply not to do that. The code was more complex than this example, but simplified, it boils down to this:

    with create_consumer() as consumer:
        # Use the consumer here.
        ...

    # If you use it here, outside the managed context, it's closed
    # and you get a "Bad file descriptor" error.
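
For reference, a minimal sketch of what such a context manager might look like. `create_consumer` is our own helper, so this reconstruction is an assumption, not the actual code:

    from contextlib import contextmanager

    from kafka import KafkaConsumer

    @contextmanager
    def create_consumer(*topics, **config):
        # Hypothetical reconstruction: build a consumer, always close it on exit.
        consumer = KafkaConsumer(*topics, **config)
        try:
            yield consumer
        finally:
            consumer.close()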