A generic multi-adapter queuing system for Magento.
This module is primarily aimed at developers who need a reliable queue management system for performing asynchronous tasks. It was built because no queue management module within Magento or the community met all my requirements. It also avoids the issues that can exist with database- and cron-based queueing systems (such as the one available with Magento Enterprise).
This module is a framework for both the producer (the code that adds tasks to the queue) and the worker (the code that processes tasks from the queue).
Each queue has its own handler, so multiple tasks that are part of the same queue can easily share resources and save memory, and it is easy to create new queues with new queue handlers. Currently the module ships with only one queue (default) and one worker (test) that merely removes itself from the queue. However, this should be enough of a framework for a developer to base his or her work upon.
I built this with modgit (https://github.com/jreinke/modgit) in mind - and so if you have modgit installed:
$ modgit add lilqueue https://github.com/lilmuckers/magento-lilmuckers_queue.git
Or you can just merge the files manually into your Magento project. This will be added as a Community pear module once it has been fully tested and deemed stable.
The configuration is similar to the cache configuration, and is held within the local.xml file. For a very simple beanstalkd configuration you'd merge this into your local.xml:
<?xml version="1.0"?>
<config>
    <global>
        <queue>
            <backend>beanstalkd</backend>
            <beanstalkd>
                <servers>
                    <server>
                        <host>127.0.0.1</host>
                    </server>
                </servers>
            </beanstalkd>
        </queue>
    </global>
</config>
For a simple Amazon SQS configuration, you must first install the AWS libraries via pear:
pear -D auto_discover=1 install pear.amazonwebservices.com/sdk
and you'd merge this into your local.xml:
<?xml version="1.0"?>
<config>
    <global>
        <queue>
            <backend>amazonsqs</backend>
            <amazonsqs>
                <connection>
                    <key>{{AWS KEY}}</key>
                    <secret>{{AWS SECRET}}</secret>
                    <region>{{AWS REGION}}</region>
                </connection>
            </amazonsqs>
        </queue>
    </global>
</config>
For a Gearman configuration, you'd merge this into your local.xml:
<?xml version="1.0"?>
<config>
    <global>
        <queue>
            <backend>gearman</backend>
        </queue>
    </global>
</config>
See the file app/etc/local.xml.queuesample for advanced configuration examples.
Queues are defined in your module's config.xml, along with the workers that exist in each queue. All workers are run as singletons.
<?xml version="1.0"?>
<config>
    ...
    <queues>
        <queueName>
            <label>The Queue Name</label>
            <class>module/queueHandler</class>
            <workers>
                <taskName>
                    <class>module/worker</class>
                    <method>methodName</method>
                </taskName>
            </workers>
        </queueName>
    </queues>
</config>
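<?php
// Fetch the handler for the queue named in config.xml
$_queue = Mage::helper('lilqueue')->getQueue('queueName');

// Create a task for the 'taskName' worker; the third argument is the store to run the task as
$_task = Mage::helper('lilqueue')->createTask('taskName', array('data' => 'to provide', 'to' => 'the worker'), $storeToRunAs);

// Optionally set the priority, delay and time-to-run (TTR) of the task
$_task->setPriority(100)
    ->setDelay(60)
    ->setTtr(60);

// Add the task to the queue
$_queue->addTask($_task);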
Workers are even easier, as each one is just a method that receives the task as an argument. A worker must set a status on the task once it is done.
If a task is taking longer than the time allowed by the queue backend, it can extend that time by calling touch() on the task.
<?php
class My_Module_Model_Worker extends Lilmuckers_Queue_Model_Worker_Abstract
{
    public function methodName(Lilmuckers_Queue_Model_Queue_Task $task)
    {
        // Get the store assigned to the task (defaults to the store that was running when the task was assigned)
        $store = $task->getStore();

        // Get the queue handler for this queue
        $queue = $task->getQueue();

        // Get the data assigned to the task
        $data = $task->getData(); // or $task->getSpecificData();

        // The calls below list the available outcomes; a real worker sets one status when it finishes.

        // This task ended properly
        $task->success();

        // This task needs to be repeated
        $task->retry();

        // This task errored and we should drop it from the queue for later examination
        $task->hold();

        // This worker is taking a long time, so extend the time we're allowed to use the task
        $task->touch();
    }
}
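As a rough sketch of how a worker might pick exactly one status, the snippet below wraps a job in a try/catch and maps the outcome to success(), retry() or hold(). FakeTask, TemporaryFailure and runWithStatus() are hypothetical names used only so the example runs outside Magento; in a real worker the task would be the Lilmuckers_Queue_Model_Queue_Task passed to your method, and how you classify failures is up to you.

```php
<?php
// Hypothetical stand-in for the module's task object, so this sketch runs without Magento.
class FakeTask
{
    public $status = null;
    private $data;

    public function __construct(array $data) { $this->data = $data; }
    public function getData() { return $this->data; }
    public function success() { $this->status = 'success'; return $this; }
    public function retry()   { $this->status = 'retry';   return $this; }
    public function hold()    { $this->status = 'hold';    return $this; }
}

// Hypothetical exception type marking failures that are worth retrying.
class TemporaryFailure extends Exception {}

// Run $job against the task data and set exactly one status on the task.
function runWithStatus($task, $job)
{
    try {
        call_user_func($job, $task->getData());
        $task->success();   // the job finished properly
    } catch (TemporaryFailure $e) {
        $task->retry();     // transient problem: put the task back for another attempt
    } catch (Exception $e) {
        $task->hold();      // anything else: keep the task aside for later examination
    }
    return $task;
}
```

Inside a real worker method the same try/catch would sit directly around your own logic, with $task being the object the queue hands you.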
The workers are run using a shell script located within /shell/. You can run all queues or a subset of queues, which lets you split the different queues across different servers, or whatever arrangement you wish, for maximum flexibility. The script can also be run multiple times to allow for multiple workers:
$ php /path/to/magento/shell/queue.php --watch <queues>
If you wish to use the framework with another queueing backend then you'll need to build an adapter to support the interface between the module and the queueing system.
First off you need to define the adapter model within the config.xml (preferably of your own module - but if you're contributing to this module, then... woo!)
<?xml version="1.0"?>
<config>
    <global>
        <queue>
            <adapters>
                <adapterName>
                    <class>module/adapter_path</class>
                </adapterName>
            </adapters>
        </queue>
    </global>
</config>
Now you need to build the adapter class. The abstract/interface was built with the beanstalkd methodology in mind, so to be truly compatible you'll have to come up with equivalent functionality. Your class should extend Lilmuckers_Queue_Model_Adapter_Abstract. See that class for a list of abstract methods.
Within the local.xml you'll now need to set the backend value to the code for the adapter you assigned in step 1 (in this example adapterName).
<?xml version="1.0"?>
<config>
    <global>
        <queue>
            <backend>adapterName</backend>
        </queue>
    </global>
</config>
Have a nice cup of tea and a sit down.
The unit tests are built with EcomDev_PHPUnit and should run with the standard test-suite.
The tests cover:
The AmazonSQS adapter isn't covered by the unit tests because of reasons.
You can manually send messages to the queue with the following command:
$ cd /path/to/magento/shell/
$ php -f queue.php --send "default:test:hello"
This sends a message to the configured backend, using the default queue and the test worker, with the message "hello". The message can then be viewed with your favorite monitoring software for your queue backend.
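To make the shape of the --send argument concrete, here is a minimal sketch of splitting a "queue:worker:message" string into its three parts. The function name parseSendArgument() is hypothetical and this is not necessarily how queue.php parses its input; in particular, allowing further colons inside the message is an assumption of this sketch.

```php
<?php
// Split a "queue:worker:message" string into its three parts.
// The limit of 3 keeps any further colons inside the message itself (an assumption of this sketch).
function parseSendArgument($argument)
{
    $parts = explode(':', $argument, 3);
    if (count($parts) !== 3) {
        throw new InvalidArgumentException('Expected "queue:worker:message"');
    }
    return array(
        'queue'   => $parts[0],
        'worker'  => $parts[1],
        'message' => $parts[2],
    );
}

$parsed = parseSendArgument('default:test:hello');
echo $parsed['queue'], ' / ', $parsed['worker'], ' / ', $parsed['message'], PHP_EOL;
```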
There are a few things I've tripped over when using this module in testing and in implementation:
Workers run in the context of the admin store, so using Mage::app()->getStore() will return the admin store view.
For store-specific work (Mage::getStoreConfig(), loading collections in the correct store context, and so forth) use the $task->getStore() method to get the store view that the task was created from.
When watching several queues from one worker with Amazon SQS (php -f shell/queue.php --watch all) it can take time for a queue message to be received. The worker is currently designed to wait 5 seconds on each queue for a message before moving on to the next one. This is slower than 0-second polling, but much less CPU- and network-intensive. You can override this by specifying a different wait time in the local.xml on the path global/queue/amazonsqs/connection/wait (default is 5 seconds). Alternatively you can specify a worker stream for each queue, which will handily overcome the issue.
Priority ranges: 0-340 - High, 341-682 - Normal, 683-1024 - Low.
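The three priority ranges listed above can be expressed as a small lookup. This is only an illustrative sketch: priorityBand() is a hypothetical helper, not part of the module, and rejecting values outside 0-1024 is an assumption rather than documented behaviour.

```php
<?php
// Map a numeric task priority to the band names listed above.
function priorityBand($priority)
{
    // Assumption of this sketch: values outside 0-1024 are rejected.
    if ($priority < 0 || $priority > 1024) {
        throw new OutOfRangeException('Priority must be between 0 and 1024');
    }
    if ($priority <= 340) {
        return 'high';
    }
    if ($priority <= 682) {
        return 'normal';
    }
    return 'low';
}
```

With this mapping, the setPriority(100) call from the earlier producer example would fall into the high band.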