amphp / parallel

An advanced parallelization library for PHP, enabling efficient multitasking, better resource use, and improved application responsiveness through multiple processes or threads.
MIT License

Creating a pool with workers #173

Open ebeyrent opened 1 year ago

ebeyrent commented 1 year ago

I'm slightly baffled by what's happening with my code. I have a number of message queues, and each queue specifies how many workers should be created to process it. Based on that configuration value, I create n instances of my queue worker and add them to the pool. However, when I call getWorkerCount(), I get the wrong value.

```php
// At the top of the file:
use Amp\Parallel\Worker\ContextWorkerPool;

// Create a worker pool.
$pool = new ContextWorkerPool(MessageQueueConfig::MAX_WORKERS);
$worker_count = 6;
if ($worker_count < 1) {
  $worker_count = 1;
}
for ($i = 0; $i < $worker_count; $i++) {
  // Create an instance of the queue-specific worker plugin (an Amp\Parallel\Worker\Task).
  $instance = $this->queueWorkerManager->createInstance('message_queue:' . $queue_config->id());
  dump('Instance created');
  $pool->submit($instance);
}
$this->processes[$queue_config->id()] = $pool;
dump(
  'Pool id = ' . $queue_config->id(),
  'Limit = ' . $pool->getLimit(),
  'Worker count = ' . $pool->getWorkerCount(),
  'Idle count = ' . $pool->getIdleWorkerCount()
);
```

This gives me:

```
^ "Instance created"
^ "Instance created"
^ "Instance created"
^ "Instance created"
^ "Instance created"
^ "Instance created"
^ "Pool id = immediate"
^ "Limit = 5"
^ "Worker count = 3"
^ "Idle count = 0"
```

So I'm definitely creating 6 instances, and my limit is 5, so I expected getWorkerCount() to return 5, but it returns 3. If I create one or two instances, the count is correct, but anything above that makes the method return 3.

I'd be grateful for any insight into what's happening!

kelunik commented 1 year ago

submit() submits tasks to the worker pool, but it doesn't create a new worker for every call. If one of the existing workers is ready to accept a new task by the time submit() is called again, that worker is reused instead of a new one being created.

ebeyrent commented 1 year ago

I may be struggling conceptually with how this works, and I'm not finding the documentation very helpful.

If I understand correctly, when I create a new ContextWorkerPool, it doesn't create workers. How then do workers get created?

I think I was also misunderstanding what the submit() method actually does. I think that the task contains the code that does the work, and the worker performs that work by calling the task's run() method. Is that correct?

Conceptually speaking, I have three queues (high_priority, medium_priority, low). For each queue, I want a single process (with all three running concurrently) that connects to the queue, claims an item, and then submits that item to a worker pool running multiple workers. To me, that suggests one worker pool for the tasks that connect to the queues, with each of those tasks creating its own pool to process the items from the queue it's connected to.

Do I have that right?

kelunik commented 1 year ago

Improving the documentation is on our list. If you have specific suggestions, please open a separate issue or, even better, a pull request with concrete changes.

You're right, creating the pool does not immediately create the workers. They're created as needed when new tasks are submitted to the pool. If none of the existing workers can be used and the pool is below its limit, it'll create a new worker.
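To make the reuse rule concrete, here is a small toy model in plain PHP. It is not amphp's implementation: `ToyWorkerPool`, its `submit()`/`finish()` methods and the busy/idle bookkeeping are hypothetical stand-ins that only mirror the behaviour described above (reuse an idle worker first, create a new one only while below the limit). It has no dependencies, so it runs directly with the PHP CLI.

```php
<?php
// Toy model only: NOT amphp's implementation. It mimics the rule
// "reuse an idle worker, otherwise create one while below the limit".
final class ToyWorkerPool
{
    /** @var array<int, bool> worker id => true while busy, false while idle */
    private array $workers = [];

    public function __construct(private int $limit) {}

    /** Returns the id of the worker that got the task, or null if it must wait. */
    public function submit(): ?int
    {
        // 1. Prefer an existing idle worker.
        foreach ($this->workers as $id => $busy) {
            if (!$busy) {
                $this->workers[$id] = true;
                return $id;
            }
        }
        // 2. Otherwise start a new worker, but only while below the limit.
        if (count($this->workers) < $this->limit) {
            $id = count($this->workers);
            $this->workers[$id] = true;
            return $id;
        }
        // 3. At the limit: the task has to wait for a worker to become idle.
        return null;
    }

    /** Marks a worker idle again, as if its task had just finished. */
    public function finish(int $id): void
    {
        $this->workers[$id] = false;
    }

    public function getWorkerCount(): int
    {
        return count($this->workers);
    }

    public function getIdleWorkerCount(): int
    {
        return count(array_filter($this->workers, fn (bool $busy): bool => !$busy));
    }
}

// Scenario A: six tasks submitted back to back, none finished yet.
$a = new ToyWorkerPool(5);
for ($i = 0; $i < 6; $i++) {
    $a->submit();
}
echo "A: workers={$a->getWorkerCount()} idle={$a->getIdleWorkerCount()}", PHP_EOL;

// Scenario B: every task finishes before the next one is submitted.
$b = new ToyWorkerPool(5);
for ($i = 0; $i < 6; $i++) {
    $id = $b->submit();
    $b->finish($id);
}
echo "B: workers={$b->getWorkerCount()} idle={$b->getIdleWorkerCount()}", PHP_EOL;
```

This prints `A: workers=5 idle=0` and `B: workers=1 idle=1`. A count of 3, as in the original report, would sit between these two extremes: some tasks presumably finished before later submit() calls, so their workers were reused. The exact number depends on timing, which this model does not try to reproduce.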

> I think I was also misunderstanding what the submit() method actually does. I think that the task contains the code that does the work, and the worker performs that work by calling the task's run() method. Is that correct?

Yes, that's correct.
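The split between a task (the data plus the code that does the work) and a worker (which only executes whatever task it receives) can be sketched without the library. In amphp/parallel v2 the real interface is `Amp\Parallel\Worker\Task`, whose `run()` method receives a `Channel` and a `Cancellation`; the `ToyTask`, `UppercaseMessage` and `ToyWorker` names below are hypothetical simplifications. The `serialize()`/`unserialize()` round trip stands in for the task being sent to a separate worker process, which is why a task object cannot carry closures or open resources across that boundary.

```php
<?php
// Hypothetical stand-in for amphp's Task interface. The real one is
// Amp\Parallel\Worker\Task with run(Channel $channel, Cancellation $cancellation).
interface ToyTask
{
    public function run(): mixed;
}

// The task carries both the data and the code that does the work.
final class UppercaseMessage implements ToyTask
{
    public function __construct(private string $payload) {}

    public function run(): mixed
    {
        return strtoupper($this->payload);
    }
}

// The worker knows nothing about the work itself; it just calls run().
final class ToyWorker
{
    public function execute(ToyTask $task): mixed
    {
        // Simulate the process boundary: the task is serialized in the parent
        // and rebuilt in the worker, so it must be serializable.
        $copy = unserialize(serialize($task));
        return $copy->run();
    }
}

$worker = new ToyWorker();
echo $worker->execute(new UppercaseMessage('claimed item')), PHP_EOL;
```

Running it prints `CLAIMED ITEM`. In the real library the pool plays the role of picking a `ToyWorker`, and submit() returns an execution object whose future resolves to whatever `run()` returned.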

If you have three queues, I wouldn't use a worker pool for the queue watching. If you need to use blocking IO for the queue system, you can directly use the context API and create three separate contexts. If the queue IO is non-blocking, you can do it all in the parent process itself and only have three pools there to which you submit tasks based on the priority.
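One way to read the second suggestion (non-blocking queue IO in the parent, with one pool per priority) is sketched below as a toy. `SlotPool` is a hypothetical counter standing in for a `ContextWorkerPool`, and the limits 5, 3 and 1 are made-up values; only the queue names come from the question. It illustrates a design choice: with a separate, separately limited pool per priority, a backlog on `low` waits for `low`'s own worker and never occupies slots that `high_priority` items need.

```php
<?php
// Hypothetical sketch: one capacity-limited pool per priority, all owned by the
// parent process. SlotPool only counts busy slots; in real code each entry
// would be an Amp\Parallel\Worker\ContextWorkerPool receiving Task objects.
final class SlotPool
{
    private int $busy = 0;

    public function __construct(public readonly int $limit) {}

    /** Returns true if a slot was free, false if the item must wait. */
    public function tryStart(): bool
    {
        if ($this->busy >= $this->limit) {
            return false;
        }
        $this->busy++;
        return true;
    }

    public function busy(): int
    {
        return $this->busy;
    }
}

// Limits are illustrative assumptions, not values from the issue.
$pools = [
    'high_priority' => new SlotPool(5),
    'medium_priority' => new SlotPool(3),
    'low' => new SlotPool(1),
];

// Items as [queue name, payload], e.g. claimed by non-blocking queue consumers.
$claimed = [['low', 'a'], ['low', 'b'], ['high_priority', 'c'], ['low', 'd'], ['high_priority', 'e']];

$waiting = [];
foreach ($claimed as [$queue, $payload]) {
    // Route each item to the pool for its queue's priority.
    if (!$pools[$queue]->tryStart()) {
        $waiting[] = $payload; // waits for a worker in that pool only
    }
}

foreach ($pools as $name => $pool) {
    echo $name, ': ', $pool->busy(), '/', $pool->limit, PHP_EOL;
}
echo 'waiting: ', implode(',', $waiting), PHP_EOL;
```

Running it prints `high_priority: 2/5`, `medium_priority: 0/3`, `low: 1/1` and `waiting: b,d`: both high-priority items start immediately even though two low-priority items are queued ahead of them.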