andybritz closed this issue 5 years ago.
Some thoughts on sagas below for discussion...
A saga is a long-lived business transaction consisting of several smaller actions or transactions that are executed independently. A saga manager is a process that coordinates such a set of transactions and provides failure handling mechanisms.
A saga can be represented as a directed acyclic graph (DAG) of individual actions. The vertices of the graph represent these transactions, and the edges represent the dependencies between them.
The saga knows nothing about the implementation details of an action. From the saga's point of view, each action conceptually consists of a pair (startAction: Command, undoAction: Option[Command]).
It may additionally contain configuration or rules for things like retries and timeouts.
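As a rough illustration of this structure, the following Java sketch models each action as a (startAction, undoAction) pair plus the IDs of the actions it depends on, and checks that the dependencies really form a DAG using Kahn's algorithm. The Command, SagaAction and SagaGraph types are hypothetical, Optional stands in for Option[Command], and retry/timeout configuration is left out.

```java
import java.util.*;

// Hypothetical command shape, mirroring the {"name", "parameters"} JSON used in this thread.
record Command(String name, Map<String, Object> parameters) {}

// One vertex of the saga graph: the (startAction, undoAction) pair plus the IDs it depends on.
record SagaAction(String actionId, Command startAction, Optional<Command> undoAction, Set<String> dependsOn) {}

final class SagaGraph {
    private SagaGraph() {}

    // Kahn's algorithm over a map keyed by actionId. Returns one valid execution order,
    // or throws if a dependency is unknown or the dependencies contain a cycle.
    static List<String> topologicalOrder(Map<String, SagaAction> actions) {
        Map<String, Integer> unmetDeps = new HashMap<>();
        Map<String, List<String>> dependents = new HashMap<>();
        for (SagaAction action : actions.values()) {
            unmetDeps.put(action.actionId(), action.dependsOn().size());
            for (String dep : action.dependsOn()) {
                if (!actions.containsKey(dep)) {
                    throw new IllegalArgumentException("Unknown dependency: " + dep);
                }
                dependents.computeIfAbsent(dep, k -> new ArrayList<>()).add(action.actionId());
            }
        }
        // Actions with no dependencies can start immediately.
        Deque<String> ready = new ArrayDeque<>();
        unmetDeps.forEach((id, count) -> {
            if (count == 0) ready.add(id);
        });
        List<String> order = new ArrayList<>();
        while (!ready.isEmpty()) {
            String id = ready.poll();
            order.add(id);
            // Completing this action releases any dependent whose last unmet dependency it was.
            for (String next : dependents.getOrDefault(id, List.of())) {
                if (unmetDeps.merge(next, -1, Integer::sum) == 0) ready.add(next);
            }
        }
        if (order.size() != actions.size()) {
            throw new IllegalArgumentException("Saga dependencies contain a cycle");
        }
        return order;
    }
}
```

For the ReserveFunds / PlaceBid example further down, the only valid order is ReserveFunds followed by PlaceBid.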
Some requirements for the actions:
- undoAction: …
- When an action finishes, it emits a SagaActionFinished event. This event includes the action ID and the outcome of the action, keyed by the saga ID.
Sagas are managed by a saga manager (also known as a saga coordinator or process manager). In our implementation, this will be a Kafka (probably KStream) application.
We refer to the processes that listen for commands and execute them as action processors. From the saga manager's point of view, these action processors are black boxes. All communication between the saga manager and the action processors is via messaging.
In our Kafka-based implementation, this is done by:
- publishing the startAction command (to the command topic), as well as a SagaActionStarted event;
- listening to the SagaActionFinished topic and waiting for the action to finish.
SagaActionFinished is a generic event type, and is shared across all topics.
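A minimal sketch of the saga manager's side of starting an action, assuming a hypothetical MessageSink in place of a real Kafka producer and illustrative topic names. The command is keyed by its aggregate key (the account ID for ReserveFunds) and carries the saga metadata, while the SagaActionStarted event is keyed by the saga ID so that all of a saga's events share a partition.

```java
import java.util.*;

// Hypothetical message envelope: topic, partition key and a JSON-like value.
record Message(String topic, String key, Map<String, Object> value) {}

// Hypothetical stand-in for a Kafka producer, so the sketch stays self-contained.
interface MessageSink {
    void send(Message message);
}

final class ActionDispatcher {
    private final MessageSink sink;
    private final String commandTopic;
    private final String sagaEventTopic;

    ActionDispatcher(MessageSink sink, String commandTopic, String sagaEventTopic) {
        this.sink = sink;
        this.commandTopic = commandTopic;
        this.sagaEventTopic = sagaEventTopic;
    }

    // Publishes the startAction command with saga metadata attached, then a SagaActionStarted event.
    void start(String sagaId, String actionId, String commandKey,
               String commandName, Map<String, Object> parameters) {
        Map<String, Object> command = new LinkedHashMap<>();
        command.put("name", commandName);
        command.put("parameters", parameters);
        command.put("saga", Map.of("sagaId", sagaId, "actionId", actionId));
        sink.send(new Message(commandTopic, commandKey, command)); // keyed by aggregate, e.g. account ID

        Map<String, Object> started = new LinkedHashMap<>();
        started.put("name", "SagaActionStarted");
        started.put("parameters", Map.of("sagaId", sagaId, "actionId", actionId));
        sink.send(new Message(sagaEventTopic, sagaId, started)); // keyed by saga ID
    }
}
```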
A saga is started by creating an instance of the saga with its own unique identifier, the saga ID.
Each saga has an event log associated with it (a single log shared across all sagas, but keyed by the saga ID). Every interaction is recorded in this event log.
These log entries are events, and are stored indefinitely in a saga interaction topic.
From this log of interactions, the current state of the saga at any point in time can be derived. Because a saga has a bounded lifespan, its state can easily be derived on the fly.
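Deriving the state amounts to a left fold over the saga's events. This sketch assumes two hypothetical event types mirroring SagaActionStarted and SagaActionFinished, and that the log passed in has already been filtered to a single saga ID; it yields the same Pending / Success / Failure view as the saga state examples further down.

```java
import java.util.*;

enum ActionStatus { PENDING, SUCCESS, FAILURE }

// Hypothetical events, modelled on SagaActionStarted and SagaActionFinished.
interface SagaEvent {}
record ActionStarted(String sagaId, String actionId) implements SagaEvent {}
record ActionFinished(String sagaId, String actionId, boolean success) implements SagaEvent {}

// Saga state: action ID -> latest status, in the order actions were first seen.
record SagaState(String sagaId, Map<String, ActionStatus> actions) {

    static SagaState initial(String sagaId) {
        return new SagaState(sagaId, new LinkedHashMap<>());
    }

    // Pure transition: returns a new state and leaves this one untouched.
    SagaState apply(SagaEvent event) {
        Map<String, ActionStatus> next = new LinkedHashMap<>(actions);
        if (event instanceof ActionStarted started) {
            next.put(started.actionId(), ActionStatus.PENDING);
        } else if (event instanceof ActionFinished finished) {
            next.put(finished.actionId(), finished.success() ? ActionStatus.SUCCESS : ActionStatus.FAILURE);
        }
        return new SagaState(sagaId, next);
    }

    // Rebuilds the current state from the saga's event log (assumed already filtered to this saga ID).
    static SagaState replay(String sagaId, List<SagaEvent> log) {
        SagaState state = initial(sagaId);
        for (SagaEvent event : log) {
            state = state.apply(event);
        }
        return state;
    }
}
```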
The saga has a handler associated with it that:
The dependency graph defines the order in which actions are executed. An action can be started as soon as completion events have been received for all the actions it depends on.
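The scheduling rule can be written as a small function over the dependency map and the current action statuses. This is a hypothetical helper; it treats only Success as completion, on the assumption that a failure switches the saga into undo mode instead of releasing dependents.

```java
import java.util.*;

enum ActionStatus { PENDING, SUCCESS, FAILURE }

final class Scheduler {
    private Scheduler() {}

    // dependsOn: actionId -> IDs it depends on; status: actions started so far (absent = not started).
    // Returns the actions that have not been started and whose dependencies have all succeeded.
    static List<String> readyToStart(Map<String, Set<String>> dependsOn, Map<String, ActionStatus> status) {
        List<String> ready = new ArrayList<>();
        for (Map.Entry<String, Set<String>> entry : dependsOn.entrySet()) {
            boolean notStarted = !status.containsKey(entry.getKey());
            boolean depsSucceeded = entry.getValue().stream()
                    .allMatch(dep -> status.get(dep) == ActionStatus.SUCCESS);
            if (notStarted && depsSucceeded) {
                ready.add(entry.getKey());
            }
        }
        return ready;
    }
}
```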
When all actions in the graph are complete, the saga is completed successfully, and a saga complete event is emitted. The client application that launched the saga can listen for this event.
If any of the actions fail, either by receiving a failure event, timing out, or exhausting retries, the saga goes into undo mode. It then works through the dependency graph in reverse, sending commands to execute the compensating actions.
Once all the compensating actions have been completed, the saga is complete, but failed. An event is emitted to this effect.
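One simple way to work through the graph in reverse, assuming compensations run one at a time: take the order in which actions were started (which respects their dependencies, because an action only starts after its dependencies finish), keep those that succeeded and define an undoAction, and reverse the list. UndoPlanner is a hypothetical name, and running independent compensations in parallel is left out of this sketch.

```java
import java.util.*;

enum ActionStatus { PENDING, SUCCESS, FAILURE }

final class UndoPlanner {
    private UndoPlanner() {}

    // executionOrder: forward actions in the order they were started (a topological order).
    // Returns the actions to compensate, most recent first.
    static List<String> compensationOrder(List<String> executionOrder,
                                          Map<String, ActionStatus> status,
                                          Set<String> actionsWithUndo) {
        List<String> toUndo = new ArrayList<>();
        for (String id : executionOrder) {
            if (status.get(id) == ActionStatus.SUCCESS && actionsWithUndo.contains(id)) {
                toUndo.add(id);
            }
        }
        Collections.reverse(toUndo);
        return toUndo;
    }
}
```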
If any of the compensating actions fail, it still attempts to execute the remaining undo actions. In addition, a command is issued to request further investigation / send an email / log an issue in an issue tracker. This is an unexpected error, and will need escalation outside the scope of the saga manager itself.
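The "keep going, then escalate" behaviour might be sketched as follows. runUndo and escalate are hypothetical callbacks: the first stands in for sending an undo command and waiting for its SagaActionFinished event, the second for issuing the escalation command.

```java
import java.util.*;
import java.util.function.*;

final class Compensator {
    private Compensator() {}

    // Attempts every undo action in order, even after a failure, and escalates once if any failed.
    // runUndo: hypothetical callback returning true when the compensating action succeeded.
    // escalate: hypothetical callback issuing the "needs investigation" command.
    static List<String> compensateAll(List<String> undoOrder,
                                      Predicate<String> runUndo,
                                      Consumer<List<String>> escalate) {
        List<String> failed = new ArrayList<>();
        for (String actionId : undoOrder) {
            if (!runUndo.test(actionId)) {
                failed.add(actionId);
            }
        }
        if (!failed.isEmpty()) {
            escalate.accept(List.copyOf(failed));
        }
        return failed;
    }
}
```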
The actor model is a useful abstraction for understanding and implementing sagas:
"Your actor framework is a process manager framework" - Greg Young
Because sagas are initiated with a command and emit an event when complete, just like any action they control, they can be composed into sagas of sagas without adding additional complexity.
So it makes sense to keep the sagas as simple as possible and compose where needed.
Any action can be controlled by a saga; it just needs to be wrapped so that it is started by a command message, and emits a message when it terminates.
This enables the saga manager to control processes that have interactions with external systems such as calling an endpoint on a web service.
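A sketch of such a wrapper, assuming the wrapped operation reports success as a boolean: it accepts a command carrying saga metadata, runs the operation, and always publishes a SagaActionFinished event, treating an exception as a failure. The record and class names are hypothetical, and the Consumer stands in for a Kafka producer.

```java
import java.util.*;
import java.util.function.*;

// Hypothetical types mirroring the "saga" metadata block and the SagaActionFinished event in this thread.
record SagaMetadata(String sagaId, String actionId, boolean undo) {}
record SagaCommand(String name, Map<String, Object> parameters, SagaMetadata saga) {}
record SagaActionFinished(String sagaId, String actionId, String result, boolean undo) {}

// Makes an arbitrary operation saga-aware: it is started by a command and always reports how it terminated.
final class SagaAwareProcessor {
    private final Predicate<SagaCommand> operation;       // e.g. a web service call; true means success
    private final Consumer<SagaActionFinished> publisher; // stands in for a Kafka producer

    SagaAwareProcessor(Predicate<SagaCommand> operation, Consumer<SagaActionFinished> publisher) {
        this.operation = operation;
        this.publisher = publisher;
    }

    void handle(SagaCommand command) {
        boolean succeeded;
        try {
            succeeded = operation.test(command);
        } catch (RuntimeException e) {
            succeeded = false; // reported as a failed action instead of being propagated
        }
        SagaMetadata saga = command.saga();
        publisher.accept(new SagaActionFinished(
                saga.sagaId(), saga.actionId(), succeeded ? "Success" : "Failure", saga.undo()));
    }
}
```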
This web service call must be idempotent, to handle the following scenario:
In this case the process manager will need to resubmit the command. This will now be a new command, and it will result in the web service being called again. This is where idempotence is required: the second call to the web service should be a no-op.
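On the receiving side, idempotence is commonly achieved by deduplicating on a key that is stable across resubmissions. This sketch assumes the key comes from the command payload (for example a reservationId) rather than from the message itself, since a resubmitted command is a new message. The IdempotentCall class is hypothetical, and its in-memory map is purely illustrative; a real service would need to store processed keys durably.

```java
import java.util.*;
import java.util.function.*;

// Wraps a side-effecting call so repeated requests with the same idempotency key are no-ops
// that return the result of the first call.
final class IdempotentCall<Req, Res> {
    private final Function<Req, Res> call;
    private final Function<Req, String> idempotencyKey;
    private final Map<String, Res> completed = new HashMap<>(); // illustrative, not durable

    IdempotentCall(Function<Req, Res> call, Function<Req, String> idempotencyKey) {
        this.call = call;
        this.idempotencyKey = idempotencyKey;
    }

    // The first call for a key performs the side effect; later calls return the stored result.
    // If the call throws, nothing is stored, so a retry will attempt it again.
    synchronized Res invoke(Req request) {
        return completed.computeIfAbsent(idempotencyKey.apply(request), key -> call.apply(request));
    }
}
```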
Example data flows below:
- ReserveFunds results in FundsReserved or FundsReserveFailed.
- PlaceBid results in BidAccepted or BidRejected.
As these actions are saga-aware, they also emit SagaActionFinished events.
Saga definition
{
"name": "ReserveFunds",
"parameters": {
"sagaId": "d03c9cba-e0b1-4acf-93dd-f903e6857d90",
"actions": [
{
"name": "ReserveFunds",
"actionId": "f2ada97b-90a3-4541-9c5f-c6782381c91c",
"dependsOn": [],
"command": {
"name": "ReserveFunds",
"parameters": {
"reservationId": "4a53bf63-3ae9-483f-984f-013dcf327225",
"accountId": "bddf81a2-23bb-4cad-979f-cb9f68e3e62a",
"amount": 3000
},
"properties": {
"timeout": "30 seconds",
"retries": "retry config here..."
}
},
"undoAction": {
"actionId": "bc0b6da1-a192-455a-8c44-2e7bf08405ff",
"name": "UndoReserveFunds",
"command": {
"name": "UndoReserveFunds",
"parameters": {
"reservationId": "4a53bf63-3ae9-483f-984f-013dcf327225",
"accountId": "bddf81a2-23bb-4cad-979f-cb9f68e3e62a"
}
}
}
},
{
"name": "PlaceBid",
"actionId": "c198f2de-ff42-4a5c-a79f-a4bef177ffb9",
"dependsOn": ["f2ada97b-90a3-4541-9c5f-c6782381c91c"],
"command": {
"name": "PlaceBid",
"parameters": {
"bidId": "4a53bf63-3ae9-483f-984f-013dcf327225",
"accountId": "bddf81a2-23bb-4cad-979f-cb9f68e3e62a",
"auctionId": "e92d244a-96d3-458d-a29e-0900c9632cee",
"amount": 1500
},
"properties": {
"timeout": "30 seconds"
}
}
}
]
}
}
Note that the saga definition has the same shape as a command (see examples below). It is exactly that. It can itself be included as part of a bigger saga.
ReserveFunds Command
Key: "bddf81a2-23bb-4cad-979f-cb9f68e3e62a" (account ID)
{
"name": "ReserveFunds",
"parameters": {
"reservationId": "4a53bf63-3ae9-483f-984f-013dcf327225",
"accountId": "bddf81a2-23bb-4cad-979f-cb9f68e3e62a",
"amount": 3000
},
"saga": {
"sagaId": "d03c9cba-e0b1-4acf-93dd-f903e6857d90",
"actionId": "f2ada97b-90a3-4541-9c5f-c6782381c91c"
}
}
Note the inclusion of saga metadata.
PlaceBid Command
Key: "e92d244a-96d3-458d-a29e-0900c9632cee" (auction ID)
{
"name": "PlaceBid",
"parameters": {
"bidId": "4a53bf63-3ae9-483f-984f-013dcf327225",
"accountId": "bddf81a2-23bb-4cad-979f-cb9f68e3e62a",
"auctionId": "e92d244a-96d3-458d-a29e-0900c9632cee",
"amount": 1500
},
"saga": {
"sagaId": "d03c9cba-e0b1-4acf-93dd-f903e6857d90",
"actionId": "c198f2de-ff42-4a5c-a79f-a4bef177ffb9"
}
}
UndoReserveFunds Command
Key: "bddf81a2-23bb-4cad-979f-cb9f68e3e62a" (account ID)
{
"name": "UndoReserveFunds",
"parameters": {
"reservationId": "4a53bf63-3ae9-483f-984f-013dcf327225",
"accountId": "bddf81a2-23bb-4cad-979f-cb9f68e3e62a"
},
"saga": {
"sagaId": "d03c9cba-e0b1-4acf-93dd-f903e6857d90",
"actionId": "bc0b6da1-a192-455a-8c44-2e7bf08405ff",
"undo": true
}
}
Result events
In the scenario where:
- ReserveFunds succeeds,
- PlaceBid fails, and
- UndoReserveFunds is executed, and succeeds,
the following SagaActionFinished events are emitted, with Key: "d03c9cba-e0b1-4acf-93dd-f903e6857d90" (saga ID).
ReserveFunds succeeds
{
"name": "SagaActionFinished",
"parameters": {
"sagaId": "d03c9cba-e0b1-4acf-93dd-f903e6857d90",
"actionId": "f2ada97b-90a3-4541-9c5f-c6782381c91c",
"result": "Success"
}
}
PlaceBid fails
{
"name": "SagaActionFinished",
"parameters": {
"sagaId": "d03c9cba-e0b1-4acf-93dd-f903e6857d90",
"actionId": "c198f2de-ff42-4a5c-a79f-a4bef177ffb9",
"result": "Failure"
}
}
UndoReserveFunds succeeds
{
"name": "SagaActionFinished",
"parameters": {
"sagaId": "d03c9cba-e0b1-4acf-93dd-f903e6857d90",
"actionId": "bc0b6da1-a192-455a-8c44-2e7bf08405ff",
"result": "Success",
"undo": true
}
}
Saga finished event
This event is emitted to notify that the entire saga is complete.
{
"name": "SagaFinished",
"parameters": {
"sagaId": "d03c9cba-e0b1-4acf-93dd-f903e6857d90",
"result": "Failure"
}
}
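A hypothetical helper for deciding when to emit SagaFinished and with which result, following the rules described earlier: Success once every forward action has succeeded; Failure once some action has failed, no forward action is still pending, and every planned compensating action has finished (whether or not it succeeded). The plannedUndos argument is an assumption about how the saga manager tracks the compensations it decided to run.

```java
import java.util.*;

enum ActionStatus { PENDING, SUCCESS, FAILURE }

final class SagaCompletion {
    private SagaCompletion() {}

    // Returns "Success" or "Failure" once the saga is finished, or empty while it is still running.
    static Optional<String> result(Collection<String> forwardActions,
                                   Collection<String> plannedUndos,
                                   Map<String, ActionStatus> status) {
        if (forwardActions.stream().allMatch(id -> status.get(id) == ActionStatus.SUCCESS)) {
            return Optional.of("Success");
        }
        boolean anyFailed = forwardActions.stream().anyMatch(id -> status.get(id) == ActionStatus.FAILURE);
        boolean anyPending = forwardActions.stream().anyMatch(id -> status.get(id) == ActionStatus.PENDING);
        boolean undosFinished = plannedUndos.stream().allMatch(id -> isFinished(status.get(id)));
        return anyFailed && !anyPending && undosFinished ? Optional.of("Failure") : Optional.empty();
    }

    private static boolean isFinished(ActionStatus s) {
        return s == ActionStatus.SUCCESS || s == ActionStatus.FAILURE;
    }
}
```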
Saga state
Initial state
{
"sagaId": "d03c9cba-e0b1-4acf-93dd-f903e6857d90",
"actions": []
}
After starting ReserveFunds, but before it completes
{
"sagaId": "d03c9cba-e0b1-4acf-93dd-f903e6857d90",
"actions": [
{
"id": "f2ada97b-90a3-4541-9c5f-c6782381c91c",
"result": "Pending"
}
]
}
After ReserveFunds completes
{
"sagaId": "d03c9cba-e0b1-4acf-93dd-f903e6857d90",
"actions": [
{
"id": "f2ada97b-90a3-4541-9c5f-c6782381c91c",
"result": "Success"
}
]
}
After starting PlaceBid, but before it completes
{
"sagaId": "d03c9cba-e0b1-4acf-93dd-f903e6857d90",
"actions": [
{
"id": "f2ada97b-90a3-4541-9c5f-c6782381c91c",
"result": "Success"
},
{
"id": "c198f2de-ff42-4a5c-a79f-a4bef177ffb9",
"result": "Pending"
}
]
}
Final saga state
{
"sagaId": "d03c9cba-e0b1-4acf-93dd-f903e6857d90",
"actions": [
{
"id": "f2ada97b-90a3-4541-9c5f-c6782381c91c",
"result": "Success"
},
{
"id": "c198f2de-ff42-4a5c-a79f-a4bef177ffb9",
"result": "Failure"
},
{
"id": "bc0b6da1-a192-455a-8c44-2e7bf08405ff",
"result": "Success"
}
]
}
Notes:
- sagaId: this is needed to localise all events concerned with the execution of a saga onto the same partition in the saga event topic.
- actionId: each action within a saga has a unique ID. This is used to define the saga action state.
Some more thoughts on the sagas / processes implementation. Two approaches:
We can kind of do this already quite easily for small ad-hoc use cases:
- AggregateSet with multiple aggregates.
- CommandAPI for each of the aggregates we are interested in.
- publishAndQueryCommand, and then just flatMap in the next publishAndQueryCommand.
- FutureResult, and then maybe use the writer monad pattern to record the queries so far.
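A rough sketch of this chaining style, with CompletableFuture.thenCompose playing the role of flatMap and a small writer-style record accumulating the outcomes so far. CommandApi and publishAndQuery here are hypothetical stand-ins, not the actual simplesource CommandAPI or FutureResult signatures. Nothing in this chain compensates if the second step fails.

```java
import java.util.*;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for publishAndQueryCommand: publish a command, complete with its outcome.
interface CommandApi {
    CompletableFuture<String> publishAndQuery(String aggregateKey, String command);
}

// Writer-style accumulator: the latest outcome plus a log of every outcome so far.
record Logged(String value, List<String> log) {
    static Logged empty() {
        return new Logged("", List.of());
    }

    Logged append(String outcome) {
        List<String> next = new ArrayList<>(log);
        next.add(outcome);
        return new Logged(outcome, List.copyOf(next));
    }
}

final class AdHocSaga {
    private AdHocSaga() {}

    // PlaceBid is only published once ReserveFunds has completed (thenCompose acts as flatMap).
    static CompletableFuture<Logged> reserveThenBid(CommandApi accounts, CommandApi auctions,
                                                    String accountId, String auctionId) {
        return accounts.publishAndQuery(accountId, "ReserveFunds")
                .thenApply(outcome -> Logged.empty().append(outcome))
                .thenCompose(logged -> auctions.publishAndQuery(auctionId, "PlaceBid")
                        .thenApply(logged::append));
    }
}
```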
But we can also make it work in Java.
Downsides:
In terms of B (pure messaging): is there a way we can pull the command API out of the Handler? It would be nice if the ProcessController used the same API, without needing to be coupled to a CommandHandler and EventHandler.
That way the API is the same, and only the access to the implementation of the API differs?
Yes, I hope this is possible. The CommandRequest has some specifics that may not be directly applicable to sagas (such as Sequence). Maybe there is an appropriate interpretation in the saga context.
Experimental project added: https://github.com/simplesourcing/simplesagas
Closing this issue, now that we have the saga project. We can continue our discussion in that space.
The addition of a SAGA API to help with long-running transactions. We should aim to solve the basic case, as the full spectrum of SAGA complexity would be difficult to cater for.