import logging
from kafka import KafkaConsumer
from kq import Worker
# Send the 'kq.worker' logger's records (DEBUG and above) to a stream
# handler, rendering each one as "[LEVEL] message".
logger = logging.getLogger('kq.worker')
logger.setLevel(logging.DEBUG)

formatter = logging.Formatter('[%(levelname)s] %(message)s')
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(formatter)
logger.addHandler(stream_handler)

# Create the Kafka consumer for the broker at 127.0.0.1:9092, joining
# consumer group 'group'.
# NOTE(review): with auto_offset_reset='latest' a group with no committed
# offset presumably only sees messages produced after it connects -- confirm
# against the kafka-python documentation.
consumer = KafkaConsumer(
    bootstrap_servers='127.0.0.1:9092',
    group_id='group',
    auto_offset_reset='latest',
)

# Attach a KQ worker to 'kq_topic' and start it.
# NOTE(review): start() appears to block while waiting for jobs, so this
# script is not expected to return on its own -- confirm against the kq docs.
worker = Worker(topic='kq_topic', consumer=consumer)
worker.start()
producer.py
import requests
from kafka import KafkaProducer
from kq import Queue, Job
# Create the Kafka producer that talks to the broker at 127.0.0.1:9092.
producer = KafkaProducer(
    bootstrap_servers='127.0.0.1:9092',
)

# Create the KQ queue for 'kq_topic', backed by the producer above.
queue = Queue(
    topic='kq_topic',
    producer=producer,
)
def add(a, b):
    """Return ``a + b``.

    This is the function the producer script wraps in a job and enqueues.
    """
    return a + b
# Enqueue a call to add(10, 20) as a job with a 5-second timeout.
job = Job(func=add, args=[10, 20], timeout=5)
queue.enqueue(job)  # timeout is still 5

# KafkaProducer sends asynchronously; block until buffered messages have
# actually been delivered so the job is not lost when this short-lived
# script exits right after enqueueing.
producer.flush()
worker.py
producer.py
When I run worker.py, it hangs.
When I run producer.py, nothing happens. Could you please help me get started?
Thanks.