joowani / kq

Kafka-based Job Queue for Python
http://kq.readthedocs.io
MIT License
572 stars, 24 forks

kq doesn't start on MacOS 11.1 #16

Closed: dyerrington closed this issue 3 years ago

dyerrington commented 3 years ago

I could probably work around this by switching to Docker in my development environment, but I figured I would report it anyway.

Here's my basic anaconda environment: environment.txt

I've tested both in bash and zsh. My worker setup is:

import logging
import os

from kafka import KafkaConsumer
from kq import Worker

# `args` is defined elsewhere in the script (not shown here).
if __name__ == '__main__' and hasattr(args, "group") and args.group:
    # Set up logging.
    formatter = logging.Formatter('[%(levelname)s] %(message)s')
    stream_handler = logging.StreamHandler()
    stream_handler.setFormatter(formatter)

    logger = logging.getLogger('kq.worker')
    logger.setLevel(logging.DEBUG)
    logger.addHandler(stream_handler)

    # Set up a Kafka consumer.
    print('Attempting to connect to kafka @: ', os.environ['bootstrap_server'])
    consumer = KafkaConsumer(
        bootstrap_servers=os.environ['bootstrap_server'],
        auto_offset_reset='latest',
        group_id=f"biddocs-{args.group}"
    )

    print("Group is: ", args.group)

    # Start the worker on the topic; this is where it appears to exit.
    worker = Worker(topic='bid_docs', consumer=consumer)
    worker.start()

The output is just a single line, and then the worker seems to die:

[INFO] Starting Worker(hosts=127.0.0.1:9092, topic=bid_docs, group=biddocs-1) ...

The Kafka logs don't show even an attempt to connect. When I find a solution to this problem, I'll post an update. Any help appreciated!

dyerrington commented 3 years ago

Also, it's worth mentioning that the same code and Anaconda environment on Windows and Ubuntu 16.x don't have this issue at all. I can confirm Kafka is running and is capable of client connections outside this use case.

joowani commented 3 years ago

Hi @dyerrington,

Apologies for the late response. Were you able to figure this one out?

dyerrington commented 3 years ago

My dedication to using kq is quite high, since I've now decided to use it in a production system. The main trouble I've been having comes from Docker-specific problems on Macs with M1 chips. I've decided to run it natively in my development environment, without a container, and had some success with a minor change to the start method of the Worker class:

Around line 232 in worker.py, I changed the default for the max_messages argument:

    def start(
        self, max_messages: Optional[int] = None, commit_offsets: bool = True
    ) -> int:

To this:

    def start(
        self, max_messages: Optional[int] = math.inf, commit_offsets: bool = True
    ) -> int:

At least in the version I am using, this block, starting at line 254, never runs:

        while max_messages is not None and messages_processed < max_messages:
            record = next(self._consumer)
            message = Message(
                topic=record.topic,
                partition=record.partition,
                offset=record.offset,
                key=record.key,
                value=record.value,
            )
            self._process_message(message)

            if commit_offsets:
                self._consumer.commit()

            messages_processed += 1
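
One way to read the condition quoted above: when max_messages is None, the left operand of the `and` is False, so the loop body is skipped and the method returns with nothing processed. The following is only a minimal sketch of that behaviour, not kq's actual code; `run_loop` and its in-memory record source are hypothetical stand-ins for the worker and the Kafka consumer.

```python
from typing import Iterator, Optional


def run_loop(records: Iterator[str], max_messages: Optional[float] = None) -> int:
    """Mimic the loop condition quoted in this thread (hypothetical helper)."""
    messages_processed = 0
    # With max_messages=None the first operand is False, so the body never runs.
    while max_messages is not None and messages_processed < max_messages:
        record = next(records, None)
        if record is None:  # the stand-in source is exhausted
            break
        messages_processed += 1
    return messages_processed


default_result = run_loop(iter(["a", "b", "c"]))                       # default: None
limited_result = run_loop(iter(["a", "b", "c"]), max_messages=2)       # finite limit
unbounded_result = run_loop(iter(["a", "b", "c"]), max_messages=float("inf"))
print(default_result, limited_result, unbounded_result)
```

Under this reading, the default call processes nothing, while a finite limit or an explicit infinite value lets the loop run, which matches the symptoms described in this comment.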

In my Docker container I'm using kq 2.0.1, which doesn't seem to have the issue. On my Mac I'm using 2.2.0, where the issue does occur. Looking back at 2.0.1, I notice that its start method uses the same default that I set in my locally modified copy of 2.2.0 earlier today. I suppose I should be passing this value myself instead. In any case, the worker does start with 2.2.0 on my local M1 machine and doesn't exit when max_messages = math.inf.

@joowani this is probably very obvious but should I be passing my own infinite value on the user side of the API? It seems like the effect is still the same when I do in v2.2.0.

joowani commented 3 years ago

Hi @dyerrington,

You should set max_messages only if you want a finite limit (for testing, etc.). Otherwise, you should not set this value at all and just call start() or start(commit_offsets=True). That should work for both versions, I believe.

Best

dyerrington commented 3 years ago

Thanks @joowani. I can confirm that calling .start() with the defaults in 2.2.0 returns 0 without that while block ever running, at least on M1s.
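
Since the symptom here is a silent return, a small guard around the call can at least make it visible. This is a hedged sketch only: `start_or_fail` and `StubWorker` are hypothetical names, and the stub simply imitates the "returns 0 with defaults" behaviour reported in this thread so the example runs without Kafka. With a real worker, the guard relies only on start() accepting max_messages and returning a count, as in the signature quoted earlier.

```python
from typing import Any, Optional


def start_or_fail(worker: Any, max_messages: Optional[float] = None) -> int:
    """Start a worker; raise if a run with no limit returns at all."""
    if max_messages is None:
        processed = worker.start()
        # A worker started without a limit is expected to keep running,
        # so reaching this point means it exited unexpectedly.
        raise RuntimeError(f"worker returned after {processed} messages")
    return worker.start(max_messages=max_messages)


class StubWorker:
    """Stand-in that imitates the behaviour reported above (assumption)."""

    def start(self, max_messages: Optional[float] = None) -> int:
        # Returns immediately with 0 when no limit is given.
        return 0 if max_messages is None else int(max_messages)


try:
    start_or_fail(StubWorker())
    guard_raised = False
except RuntimeError:
    guard_raised = True

limited_count = start_or_fail(StubWorker(), max_messages=5)
print(guard_raised, limited_count)
```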