A rather large change to how a Rebaser server consumes requests for
change sets and when it performs the related "dependent values update"
(aka "DVU") runs.
General NATS Jetstream Architecture
A Rebaser server uses 2 NATS Jetstream streams to track its work:
REBASER_REQUESTS: as before, all requests are consumed from this
stream, which can be thought of as a work queue. In this implementation
the stream is a limits-based stream, as the Jetstream consumer model
is different. As before, a request for workspace $wk_id and change
set $cs_id is published on rebaser.requests.$wk_id.$cs_id.
REBASER_TASKS: this is a classic Jetstream work queue stream, but
where there is only one message per subject and where each message
represents an exclusive task for a single Rebaser to spin up and run.
In this case, there are only "process" tasks in which a Rebaser will
consume the enqueued requests for a change set and run DVUs in serial
(that is, only one at a time and one after the other as needed). When
a request message is sent to the REBASER_REQUESTS stream, another
body-less message is sent to this tasks stream. These messages are
published on rebaser.tasks.$wk_id.$cs_id.process.
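The two subject layouts above can be sketched as a pair of small helpers. This is only an illustration: the function names are made up for this example and are not taken from the Rebaser crates, and the IDs are treated as plain strings.

```rust
/// Subject on which a request for a change set is published.
/// (Hypothetical helper name, for illustration only.)
fn requests_subject(wk_id: &str, cs_id: &str) -> String {
    format!("rebaser.requests.{wk_id}.{cs_id}")
}

/// Subject on which the body-less "process" task message is published.
/// (Hypothetical helper name, for illustration only.)
fn process_task_subject(wk_id: &str, cs_id: &str) -> String {
    format!("rebaser.tasks.{wk_id}.{cs_id}.process")
}

/// Both subjects a publisher touches when enqueuing a single request:
/// first the request itself, then the task that asks a Rebaser to run.
fn subjects_for_enqueue(wk_id: &str, cs_id: &str) -> (String, String) {
    (
        requests_subject(wk_id, cs_id),
        process_task_subject(wk_id, cs_id),
    )
}
```

Because the task subject depends only on the workspace and change set, every request for the same change set maps to the same task subject, which fits the "one message per subject" property described above.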
Tasks Stream
On the tasks stream, a single Jetstream consumer is set up to share work
across multiple Rebasers as clients. When a Rebaser starts to process a
task message it continuously sends AckKind::Progress messages to keep
the message from being ack'd or redelivered. If the task encounters an
error, it can return a Result::Err(_) from its Naxum handler which
will trigger an AckKind::Nack(None) message, causing an immediate
redelivery of the message to another Rebaser.
If a task needs to be interrupted due to a graceful shutdown of a Rebaser
server, the handler will return an Error::Interupted(_) error which
ensures that the message is nack'd and will be redelivered. If a task
can be cleanly completed, the handler will return a Result::Ok(_)
which triggers an AckKind::Ack, causing the message to be deleted from
the tasks stream so that it will not be run again.
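The mapping from handler outcome to acknowledgement described above can be sketched as follows. Note that the AckKind and TaskError enums here are local stand-ins written for this example, so that it compiles on its own; they are not the NATS client's or Naxum's actual types, and the real mapping lives inside Naxum.

```rust
use std::time::Duration;

/// Local stand-in for the ack kinds named in the text (not a library type).
#[derive(Debug, PartialEq)]
enum AckKind {
    Ack,
    Nack(Option<Duration>),
    Progress,
}

/// Hypothetical error type for a task handler.
#[derive(Debug)]
enum TaskError {
    /// The Rebaser is shutting down gracefully mid-task.
    Interrupted,
    /// The task itself failed.
    Failed,
}

/// Ack sent repeatedly while the task is still running, so the message
/// is neither considered done nor handed to another Rebaser.
fn ack_while_running() -> AckKind {
    AckKind::Progress
}

/// Ack sent once the handler returns.
fn ack_for_result(result: &Result<(), TaskError>) -> AckKind {
    match result {
        // Clean completion: the task message is removed from the stream.
        Ok(()) => AckKind::Ack,
        // Either kind of error leads to redelivery with no added delay.
        Err(TaskError::Interrupted) | Err(TaskError::Failed) => AckKind::Nack(None),
    }
}
```

In this sketch an interruption and a failure are acknowledged identically; they are kept as separate variants only because the text distinguishes the two causes.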
Requests Stream
When a "process" task is running, a dedicated NATS consumer is created
to exclusively process all requests for a change set in serial (that is,
one at a time). This consumer is known as an "ordered consumer" and is
push-based (rather than the default pull-based consumers). An ordered
consumer is much lighter weight and ephemeral as far as a NATS cluster
is concerned and thus should reduce the stress on NATS when many change
sets are created/active over a short period of time.
This ordered consumer is set up with a timeout that detects when no
message has entered the subject (or no message has been pulled into the
Naxum app) within a period. When this "quiescence" period elapses, it
triggers a specific graceful shutdown of the "process" task whose
exit state is to return Result::Ok(_) and ack the task message. In
this way, change sets which become inactive (that is, no Rebaser request
messages) are spun down to conserve resources and allow the Rebaser to
focus only on "active" change sets.
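The quiescence idea can be illustrated with a standard library channel standing in for the ordered consumer. This is a simplified, synchronous model, assuming a plain mpsc receiver and a hypothetical Exit enum; the actual implementation is async and built on Naxum.

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::Duration;

/// Why the consuming loop stopped (hypothetical type for this sketch).
#[derive(Debug, PartialEq)]
enum Exit {
    /// No request arrived within the quiet period.
    Quiescent,
    /// The source of requests went away.
    Closed,
}

/// Handles requests one at a time until none arrives within `quiet_period`.
/// Returns how many requests were handled and why the loop stopped.
fn consume_until_quiet(rx: &Receiver<String>, quiet_period: Duration) -> (usize, Exit) {
    let mut handled = 0;
    loop {
        match rx.recv_timeout(quiet_period) {
            // Real code would process the request here.
            Ok(_request) => handled += 1,
            // Quiet for a whole period: treat the change set as inactive.
            Err(RecvTimeoutError::Timeout) => return (handled, Exit::Quiescent),
            Err(RecvTimeoutError::Disconnected) => return (handled, Exit::Closed),
        }
    }
}
```

A caller would map Exit::Quiescent to a successful return from the "process" task, which is what leads to the task message being ack'd.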
Using an ordered consumer means that we can no longer use a message
ack to delete a processed message (this is a trick of a work queue
stream). Therefore, when a request has been successfully processed, the
message is deleted from the stream using its sequence number. A new
Naxum middleware called PostProcess provides us a way to handle this
delete in an OnSuccess callback. In the OnFailure case we simply
don't delete the message, which means the next message is still the first
and only message to process.
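The delete-on-success behavior can be modeled with a toy in-memory stream keyed by sequence number. The Stream type and process_next function below are invented for this sketch and do not reflect the PostProcess middleware's real API.

```rust
use std::collections::BTreeMap;

/// Toy stand-in for a limits-based stream: sequence number -> payload.
struct Stream {
    messages: BTreeMap<u64, String>,
}

impl Stream {
    /// The lowest-sequence message still in the stream, if any.
    fn first(&self) -> Option<(u64, String)> {
        self.messages
            .iter()
            .next()
            .map(|(seq, body)| (*seq, body.clone()))
    }

    /// Deletes a message by its sequence number.
    fn delete(&mut self, seq: u64) -> bool {
        self.messages.remove(&seq).is_some()
    }
}

/// Processes the first message and deletes it only when the handler succeeds.
fn process_next<F>(stream: &mut Stream, handler: F) -> Option<Result<u64, String>>
where
    F: FnOnce(&str) -> Result<(), String>,
{
    let (seq, body) = stream.first()?;
    match handler(&body) {
        // "OnSuccess": remove the message by sequence number.
        Ok(()) => {
            stream.delete(seq);
            Some(Ok(seq))
        }
        // "OnFailure": leave it in place so it is still first next time.
        Err(err) => Some(Err(err)),
    }
}
```

After a failed attempt, calling process_next again sees the same sequence number, which mirrors the "still the first and only message to process" behavior.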
Another tracked Tokio task, called the SerialDvuTask, runs alongside
this request-consuming task. It waits for a
tokio::sync::Notify to fire, which is triggered by the
request-consuming Tokio task. If, during the run of a DVU, another
request is processed that requires another DVU run, then the Notify
will be re-enabled. That way, when the SerialDvuTask loop comes back to
check, the Notify will be set, thus triggering "yet one more" DVU run.
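The coalescing effect described above can be modeled without Tokio. The sketch below uses an AtomicBool as a single stored wake-up permit, which approximates how repeated notifications collapse into one pending wake-up. It is a synchronous model with hypothetical names (DvuSignal, run_serial_dvus); unlike a real Notify, it returns instead of waiting when nothing is pending.

```rust
use std::sync::atomic::{AtomicBool, Ordering};

/// Std-only model of a single stored wake-up permit.
struct DvuSignal {
    pending: AtomicBool,
}

impl DvuSignal {
    fn new() -> Self {
        Self {
            pending: AtomicBool::new(false),
        }
    }

    /// Called by the request-consuming side; repeated calls coalesce.
    fn notify(&self) {
        self.pending.store(true, Ordering::SeqCst);
    }

    /// Called by the DVU loop; consumes the permit if one is stored.
    fn take(&self) -> bool {
        self.pending.swap(false, Ordering::SeqCst)
    }
}

/// Runs DVUs one after another for as long as a permit is available.
/// `during_run` stands in for requests processed while DVU number `n` runs.
fn run_serial_dvus<F: FnMut(usize, &DvuSignal)>(signal: &DvuSignal, mut during_run: F) -> usize {
    let mut runs = 0;
    while signal.take() {
        runs += 1;
        during_run(runs, signal);
    }
    runs
}
```

However many requests arrive during one DVU run, the loop performs exactly one follow-up run, which is the "yet one more" behavior.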
Other Structural Changes
Among other changes, some of note are:
The publisher of a Rebaser request sends it directly to the
REBASER_REQUESTS stream. Due to its current RPC (i.e.
request-then-await-response) communication pattern, setting up the
Rebaser in this way makes it operate more like Pinga and Veritech.
Future work is likely going to bring back the concept of "activities"
that flow for interested parties to follow, however prior to this
commit, the only users of the activity stream were callers to the
Rebaser and the Rebaser server itself.
A first class client for the Rebaser is provided in
lib/rebaser-client which abstracts the NATS communication and the
on-the-wire request formatting details, ideally allowing us to evolve
the Rebaser's messaging internally where possible.
The start of an API messaging framework is provided largely in the
lib/rebaser-core crate in the api_types module. Several Rust
traits are provided that, when implemented, give the user a versioned
message that can be upgraded and understands its serialization format
independently of versioning. This should allow us to evolve the
Rebaser's request and response messages with both the client and server
able to detect unsupported message versions without having to
deserialize the message payload itself. Big props to @jhelwig's work
on version graph snapshots for the inspiration.
Rebaser request and response messages are transmitted with several
metadata headers in the NATS message including:
X-CONTENT-TYPE: describes the serialization format in a content
type/MIME header compatible string. Current values are
application/json and application/cbor with application/cbor
the current default.
X-MESSAGE-TYPE: describes the message type, typically the primary
Rust name. Current values are EnqueueUpdatesRequest and
EnqueueUpdatesResponse.
X-MESSAGE-VERSION: a positive integer, where a greater value
implies a newer version. For example, code that understands version
5 of a message can likely also understand versions 4, 3, 2, and 1,
but not 6.
NATS-MSG-ID: this is a standard NATS header which can be used for
message de-duplication and is populated with a random
at-request-time Ulid value, managed via a RequestId type
(required for all API data types).
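As a rough illustration of how these headers let a receiver reject a message before touching its payload, here is a sketch using a plain HashMap in place of NATS headers. The function names, the HeaderError type, and the MAX_SUPPORTED_VERSION value of 5 are all assumptions made for this example; the real traits in lib/rebaser-core may look quite different, and the request ID is passed as a plain string rather than a Ulid.

```rust
use std::collections::HashMap;

/// Highest message version this hypothetical receiver understands.
const MAX_SUPPORTED_VERSION: u64 = 5;

/// Reasons a message can be rejected from its headers alone.
#[derive(Debug, PartialEq)]
enum HeaderError {
    Missing(&'static str),
    UnsupportedContentType(String),
    InvalidVersion(String),
    UnsupportedVersion(u64),
}

/// Builds the metadata headers described above for one message.
fn build_headers(
    content_type: &str,
    message_type: &str,
    version: u64,
    request_id: &str,
) -> HashMap<String, String> {
    HashMap::from([
        ("X-CONTENT-TYPE".to_string(), content_type.to_string()),
        ("X-MESSAGE-TYPE".to_string(), message_type.to_string()),
        ("X-MESSAGE-VERSION".to_string(), version.to_string()),
        ("NATS-MSG-ID".to_string(), request_id.to_string()),
    ])
}

/// Decides from headers alone whether the payload is worth deserializing.
/// Returns the message version when it is supported.
fn check_headers(headers: &HashMap<String, String>) -> Result<u64, HeaderError> {
    let content_type = headers
        .get("X-CONTENT-TYPE")
        .ok_or(HeaderError::Missing("X-CONTENT-TYPE"))?;
    match content_type.as_str() {
        "application/json" | "application/cbor" => {}
        other => return Err(HeaderError::UnsupportedContentType(other.to_string())),
    }

    let raw = headers
        .get("X-MESSAGE-VERSION")
        .ok_or(HeaderError::Missing("X-MESSAGE-VERSION"))?;
    let version: u64 = raw
        .parse()
        .map_err(|_| HeaderError::InvalidVersion(raw.clone()))?;
    if version == 0 {
        // Versions are positive integers.
        return Err(HeaderError::InvalidVersion(raw.clone()));
    }
    if version > MAX_SUPPORTED_VERSION {
        return Err(HeaderError::UnsupportedVersion(version));
    }
    Ok(version)
}
```

With these assumptions, a version 6 message is rejected with UnsupportedVersion(6) while versions 1 through 5 pass, all without reading the message body.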