joowani / kq

Kafka-based Job Queue for Python
http://kq.readthedocs.io
MIT License
572 stars 24 forks

Using KQ with Flask #14

Closed sahilpaudel closed 3 years ago

sahilpaudel commented 4 years ago

System OS: macOS Catalina 10.15.6 Kafka : stable 2.6.0 zookeeper: stable 3.6.1

I am trying to use kq with my Flask application; the code is below. When the application starts, the worker also starts with the topic provided. When I hit the /add endpoint, it should execute the function add, but that never happens: print("add executed") is never called.

Also, running kafka-console-consumer --bootstrap-server localhost:9092 --topic my_topic prints some gibberish output when I hit the API:

�cdill._dill
_create_namedtuple
qXJobq(XidqX    timestampqXtopicqXfuncqXargsqXkwargsqXtimeoutXkeyq     Xpartitionq
tq
  Xkq.jobq
�Ctmy_topicqcmain40f88fcc519f99e3d1d9q�
add
q]q(K
KKe}qKNNtq�q.

main.py

import os
from worker_app import start_worker
from queue import enqueue
from flask import Flask

def add(a, b, c):
    print("add executed")
    return a + b + c

app = Flask(__name__)

@app.route("/add")
def hello():
  enqueue("my_topic", add, [10, 20, 30])
  return "success"

if __name__ == '__main__':
    start_worker("my_topic")
    app.run(debug=True, use_reloader=False)

worker_app.py

import logging

from kafka import KafkaConsumer
from kq import Worker

# Set up logging.
formatter = logging.Formatter('[%(levelname)s] %(message)s')
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(formatter)
logger = logging.getLogger('kq.worker')
logger.setLevel(logging.DEBUG)
logger.addHandler(stream_handler)

def start_worker(topic):
  # Set up a Kafka consumer.
  consumer = KafkaConsumer(
      bootstrap_servers='127.0.0.1:9092',
      group_id='my_group',
      auto_offset_reset='latest'
  )

  # Set up a worker.
  worker = Worker(topic=topic, consumer=consumer)
  worker.start()

queue.py

from kafka import KafkaProducer
from kq import Queue, Job

def enqueue(topic, func, args):

  # Set up a Kafka producer.
  producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092')

  # Set up a queue.
  queue = Queue(topic=topic, producer=producer)

  # Enqueue a function call.
  job = Job(func=func, args=args, timeout=5)
  queue.enqueue(job)  # timeout is still 5
joowani commented 4 years ago

Hi @sahilpaudel,

worker.start is a blocking call. The problem is here:

if __name__ == '__main__':
    start_worker("my_topic")  # This blocks the code execution
    app.run(debug=True, use_reloader=False)  # This will not run

You should run the workers in separate processes and manage them independently of the Flask app. As for the gibberish message in the topic, this is normal: you are seeing the serialized (dill-pickled) form of the Python function add.
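One way to decouple the two within a single entrypoint (a minimal sketch, not from this thread; run_worker below is a stand-in for the blocking start_worker from worker_app.py) is to launch the worker in a child process before starting Flask:

```python
from multiprocessing import Process
import time

def run_worker(topic):
    """Stand-in for kq's blocking Worker.start() loop.

    In the real app this would be worker_app.start_worker(topic),
    which calls worker.start() and never returns.
    """
    while True:
        time.sleep(1)

if __name__ == "__main__":
    # Launch the blocking worker in a child process; the parent is
    # then free to continue and call app.run(...) afterwards.
    proc = Process(target=run_worker, args=("my_topic",), daemon=True)
    proc.start()
    # app.run(debug=True, use_reloader=False) would go here.
    proc.terminate()
    proc.join()
```

daemon=True makes the worker process exit when the Flask process does; for anything beyond a quick experiment, running the worker as its own service is the more robust setup.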

sahilpaudel commented 4 years ago

Hi @joowani, any suggestions on how to go about that? It would be great if you could help me with that.

Thanks.

joowani commented 4 years ago

Hi @sahilpaudel,

There is not much more to it than running the worker script (worker_app.py here) as its own process, preferably under a service manager like systemd.
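For illustration, a hypothetical systemd unit for the worker could look like the following (the unit name, paths, and the one-liner entrypoint are placeholders, not from this thread):

```ini
# /etc/systemd/system/kq-worker.service  (hypothetical path)
[Unit]
Description=KQ worker for my_topic
After=network.target

[Service]
# Placeholder paths: point these at your project directory and virtualenv.
WorkingDirectory=/opt/myapp
ExecStart=/opt/myapp/venv/bin/python -c "from worker_app import start_worker; start_worker('my_topic')"
Restart=on-failure

[Install]
WantedBy=multi-user.target
```

You could then start it with systemctl enable --now kq-worker. Running several such worker processes with the same group_id lets Kafka balance the topic's partitions across them.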

sahilpaudel commented 4 years ago

Thanks @joowani. We want to use kq in a live project that handles millions of requests every day, so we would like your advice on whether it can be used in production.

joowani commented 4 years ago

Hi @sahilpaudel,

kq is just a very light wrapper that sits on top of the kafka-python library, which I do think is well-maintained and production-ready. If your team is already well-versed in Python and Kafka (and kafka-python), then I would say it's worth trying out for POCs.

joowani commented 3 years ago

Closing due to inactivity. Please feel free to reopen if you have any other questions. Thanks.