joowani / kq

Kafka-based Job Queue for Python
http://kq.readthedocs.io
MIT License

Job processing not happening. #13

Closed sahilpaudel closed 4 years ago

sahilpaudel commented 4 years ago

worker.py

import logging

from kafka import KafkaConsumer
from kq import Worker

# Set up logging.
formatter = logging.Formatter('[%(levelname)s] %(message)s')
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(formatter)
logger = logging.getLogger('kq.worker')
logger.setLevel(logging.DEBUG)
logger.addHandler(stream_handler)

# Set up a Kafka consumer.
consumer = KafkaConsumer(
    bootstrap_servers='127.0.0.1:9092',
    group_id='group',
    auto_offset_reset='latest'
)

# Set up a worker.
worker = Worker(topic='kq_topic', consumer=consumer)
worker.start()

producer.py

from kafka import KafkaProducer
from kq import Queue, Job

# Set up a Kafka producer.
producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092')

# Set up a queue.
queue = Queue(topic='kq_topic', producer=producer)

def add(a, b):
    return a + b

# Enqueue a function call.
job = Job(func=add, args=[10, 20], timeout=5)
queue.enqueue(job)  # timeout is still 5

When I run worker.py, it hangs at:

[INFO] Starting Worker(hosts=127.0.0.1:9092, topic=kq_topic, group=group) 

When I run producer.py, nothing happens. Could you please help me get started?

Thanks.

joowani commented 4 years ago

Hi @sahilpaudel,

Have you checked that your Kafka (and ZooKeeper) instances are running and reachable? Have you checked the logs? Thanks.
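A quick way to rule out a plain connectivity problem is to test whether the broker port accepts TCP connections. The sketch below uses only the standard library; `is_reachable` is a hypothetical helper name, and the address is the bootstrap server from the scripts above. An open port only shows that something is listening there, not that Kafka itself is healthy.

```python
import socket


def is_reachable(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port can be opened."""
    try:
        # create_connection raises OSError (refused, timed out, unreachable, ...)
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


if __name__ == "__main__":
    # 127.0.0.1:9092 is the bootstrap server used in worker.py and producer.py.
    print("Kafka port open:", is_reachable("127.0.0.1", 9092))
```

If this prints False, the problem is likely with the broker or the network rather than with kq.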

sahilpaudel commented 4 years ago

Yes @joowani, I have verified that Kafka is working, and when I start the worker, informational logs appear in the console.

joowani commented 4 years ago

Hi @sahilpaudel,

You would have to give me more details or something to help me reproduce the issue.

sahilpaudel commented 4 years ago

I just have the two code files above and a Kafka server that is already running. When I run python3 worker.py, I get Starting Worker(hosts=127.0.0.1:9092, topic=kq_topic, group=group)

When I run python3 producer.py, nothing happens: the script exits without printing or doing anything.
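When a producer script exits silently, turning on debug logging on the producer side can show whether anything was attempted at all. The sketch below assumes that kafka-python logs under the `kafka` logger hierarchy; `enable_debug_logging` is a hypothetical helper, not part of kq or kafka-python.

```python
import logging


def enable_debug_logging(name: str = "kafka") -> logging.Logger:
    """Attach a DEBUG-level stream handler to the named logger."""
    logger = logging.getLogger(name)
    logger.setLevel(logging.DEBUG)
    if not logger.handlers:  # avoid adding a duplicate handler on repeated calls
        handler = logging.StreamHandler()
        handler.setFormatter(
            logging.Formatter("[%(levelname)s] %(name)s: %(message)s")
        )
        logger.addHandler(handler)
    return logger


# Assumption: call this at the top of producer.py, before creating KafkaProducer.
enable_debug_logging("kafka")
```

Passing "kq" instead of "kafka" would cover kq's own loggers in the same way, such as the 'kq.worker' logger configured in worker.py.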

joowani commented 4 years ago

Hi @sahilpaudel,

What OS and which versions of Kafka and ZooKeeper are you using? Thanks.

sahilpaudel commented 4 years ago

Sorry, I missed that, @joowani.

OS: macOS Catalina 10.15.6
Kafka: stable 2.6.0
ZooKeeper: stable 3.6.1

Thanks.

joowani commented 4 years ago

Hi @sahilpaudel,

I was able to reproduce the problem. It was due to kafka-python (dependency of kq) changing the behaviour of its producer put API. I just released version 2.0.1. Could you please run pip install --upgrade kq and try again? Thank you.
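To confirm that the upgrade took effect, the installed version can be read from package metadata. This is a minimal sketch using the standard library (Python 3.8+); `installed_version`, `release_tuple`, and `is_at_least` are hypothetical helpers, and the version comparison is a simplification that only looks at the first three numeric components.

```python
import re
from importlib.metadata import PackageNotFoundError, version


def installed_version(package: str):
    """Return the installed version string, or None if the package is missing."""
    try:
        return version(package)
    except PackageNotFoundError:
        return None


def release_tuple(v: str):
    """Extract up to three leading numeric components, e.g. '2.0.1' -> (2, 0, 1)."""
    return tuple(int(p) for p in re.findall(r"\d+", v.split("+")[0])[:3])


def is_at_least(v: str, minimum: str) -> bool:
    """Simplified comparison; pre-release suffixes are ignored."""
    return release_tuple(v) >= release_tuple(minimum)


if __name__ == "__main__":
    current = installed_version("kq")
    if current is None:
        print("kq is not installed")
    elif is_at_least(current, "2.0.1"):  # 2.0.1 is the release mentioned above
        print("kq", current, "includes the fix")
    else:
        print("kq", current, "needs upgrading")
```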

sahilpaudel commented 4 years ago

Thank you, @joowani. It is working now 👍