joowani / kq

Kafka-based Job Queue for Python
http://kq.readthedocs.io
MIT License
572 stars 24 forks source link

Empty start method params will stop worker running #17

Closed Semo closed 2 years ago

Semo commented 2 years ago

Hi, and thank you for your work. I found a problem running the documented example. I am using Python 3.8.10 (default, Nov 26 2021, 20:14:08) [GCC 9.3.0] on linux (Ubuntu 20.04 LTS)

When starting the Worker example, it immediately exits without fetching the messages, leaving no error message. If I enter any positive integer to the start() method, then it will work and continue as described in the docs until reaching the given number.

I isolated the line at: https://github.com/joowani/kq/blob/0e0795765ce245cbb1d8e366945973988a6826e4/kq/worker.py#L250

I added a logging command to your start-method, to print what the value may be and it isn't None but -1. Of course, the while-loop will exit. How to solve that?

The problem looks like a python3 related one. It's not possible to compare NoneType to anything else than None. Only that will return "True". In regard to the while-loop this means, that it will always be false and so the loop won't be initialized.

Quick and Dirty - Hack:


  if max_messages is not None:
            while messages_processed < max_messages:
                self._logger.info("max_messages in loop "+ str( max_messages) + " ...".format(self))
                record = next(self._consumer)

                message = Message(
                    topic=record.topic,
                    partition=record.partition,
                    offset=record.offset,
                    key=record.key,
                    value=record.value,
                )
                self._process_message(message)

                if commit_offsets:
                    self._consumer.commit()

                messages_processed += 1
            return messages_processed
        else:
            while True:
                record = next(self._consumer)

                message = Message(
                    topic=record.topic,
                    partition=record.partition,
                    offset=record.offset,
                    key=record.key,
                    value=record.value,
                )
                self._process_message(message)

                if commit_offsets:
                    self._consumer.commit()
joowani commented 2 years ago

Hi @Semo, thanks for reporting the bug. I fixed it in release 2.2.1. Could you please try upgrading using pip install kq --upgrade and try again? Thanks.

Semo commented 2 years ago

Works fine. Thank you for the fast response!

Used no parameter and a small integer parameter on start(). Worked both.