Open SuheylZ opened 3 months ago
Objective
I want to read items from JetStream and process them one by one using my handler.
Solution
Somewhere in my app I have this:
$this->client->subscribe("mystream", "mysubject", function (array|null $headers, mixed $body, string $subject) {
    echo $body;
    return true;
});
And this is the body of the subscribe function:
public function subscribe(string $name, string $subject, callable $listener): void
{
    $stream = $this->client->getApi()->getStream($name);
    $consumer = $stream->getConsumer("myConsumer");
    $consumer->getConfiguration()->setSubjectFilter($name.$subject);

    if(!$consumer->exists()) {
        $cosumer->create();
    }

    $queue = $consumer->getQueue();

    while (true) {
        try {
            $msgs = $queue->fetchAll(10);
            foreach ($msgs as $msg) {
                if ($msg !== null) {
                    $headers = $msg->payload->headers ?? array();
                    $body = $msg->payload->body;
                    $subject = $msg->subject;
                    $ret = $listener($headers, $body, $subject);
                    if ($ret) {
                        $msg->ack();
                    }
                }
            }
        } catch (\Exception $e) {
            echo $e->getMessage();
        }
    }
}
On NATS I have been able to publish messages. This is what I get from nats-cli for mystream.mysubject:
bbdfca7cb16a:~# nats stream view
[server] ? Select a Stream mystream
[1] Subject: mysubject Received: 2024-06-30T16:05:56Z hello world
[2] Subject: mysubject Received: 2024-06-30T16:20:25Z hello world
[3] Subject: mysubject Received: 2024-06-30T19:09:33Z How are you
[4] Subject: mysubject Received: 2024-06-30T19:13:26Z How are you
[5] Subject: mysubject Received: 2024-06-30T19:13:30Z Are you there
But fetchAll(10) always returns an empty array. Also note that I'm running it through a PHP script. What is my mistake in setting up the consumer?
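One thing that may be worth checking is the value passed to setSubjectFilter(). The subscribe function concatenates $name and $subject with no separator, while the nats-cli output lists the messages under the subject mysubject. The sketch below only illustrates that string mismatch. subjectMatches() is a hypothetical helper written for this example, not part of the NATS client. It is a simplified take on NATS subject matching, where "*" matches one token and ">" matches one or more trailing tokens. Whether this mismatch is the actual cause of the empty result is an assumption.

```php
<?php
declare(strict_types=1);

// Hypothetical helper, NOT part of any NATS client library.
// Simplified NATS subject matching: tokens are separated by ".",
// "*" matches exactly one token, ">" matches one or more trailing tokens.
function subjectMatches(string $filter, string $subject): bool
{
    $f = explode('.', $filter);
    $s = explode('.', $subject);

    foreach ($f as $i => $token) {
        if ($token === '>') {
            // ">" must be the last filter token and needs at least one subject token left.
            return $i === count($f) - 1 && count($s) > $i;
        }
        if (!isset($s[$i])) {
            return false; // subject has fewer tokens than the filter
        }
        if ($token !== '*' && $token !== $s[$i]) {
            return false; // literal token differs
        }
    }

    // Without ">", both sides must have the same number of tokens.
    return count($f) === count($s);
}

$name = 'mystream';
$subject = 'mysubject';

// Same expression as in subscribe(): plain concatenation, no "." in between.
$filter = $name . $subject;

echo $filter, PHP_EOL;                            // mystreammysubject
var_dump(subjectMatches($filter, 'mysubject'));   // bool(false)
var_dump(subjectMatches($subject, 'mysubject'));  // bool(true)
```

If the stream really stores its messages under mysubject, as the CLI output suggests, a filter equal to $subject alone would match them under these matching rules, while the concatenated value would not.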