basis-company / nats.php

nats jetstream client for php

[refactoring proposal] consumer api #65

Open withinboredom opened 7 months ago

withinboredom commented 7 months ago

When working on #62, I discovered there are some issues with implementing the consumer model as-is. Further, some "newer" features of NATS are not easily available (NAKs, ordered consumers, delayed messages, etc.), so it would be nice to have them.

I took a deep look at the Go implementation to get an idea of what it might look like in PHP. Go tends to be written in such a way that things feel synchronous even though they aren't. So it has the best chance of being a compatible model for PHP (for both traditional PHP and async PHP via fibers).

Taking inspiration from that, there are a few low-level types that this depends on:

// sent to NATS, describing how we want to consume from the stream
readonly class PullRequest { public function __construct(
  public ?int $expiresSeconds,
  public ?int $batchCount,
  public ?int $maxBytes,
  public bool $noWait,
  public ?int $idleHeartbeat
) {} }

readonly class Metadata { public function __construct(
  public int $consumerSequence,
  public int $streamSequence,
  public int $numDelivered,
  public int $numPending,
  public DateTimeImmutable $timestamp,
  public string $streamName,
  public string $consumerName,
  public string $domainName,
) {} }

interface ConsumerMessage {
  public function getMetadata(): Metadata;
  public function getRawData(): string;
  public function getHeaders(): array;
  public function getSubject(): string;
  public function getReplySubject(): string;

  // acknowledge the message
  public function ack(): void;

  // ack the message and wait for ack reply from the server. Useful for scenarios where the message loss
  // is unacceptable, despite the performance impact.
  public function doubleAck(): void;

  // tell the server to redeliver the message
  public function nak(): void;

  // tell the server to redeliver the message after a delay
  public function nakWithDelay(float $millisecondsDelay): void;

  // tell the server the message is still being worked on. This resets the server redelivery timeout.
  public function inProgress(): void;

  // tell the server to never redeliver the message
  public function term(): void;

  // tell the server why the message shouldn't be delivered which will be emitted as a server advisory.
  public function termWithReason(string $reason): void;
}

interface MessagesContext extends \Iterator {
  // gets the next message from the stream via foreach or manually. Blocks until there is a message
  public function next(): ConsumerMessage;

  // unsubscribe from the stream and immediately stop iteration. Messages may still be in the
  // inbox and will be discarded.
  public function stop(): void;

  // unsubscribe from the stream but any messages still in the buffers/inbox will continue to be 
  // consumed until they are gone.
  public function drain(): void;
}

interface ConsumerContext {
  // stops consumer and any pending messages will be discarded
  public function stop(): void;

  // stops the consumer but continues to consume pending messages
  public function drain(): void;
}

interface MessageBatch {
  // @return Generator<ConsumerMessage>
  public function getMessages(): \Generator;
}

Here's the interface inspired by the Go consumer:

interface Consumer {
  // receives up to $batchCount messages from a stream, or returns once $maxWait seconds have passed,
  // whichever is sooner. Note that $idleHeartbeat is 5s by default for $maxWait values longer than 10s
  // (and disabled for shorter waits); if the client hasn't received a heartbeat in 2x $idleHeartbeat,
  // an exception should be thrown. This method is non-blocking, but returns a MessageBatch that can
  // be iterated over.
  public function fetch(int $batchCount, int $maxWait = 30, int $idleHeartbeat = 5): MessageBatch;

  // exactly the same as fetch(), but counts by bytes instead of the number of messages.
  public function fetchBytes(int $maxBytes, int $maxWait = 30, int $idleHeartbeat = 5): MessageBatch;

  // exactly like fetch(), but if there are no messages available in the stream, then the generator 
  // will return immediately, regardless of the number of messages requested.
  public function fetchNoWait(int $batchCount): MessageBatch;

  // continuously consumes from a stream using the provided consumer function. The callback can 
  // accept up to two arguments: fn(ConsumerMessage $message, ConsumerContext $context)
  public function consume(\Closure $consumer): ConsumerContext;

  // Allow continuously iterating over a stream.
  public function messages(): MessagesContext;

  // receive the next message from the stream. Note that this is a blocking call.
  public function next(int $maxWait = 30, int $idleHeartbeat = 5): ConsumerMessage;

  // get the current consumer configuration from the stream
  public function info(): Configuration;
}
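
To make the shape of `fetch()` more concrete, here is a minimal in-memory sketch of how a caller might iterate a batch and ack or nak each message. `FakeMessage`, `FakeBatch` and `FakeConsumer` are hypothetical stand-ins invented for this illustration; they only mimic the method names proposed above and never contact a NATS server.

```php
<?php
declare(strict_types=1);

// Hypothetical stand-in for ConsumerMessage: it only records which
// acknowledgement was requested and sends nothing over the network.
final class FakeMessage
{
    public ?string $outcome = null;

    public function __construct(private string $data) {}

    public function getRawData(): string { return $this->data; }
    public function ack(): void { $this->outcome = 'ack'; }
    public function nak(): void { $this->outcome = 'nak'; }
}

// Hypothetical stand-in for MessageBatch: yields its messages lazily.
final class FakeBatch
{
    /** @param FakeMessage[] $messages */
    public function __construct(private array $messages) {}

    /** @return \Generator<FakeMessage> */
    public function getMessages(): \Generator
    {
        yield from $this->messages;
    }
}

// Hypothetical stand-in for Consumer::fetch(): hands out at most
// $batchCount of the pending messages per call.
final class FakeConsumer
{
    /** @param FakeMessage[] $pending */
    public function __construct(private array $pending) {}

    public function fetch(int $batchCount): FakeBatch
    {
        return new FakeBatch(array_splice($this->pending, 0, $batchCount));
    }
}

$consumer = new FakeConsumer([
    new FakeMessage('{"id":1}'),
    new FakeMessage('not json'),
    new FakeMessage('{"id":3}'),
]);

$outcomes = [];
foreach ($consumer->fetch(2)->getMessages() as $message) {
    // Ack payloads that decode as JSON, ask for redelivery otherwise.
    if (json_decode($message->getRawData()) !== null) {
        $message->ack();
    } else {
        $message->nak();
    }
    $outcomes[] = $message->outcome;
}
```

Only two of the three pending messages are touched here, since the batch size caps what a single `fetch()` call returns; a real implementation would also have to honour `$maxWait` and heartbeats, which this sketch leaves out.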

What are your thoughts?

nekufa commented 7 months ago

Hi @withinboredom, I feel that the current API is a bit outdated and we need to reinvent it. I'm not sure the client dependency is good: we would need to pass the client to every message so it can use the connection to perform calls. It looks like the active record pattern, and maybe that's okay. Your solution looks good to me!

withinboredom commented 7 months ago

> we need to pass client to any message so it can use connection to perform calls. Looks like active record pattern, and maybe it's okay

Yes, though I think that is a good thing. The code that processes a message might be quite far from a client (such as deep in a job worker). Keeping the (n)ack'ing near where an error or success is actually determined should result in more maintainable code; passing a message (or ID) back up the call stack to be handled there may be far more complex.
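
As a rough illustration of that point, the sketch below lets the function that knows the outcome also pick the acknowledgement. `RecordingMessage` and `sendWelcomeEmail()` are hypothetical names made up for this example; the message object only records the call instead of talking to a server, and the "transient failure" rule is an arbitrary placeholder.

```php
<?php
declare(strict_types=1);

// Hypothetical recording message: stores the last acknowledgement call
// instead of sending anything to a server.
final class RecordingMessage
{
    public string $lastCall = 'none';

    public function __construct(public readonly string $body) {}

    public function ack(): void { $this->lastCall = 'ack'; }

    public function nakWithDelay(float $millisecondsDelay): void
    {
        $this->lastCall = 'nakWithDelay';
    }

    public function term(): void { $this->lastCall = 'term'; }
}

// Deep in the worker: the code that knows whether the job succeeded also
// decides how to acknowledge, so nothing travels back up the call stack.
function sendWelcomeEmail(RecordingMessage $message): void
{
    if ($message->body === '') {
        $message->term();              // permanently invalid, never redeliver
        return;
    }
    if (!str_contains($message->body, '@')) {
        $message->nakWithDelay(500.0); // treated as a transient failure here
        return;
    }
    $message->ack();
}

$ok = new RecordingMessage('user@example.com');
$retry = new RecordingMessage('user-without-domain');
$invalid = new RecordingMessage('');

sendWelcomeEmail($ok);
sendWelcomeEmail($retry);
sendWelcomeEmail($invalid);
```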

nekufa commented 7 months ago

Would it be better to rethink the whole API around channel publishing and subscription? I mean something like this:

$client->getChannel('tester')->publish('hello world')

ro0NL commented 6 months ago

Currently I'm quite happy with the client, but I had some issues discovering features like ACK/NAK as well.

Also confusing is Stream::publish vs Client::publish, and Stream::put vs Client::dispatch.

We settled on using the client directly.

our flow is

SUB
NEXT
ACK/NACK
NEXT
ACK/NACK
...
UNSUB

So perhaps simply add Client::next/nextBatch/ack/nak and KISS :')

nekufa commented 6 months ago

@ro0NL @withinboredom please check the queue implementation in the latest release. I'm happy with it, it looks good to me! The readme is also updated (pub/sub and jetstream parts).

ro0NL commented 6 months ago

no BC breaks :+1:

The new API doesn't work for our case, since we fetch/ack/nack in an abstraction layer.

So we need to subscribe once and keep reusing that subscription for subsequent fetches.

Also, only payload+replyTo leaves the abstraction layer, so we can't use $msg->ack() later, which is an active-record issue generally :)
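
For the payload+replyTo case, one possible workaround is to acknowledge by reply subject alone, since a JetStream acknowledgement is just a publish of `+ACK` or `-NAK` to the message's reply subject. The sketch below assumes that and nothing more: `ReplyToAcker` is a hypothetical helper, the `$publish` callable stands in for whatever publish function the transport exposes, and the subjects are placeholders rather than real `$JS.ACK...` subjects.

```php
<?php
declare(strict_types=1);

// Hypothetical helper: acknowledges using only the reply subject, so code
// outside the transport layer never needs the message object.
final class ReplyToAcker
{
    /** @var callable(string, string): void */
    private $publish;

    // $publish is any callable(string $subject, string $payload): void;
    // wiring it to a real client is deliberately left out.
    public function __construct(callable $publish)
    {
        $this->publish = $publish;
    }

    public function ack(string $replyTo): void
    {
        ($this->publish)($replyTo, '+ACK');
    }

    public function nak(string $replyTo): void
    {
        ($this->publish)($replyTo, '-NAK');
    }
}

// In-memory publisher that records what would have been sent.
$sent = [];
$acker = new ReplyToAcker(
    function (string $subject, string $payload) use (&$sent): void {
        $sent[] = [$subject, $payload];
    }
);

$acker->ack('$JS.ACK.example.1'); // placeholder reply subject
$acker->nak('$JS.ACK.example.2'); // placeholder reply subject
```

The trade-off is that the abstraction layer now depends on the wire-level ack payloads instead of on the message class.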

nekufa commented 6 months ago

@ro0NL yep, the idea was to preserve the API as much as possible :) The subscribe method returns a queue instance that you can persist in a transport, calling fetch to get messages one by one. You can also get a queue instance for a consumer; there is an example in the jetstream part of the readme:

...
// consumer can be used via queue interface
$queue = $consumer->getQueue();
while ($message = $queue->next()) {
    if (rand(1, 10) % 2 == 0) {
        mail($message->payload, "See you later");
        $message->ack();
    } else {
        // nack with a 1 second timeout
        $message->nack(1);
    }
    // stop processing (the original "% 2 == 10" check could never be true)
    if (rand(1, 10) === 10) {
        // don't forget to unsubscribe
        $client->unsubscribe($queue);
        break;
    }
}

// use fetchAll method to batch process messages
// let's set batch size to 50
$queue = $goodbyer->setBatching(50)->create()->getQueue();

// fetching 100 messages results in 2 stream requests
// limit message fetching to 1 second
// it means no more than 100 messages will be fetched
$messages = $queue->setTimeout(1)->fetchAll(100);

$recipients = [];
foreach ($messages as $message) {
    $recipients[] = (string) $message->payload;
}

mail_to_all($recipients, "See you later");

// ack all messages
foreach ($messages as $message) {
    $message->ack();
}

ro0NL commented 6 months ago

it's a fair API for standard usage, i agree :+1:

edit: actually i can reuse Queue:

$this->queue ??= $this->getConsumer()->getQueue();
$data = $this->queue->next();

ro0NL commented 6 months ago

@nekufa https://github.com/etrias-nl/php-toolkit/commit/eeb68a98ae2d332cfafc5fd4acfef6c62a82f768 :ok_hand:

on a side note, perhaps tag 1.0.0 soonish :angel: