Neos Flow package that allows for asynchronous and distributed execution of tasks.
Install this package using composer:
composer require flowpack/jobqueue-common
(or by adding the dependency to the composer manifest of an installed package)
Configure a basic queue by adding the following to your Settings.yaml:
Flowpack:
  JobQueue:
    Common:
      queues:
        'some-queue':
          className: 'Flowpack\JobQueue\Common\Queue\FakeQueue'
Initialize the queue (if required)
With
./flow queue:setup some-queue
you can set up the queue and/or verify its configuration.
In the case of the FakeQueue, that step is not required.
Note: The queue:setup command won't remove any existing messages, so there is no harm in calling it multiple times.
Annotate any public method you want to be executed asynchronously:
use Flowpack\JobQueue\Common\Annotations as Job;

class SomeClass {

    /**
     * @Job\Defer(queueName="some-queue")
     */
    public function sendEmail($emailAddress)
    {
        // send some email to $emailAddress
    }
}
or use attributes instead of annotations (PHP 8.0 and later):
use Flowpack\JobQueue\Common\Annotations as Job;

class SomeClass {

    #[Job\Defer(queueName: "some-queue")]
    public function sendEmail($emailAddress)
    {
        // send some email to $emailAddress
    }
}
Note: The method needs to be public and it must not return anything.
Start the worker (if required)
With the above code in place, whenever the method SomeClass::sendEmail() is about to be called, that method call is converted into a job that is executed asynchronously[1].
Unless you use the FakeQueue like in the example, a so-called worker has to be started to listen for new jobs and execute them:
./flow flowpack.jobqueue.common:job:work some-queue --verbose
To get started let's first define some terms:
The Flowpack.JobQueue.Common package comes with a very basic Message Queue implementation, Flowpack\JobQueue\Common\Queue\FakeQueue, that allows for execution of Jobs using sub requests.
It doesn't need any 3rd party tools or server loops and works for basic scenarios. But it has a couple of limitations to be aware of:
It is not actually a queue, but dispatches jobs immediately as they are queued. So it's not possible to distribute the work to multiple workers.
The JobManager is not involved in processing of jobs, so the jobs need to take care of error handling themselves.
For the same reason, Signals are not emitted for the FakeQueue.
With Flow 3.3+ the FakeQueue supports a flag async. Without that flag set, executing jobs blocks the main thread!
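A minimal configuration sketch for the async flag described above. It assumes the flag is passed through the queue's options key, like other implementation-specific settings; verify this against the FakeQueue class of your installed version before relying on it.

```yaml
Flowpack:
  JobQueue:
    Common:
      queues:
        'some-queue':
          className: 'Flowpack\JobQueue\Common\Queue\FakeQueue'
          options:
            # Assumption: "async" is read from the queue options
            async: true
```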
For advanced usage it is recommended to use one of the implementing packages like one of the following:
This is the simplest configuration for a queue:
Flowpack:
  JobQueue:
    Common:
      queues:
        'test':
          className: 'Flowpack\JobQueue\Common\Queue\FakeQueue'
With this, a queue named test will be available.
Note: For reusable packages you should consider adding a vendor-specific prefix to avoid collisions. We recommend using a class name or the package name together with the function name (e.g. Flowpack.ElasticSearch.ContentRepositoryQueueIndexer).
The following parameters are supported by all queues:
Parameter | Type | Default | Description |
---|---|---|---|
className | string | - | FQN of the class implementing the queue |
maximumNumberOfReleases | integer | 3 | Max. number of times a message is re-released to the queue if a job failed |
executeIsolated | boolean | FALSE | If TRUE jobs for this queue are executed in a separate Thread. This makes sense in order to avoid memory leaks and side-effects |
outputResults | boolean | FALSE | If TRUE the full output (stdout + stderr) of the respective job is forwarded to the stdout of its "parent" (only applicable if executeIsolated is true) |
queueNamePrefix | string | - | Optional prefix for the internal queue name, allowing to re-use the same backend over multiple installations |
options | array | - | Options for the queue. Implementation specific (see corresponding package) |
releaseOptions | array | - | Options that will be passed to release() when a job failed. Implementation specific (see corresponding package) |
A more complex example could look something like:
Flowpack:
  JobQueue:
    Common:
      queues:
        'email':
          className: 'Flowpack\JobQueue\Beanstalkd\Queue\BeanstalkdQueue'
          maximumNumberOfReleases: 5
          executeIsolated: true
          outputResults: true
          queueNamePrefix: 'staging-'
          options:
            client:
              host: 127.0.0.11
              port: 11301
            defaultTimeout: 50
          releaseOptions:
            priority: 512
            delay: 120
        'log':
          className: 'Flowpack\JobQueue\Redis\Queue\RedisQueue'
          options:
            defaultTimeout: 10
As you can see, you can have multiple queues in one installation. That allows you to use different backends/options for queues depending on the requirements.
If multiple queues share common configuration, presets can be used to ease readability and maintainability:
Flowpack:
  JobQueue:
    Common:
      presets:
        'staging-default':
          className: 'Flowpack\JobQueue\Doctrine\Queue\DoctrineQueue'
          queueNamePrefix: 'staging-'
          options:
            pollInterval: 2
      queues:
        'email':
          preset: 'staging-default'
          options:
            tableName: 'queue_email' # default table name would be "flowpack_jobqueue_messages_email"
        'log':
          preset: 'staging-default'
          options:
            pollInterval: 1 # overrides "pollInterval" of the preset
This will configure two DoctrineQueues, "email" and "log", with some common options but different table names and poll intervals.
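To make the effect of presets concrete, the following sketch computes the effective settings of both queues from the example. It is only an illustration: resolveQueueSettings() is a hypothetical helper, not part of the package, and it assumes that queue-level values override preset values key by key.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper (not part of the package): resolves the effective
 * settings of one queue by layering its own settings over the named preset.
 */
function resolveQueueSettings(array $presets, array $queueSettings): array
{
    $presetName = $queueSettings['preset'] ?? null;
    unset($queueSettings['preset']);
    if ($presetName === null) {
        // No preset referenced: the queue settings are used as they are
        return $queueSettings;
    }
    if (!isset($presets[$presetName])) {
        throw new InvalidArgumentException(sprintf('Unknown preset "%s"', $presetName));
    }
    // Assumption: queue-level values win over preset values, key by key
    return array_replace_recursive($presets[$presetName], $queueSettings);
}

$presets = [
    'staging-default' => [
        'className' => 'Flowpack\JobQueue\Doctrine\Queue\DoctrineQueue',
        'queueNamePrefix' => 'staging-',
        'options' => ['pollInterval' => 2],
    ],
];

$email = resolveQueueSettings($presets, [
    'preset' => 'staging-default',
    'options' => ['tableName' => 'queue_email'],
]);
$log = resolveQueueSettings($presets, [
    'preset' => 'staging-default',
    'options' => ['pollInterval' => 1],
]);

print_r($email);
print_r($log);
```

Under that assumption, "email" keeps the preset's pollInterval and gains its own tableName, while "log" only replaces pollInterval.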
The job is an arbitrary class implementing Flowpack\JobQueue\Common\Job\JobInterface.
This package comes with one implementation, StaticMethodCallJob, that allows for invoking a public method (see Quickstart), but often it makes sense to create a custom Job:
<?php
use Flowpack\JobQueue\Common\Job\JobInterface;
use Flowpack\JobQueue\Common\Queue\Message;
use Flowpack\JobQueue\Common\Queue\QueueInterface;

class SendEmailJob implements JobInterface
{
    protected $emailAddress;

    public function __construct($emailAddress)
    {
        $this->emailAddress = $emailAddress;
    }

    public function execute(QueueInterface $queue, Message $message)
    {
        // TODO: send the email to $this->emailAddress
        return true;
    }

    public function getIdentifier()
    {
        return 'SendEmailJob';
    }

    public function getLabel()
    {
        // Note: the conversion specifier must be lowercase "%s"
        return sprintf('SendEmailJob (email: "%s")', $this->emailAddress);
    }
}
Note: It's crucial that the execute() method returns TRUE on success, otherwise the corresponding message will be released again and/or marked failed.
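The interplay between the return value of execute() and the maximumNumberOfReleases setting can be illustrated with a small stand-alone simulation. This is not the package's JobManager: simulateReleases() is a hypothetical function, and it assumes that a failed message is re-released at most maximumNumberOfReleases times (so up to one more attempt than that number) before it is marked failed.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical simulation, NOT the package's implementation: runs $execute
 * until it returns true or the release budget is used up.
 */
function simulateReleases(callable $execute, int $maximumNumberOfReleases): array
{
    $releases = 0;
    while (true) {
        try {
            // In this sketch only a strict TRUE counts as success
            $success = $execute() === true;
        } catch (Throwable $exception) {
            // An exception is treated like a failed execution
            $success = false;
        }
        if ($success) {
            return ['status' => 'finished', 'releases' => $releases];
        }
        if ($releases >= $maximumNumberOfReleases) {
            // Release budget exhausted: the message is marked failed
            return ['status' => 'failed', 'releases' => $releases];
        }
        // The message goes back to the queue for another attempt
        $releases++;
    }
}

// A job that succeeds on its third attempt
$flakyAttempts = 0;
$flaky = simulateReleases(function () use (&$flakyAttempts) {
    $flakyAttempts++;
    return $flakyAttempts >= 3;
}, 3);

// A job that forgets to return true and therefore never "succeeds"
$brokenAttempts = 0;
$broken = simulateReleases(function () use (&$brokenAttempts) {
    $brokenAttempts++;
    return null;
}, 3);

print_r($flaky);
print_r($broken);
```

The second case shows why the return value matters: a job that does its work but returns nothing would be executed repeatedly and still end up marked as failed.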
With that in place, the new job can be added to a queue like this:
use Flowpack\JobQueue\Common\Job\JobInterface;
use Flowpack\JobQueue\Common\Job\JobManager;
use Neos\Flow\Annotations as Flow;

class SomeClass {

    /**
     * @Flow\Inject
     * @var JobManager
     */
    protected $jobManager;

    /**
     * @return void
     */
    public function queueJob()
    {
        $job = new SendEmailJob('some@email.com');
        $this->jobManager->queue('queue-name', $job);
    }
}
Use the flowpack.jobqueue.common:queue:* and flowpack.jobqueue.common:job:* commands to interact with the job queues:
Command | Description |
---|---|
queue:list | List configured queues |
queue:describe | Shows details for a given queue (settings, ..) |
queue:setup | Initialize a queue (i.e. create required db tables, check connection, ...) |
queue:flush | Remove all messages from a queue (requires --force flag) |
queue:submit | Submit a message to a queue (mainly for testing) |
job:work | Work on a queue and execute jobs |
job:list | List queued jobs |
When working with JobQueues, proper monitoring is crucial as failures might not be visible immediately.
The JobManager emits signals for all relevant events, namely:
Those can be used to implement more sophisticated logging, for example:
<?php
namespace Your\Package;

use Flowpack\JobQueue\Common\Job\JobManager;
use Flowpack\JobQueue\Common\Queue\Message;
use Flowpack\JobQueue\Common\Queue\QueueInterface;
use Neos\Flow\Core\Bootstrap;
use Neos\Flow\Log\SystemLoggerInterface;
use Neos\Flow\Package\Package as BasePackage;

class Package extends BasePackage
{
    /**
     * @param Bootstrap $bootstrap
     * @return void
     */
    public function boot(Bootstrap $bootstrap)
    {
        $dispatcher = $bootstrap->getSignalSlotDispatcher();
        $dispatcher->connect(
            JobManager::class, 'messageFailed',
            function(QueueInterface $queue, Message $message, \Exception $jobExecutionException = null) use ($bootstrap) {
                $additionalData = [
                    'queue' => $queue->getName(),
                    'message' => $message->getIdentifier()
                ];
                if ($jobExecutionException !== null) {
                    $additionalData['exception'] = $jobExecutionException->getMessage();
                }
                $bootstrap->getObjectManager()->get(SystemLoggerInterface::class)->log('Job failed', LOG_ERR, $additionalData);
            }
        );
    }
}
This would log every failed message to the system log.
This package is licensed under the MIT license.
Pull-Requests are more than welcome. Make sure to read the Code Of Conduct.
[1] The FakeQueue actually executes Jobs synchronously unless the async flag is set (requires Flow 3.3+).