Open bschmitt opened 8 years ago
Made it! Just a few lines of code:
function queue_rpc($queue, $message, $timeout = 0) {
/* @var Bschmitt\Amqp\Publisher $publisher */
$publisher = app()->make('Bschmitt\Amqp\Publisher');
$publisher->connect();
$publisher->getConnection()->set_close_on_destruct();
$replyTo = $publisher->getChannel()->queue_declare(
'',
false,
false,
true,
true
);
$replyTo = $replyTo[0];
$publisher->getChannel()->queue_declare(
$queue,
false,
false,
false,
false
);
$response = false;
$publisher->getChannel()->basic_consume(
$replyTo,
'',
false,
false,
false,
false,
function ($message) use (&$response) {
$response = $message;
}
);
$publisher->getChannel()->queue_bind($queue, 'amq.direct', $queue);
$publisher->getChannel()->basic_publish(
new \Bschmitt\Amqp\Message(
$message,
[
'content_type' => 'text/plain',
'delivery_mode' => 1,
'reply_to' => $replyTo,
]
),
'amq.direct',
$queue
);
$publisher->getChannel()->wait(null, false, $timeout);
return $response;
}
queue_rpc('ping-queue', 'ping', 10); // -> pong
@todo add consumer example, but it is a lot simpler
@AidasK Could you fork this repository and open a pull request, please?
i think you are missing correlation_id and i would set the delivery_mode to 2,
@anedisi correlation_id is not needed if temporary queue is used only once. delivery_mode 1 is intentional, because persisted message has no use to me if producer dies.
@stevenklar it is not part of code you can just push to the repository. Every param must be configurable.
This could be a method in https://github.com/bschmitt/laravel-amqp/blob/master/src/Publisher.php Publisher class, e.g.:
public function rpc($routing, $message, $timeout = 0, $queueOptions, $exchangeOptions)
{
//@todo check if message is instance of Message class
//@todo use queueOptions options for queue declaration
//@todo use exchangeOptions for exchange declaration if needed, but optional
//@notice multiple messages per temp queue will not be supported
/* @var Bschmitt\Amqp\Publisher $publisher */
$publisher = app()->make('Bschmitt\Amqp\Publisher');
$publisher->connect();
$publisher->getConnection()->set_close_on_destruct();
$replyTo = $publisher->getChannel()->queue_declare(
'',
false,
false,
true,
true
);
$replyTo = $replyTo[0];
$publisher->getChannel()->queue_declare(
$queue,
false,
false,
false,
false
);
$response = false;
$publisher->getChannel()->basic_consume(
$replyTo,
'',
false,
false,
false,
false,
function ($message) use (&$response) {
$response = $message;
}
);
$publisher->getChannel()->queue_bind($queue, 'amq.direct', $queue);
$publisher->getChannel()->basic_publish(
new \Bschmitt\Amqp\Message(
$message,
[
'content_type' => 'text/plain',
'delivery_mode' => 1,
'reply_to' => $replyTo,
]
),
'amq.direct',
$queue
);
$publisher->getChannel()->wait(null, false, $timeout);
return $response;
}
Feel free to modify this and fork/push.
fair enough for delivery_mode, but i would argue people to use correlation_id for rpc, or at least it should be the option for something like a general purpose library like this is.
Will test this out and submit a first proposal with the resolved todo's you mentioned.
Thank you!
@AidasK If you would like to add this yourself, please don't hesitate and open a pull request.
I am not planing to make a pull request for this, so feel free to push it by yourself. Tag me for review if needed :)
Worker example:
\Amqp::consume('worker', function (AMQPMessage $message, Consumer $consumer) {
$this->info($message->getBody(), 'v');
$correlationId = $message->has('correlation_id') ? $message->get('correlation_id') : null;
$consumer->getChannel()->basic_publish(
new AMQPMessage(
$response,
[
'content_type' => 'text/plain',
'delivery_mode' => 1,
'correlation_id' => $correlationId
]
),
'',
$message->get('reply_to')
);
}, [
'exchange' => 'amq.direct',
'exchange_type' => 'direct',
'queue_force_declare' => true,
'queue_durable' => false,
'consumer_no_ack' => true,
]);
For those which are interested, there is some discussion in #54
I have a process like this in my microservice model: service A-> publish service B service B -> service A How to get the results of service B sent to A in laravel controller.
Sometimes it is needed to run a function on a remote computer and wait for the result. Goal: Implement a easy integrated Remote Procedure Call (RPC) workflow.
Reference: https://www.rabbitmq.com/tutorials/tutorial-six-php.html