coleifer / huey

a little task queue for python
https://huey.readthedocs.io/
MIT License

What is the standard way to start the huey consumer from within Python software? #715

Closed wnark closed 1 year ago

wnark commented 1 year ago

Hello, I am currently writing cross-platform software for Windows/Linux in Python that uses huey as its internal message queue. For example, the producer receives the directory for a stress test, and the consumer runs the stress test against that directory.

Here is the problem: the official tutorials, simple and flask_ex, both use two terminals and start the producer and consumer separately, the core of which is running "python huey_consumer.py main.huey" to start the consumer. In my design, however, the user does not install a Python environment: the software is started as a Windows service, via Linux systemd, or by double-clicking, and the main process starts a producer subprocess and a consumer subprocess via multiprocessing. Do I need to study and digest the contents of huey_consumer.py and turn "huey_consumer.py main.huey" into a separate function that starts the consumer?

I looked at a few Flask projects on GitHub and they execute "huey_consumer.py main.huey" directly inside their code, which feels strange to me, so I wanted to ask you. If it is convenient, is there a simpler, clearer project that integrates huey_consumer.py into the project code itself, rather than the rather odd approach of running Python from within Python? I see that the django_ex project on the official site does not use huey_consumer.py, but it also does not use multiple processes to start the consumer and producer at the same time.

wnark commented 1 year ago

Using MiniHuey seems like the easiest way to integrate it into the application, but my program calls f3stress to stress-test hard disks, which is a heavy-duty scenario, so I can't use MiniHuey.

coleifer commented 1 year ago

If you wish for everything to run inside a single process you can either:

  1. Run huey_consumer as a subprocess within your parent process, or
  2. Recommended: do not use huey at all, since it seems like a poor fit for your use-case.

I don't know why you would insist on using Huey here, to be honest, since it also typically implies running a Redis server as well (although you could just use Sqlite). But even then, you're probably better off just using multiprocessing.Pool from the standard lib since everything is all happening in the context of your single application: https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.Pool
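
For illustration, a minimal sketch of that Pool approach (run_stress_test and the directory list here are hypothetical stand-ins, not from huey):

from multiprocessing import Pool

def run_stress_test(directory):
    # Hypothetical stand-in for invoking the real stress-test tool
    print(f"stress testing {directory}")
    return directory, "ok"

if __name__ == '__main__':
    with Pool(processes=2) as pool:
        # Fan test directories out across worker processes
        for directory, status in pool.imap_unordered(
                run_stress_test, ['/mnt/disk1', '/mnt/disk2']):
            print(directory, status)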

wnark commented 1 year ago

Thank you for your answer. The situation is this: my software is designed to be the core of a server stress-testing suite, and in the future it will take on more tasks, such as CPU, memory, and functional stress tests, so I need a message queue to manage these functional subprocesses. At first I chose FastAPI as the API server because it is light enough. However, FastAPI's own background-task component, BackgroundTasks, is not recommended for heavy workloads, and Celery, which the FastAPI docs suggest instead, has dropped official support for Windows. I had also decided to use SQLite to store the message-queue data. Comparing the options, I found that huey runs on both Windows and Linux and supports SQLite, so I went with huey.

You may well be right that multiprocessing.Pool to manage the task processes, plus SQLite to store the subprocesses' status messages, would be the best choice. The trouble is that I am not confident operating the database myself at the moment, so I would like to borrow your framework to implement the message queue for now and get the target functionality working first.

For now the path is simply: the front end accepts a POST and sends the task to the back end for execution. I do not need high performance (I chose FastAPI because my research suggested it is light enough); the software is for personal use, so tasks can wait to be executed, but a submitted task must be guaranteed to run and its state must be accurate, so I need a stable framework to execute tasks. If I managed everything with Pool myself, I am not confident I could reproduce the stable features you already provide: signals, logging, locking-tasks, immediate-mode.
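
To make the intended wiring concrete, here is a rough sketch of what I have in mind (names like stress_test, the /stress-test route, and queue.db are made up for illustration; the consumer still has to run separately for tasks to execute):

from fastapi import FastAPI
from huey import SqliteHuey

# Hypothetical names for illustration only
huey = SqliteHuey(filename='queue.db')
app = FastAPI()

@huey.task()
def stress_test(directory: str):
    # Placeholder body; the real work runs in the consumer process
    return f"tested {directory}"

@app.post('/stress-test')
def submit(directory: str):
    result = stress_test(directory)  # enqueues the task, returns a Result handle
    return {'task_id': result.id}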

wnark commented 1 year ago

I tried to write a demo based on huey_consumer.py, and so far it works. I don't know whether this approach is standard practice for huey; after all, calling Python with a terminal command from inside a Python script is not very elegant:

from huey import SqliteHuey
from huey.consumer import Consumer
from huey.consumer_options import ConsumerConfig
from multiprocessing import Process

# Create the Huey object
huey = SqliteHuey(filename='worker.db', fsync=True)

# Define a task: the producer enqueues it, the consumer executes it
@huey.task()
def my_task():
    print("Producing task")
    return None

# Consumer process function
def start_consumer():
    # Huey consumer configuration
    config = ConsumerConfig(
        worker_type='thread',
        workers=2,
        max_delay=3600,
        utc=True,
        consumer_timeout=10,
        periodic=True,
        initial_delay=60,
        backoff=2,
    )
    # Call validate() on the ConsumerConfig instance to verify the parameters
    config.validate()

    # !!! (unpacking config.values is discussed in the next comment)
    consumer = Consumer(huey, **config.values)
    consumer.run()

# Producer process function
def start_producer():
    # Enqueue one producer task
    my_task()

# The __main__ guard is required for multiprocessing on Windows, where
# child processes re-import this module when they start
if __name__ == '__main__':
    # Start the consumer process
    consumer_process = Process(target=start_consumer)
    consumer_process.start()

    # Start the producer process
    producer_process = Process(target=start_producer)
    producer_process.start()

    # Wait for the consumer and producer processes to finish
    # If the consumer never exits, it can also be force-stopped once the
    # stress test is detected to have finished
    consumer_process.join()
    producer_process.join()

wnark commented 1 year ago

    consumer = Consumer(huey, **config.values)

This line follows huey_consumer.py. From my searching, config should need to be a dictionary before **config.values can unpack it, but I don't understand why it still works here even though config is an object rather than a dictionary.
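
For what it's worth, the likely explanation (a rough sketch of the pattern, not huey's actual code; it assumes ConsumerConfig exposes values as a property that returns a plain dict, which matches how huey_consumer.py uses it):

from collections import namedtuple

# A namedtuple subclass whose "values" property returns a plain dict,
# so **config.values unpacks that dict, not the config object itself
class Config(namedtuple('Config', ('workers', 'worker_type'))):
    @property
    def values(self):
        return self._asdict()

def consume(workers=1, worker_type='thread'):
    print(workers, worker_type)

config = Config(workers=2, worker_type='thread')
consume(**config.values)  # prints: 2 thread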

wnark commented 1 year ago

Is there anything in the demo I wrote above that I should improve to follow standard practice?

I tried running it and it works.

wnark commented 1 year ago

I tried modifying it:

from huey import SqliteHuey
from huey.consumer import Consumer
from huey.consumer_options import ConsumerConfig
from multiprocessing import Process

# Create the Huey object
huey = SqliteHuey(filename='demo/huey/sql/worker.db', fsync=True)

# If the producer ran in a child process, input would have to be read in
# the main process first
# input_beans = input('How many beans? ')

# Define the task that the consumer will execute
@huey.task()
def my_task(num):
    """Task executed by the consumer.

    Prints the number of beans entered in the main process and returns a
    message.
    If no return value is specified, the task returns None and the producer
    does not need to call get().
    If a return value is specified, get() should be called to retrieve it so
    that the stored result is cleared from the database.
    """
    print(f"There are {num} beans")

    return f"Execution complete: There are {num} beans"

def start_consumer():
    """Consumer process function.

    Explanation of the code:
    https://www.wnark.com/archives/237.html

    Source of the configuration parameters:
    https://huey.readthedocs.io/en/latest/contrib.html

    Two options are left unset:
    1. "flush_locks" specifies whether to clear locks
    2. "extra_locks" can be used to specify additional locks
    This function configures and coordinates the worker processes and the
    scheduler, and registers signal handlers.
    """
    config = ConsumerConfig(
        workers=2,  # Number of workers
        worker_type='thread',  # Worker type
        initial_delay=1,  # Minimum polling interval, same as -d
        backoff=2,  # Exponential backoff rate, -b
        max_delay=10,  # Maximum possible polling interval, -m
        periodic=True,  # Allow periodic tasks
        check_worker_health=True,  # Enable worker health checks
        health_check_interval=1,  # Polling interval for health checks
        utc=True,  # Convert timestamps to UTC
        consumer_timeout=10,  # Consumer timeout of 10 seconds
    )
    # Call validate() on the ConsumerConfig instance to verify that the key
    # parameters are legal
    config.validate()
    # Huey consumer configuration
    consumer = Consumer(huey, **config.values)
    # From this call onward the consumer is running
    consumer.run()

# def start_producer():
#     """Producer function for running in a child process."""
#     beans = input_beans
#     # Start the producer task
#     x = my_task(int(beans)).get(blocking=True)
#     print(x)

def start_producer_main():
    """Producer function that runs in the main process.

    If input is required, it must be read in the main process, because the
    standard input stream (stdin) is not attached to a Process when it is
    created. Since it makes no sense for multiple processes to read from the
    same stream, stdin is assumed to be read exclusively by the main process.

    https://stackoverflow.com/questions/63893634
    """
    input_beans = input('How many beans? ')
    beans = input_beans
    # Enqueue the producer task and block until the result is ready
    x = my_task(int(beans)).get(blocking=True)
    print(x)

# The __main__ guard is required for multiprocessing on Windows, where
# child processes re-import this module when they start
if __name__ == '__main__':
    # Start the consumer in a separate child process
    consumer_process = Process(target=start_consumer)
    consumer_process.start()

    # Start the producer in a separate child process
    # producer_process = Process(target=start_producer)
    # producer_process.start()

    # Loop in the main process, sending producer commands
    while True:
        start_producer_main()

    # Wait for the consumer and producer processes to end
    # (unreachable while the loop above runs forever)
    consumer_process.join()
    # producer_process.join()
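
Note that with the infinite loop above, the final join() is never reached. One possible fix (a sketch; it assumes force-stopping the consumer with Process.terminate() is acceptable here) is to replace the loop at the end of the demo with:

try:
    # Loop until the user interrupts with Ctrl-C
    while True:
        start_producer_main()
except KeyboardInterrupt:
    # Force-stop the consumer child process, then reap it
    consumer_process.terminate()
    consumer_process.join()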