We can't rely on the current design, since past assumptions about how an aggregate size is calculated were not correct. We need to compute the commP of commPs to verify it (with cargo ordered by CAR size).
1. Priority queue for cargo with a DynamoDB state machine based on indexes
There are two tables within the DB Stack acting as a Priority Queue:
Cargo - keeps track of the state of cargo (CAR files ingested by web3.storage)
Ferry - keeps track of the state of aggregates
Schema
type Cargo = {
  // CAR file CID
  link: Link
  // Filecoin Piece CID - commP of CAR file
  pieceLink: Link
  // Filecoin Piece Size
  pieceSize: number
  // State of the cargo in the pipeline
  stat: CARGO_STAT
  // Priority in the queue - for now likely same as queuedAt
  priority: string
  // Timestamp
  queuedAt: string
  // TODO: Maybe timestamps for other stats?
  // Filecoin Aggregate CID - commP of commPs
  aggregateLink?: Link
  // Failed to add into aggregate code
  aggregateFailedCode?: string
  // INDEXES
  // primaryIndex: { partitionKey: link },
  // globalIndexes: {
  //   indexStat: {
  //     partitionKey: 'stat',
  //     sortKey: 'priority',
  //     projection: 'all' // TODO: see https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_Projection.html
  //   },
  //   indexAggregate: {
  //     partitionKey: 'aggregateLink',
  //     sortKey: 'pieceSize',
  //     projection: 'keys_only' // TODO: see https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_Projection.html
  //   }
  //   // TODO: Maybe we need an index to query by pieceLink
  // }
}
type Ferry = {
  // Filecoin Aggregate CID - commP of commPs
  link: Link
  // Aggregate size in bytes - TODO: maybe nice to have for metrics
  size: number
  // State of the ferry in the pipeline
  stat: FERRY_STAT
  // Priority in the queue - for now likely same as queuedAt
  priority: string
  // Timestamp
  queuedAt: string
  // INDEXES
  // primaryIndex: { partitionKey: link },
  // globalIndexes: {
  //   indexStat: {
  //     partitionKey: 'stat',
  //     sortKey: 'priority',
  //     projection: 'keys_only' // TODO: see https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_Projection.html
  //   }
  // }
}
// CID
type Link = string
// State of Cargo state machine
type CARGO_STAT = 'QUEUED' | 'OFFERING' | 'SUCCEED' | 'FAILED'
// State of Ferry state machine:
type FERRY_STAT = 'QUEUED' | 'ARRANGING' | 'SUCCEED' | 'FAILED'
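For illustration, the commented index configuration above maps onto an infrastructure definition roughly like the sketch below. This assumes SST v2's `Table` construct (which the index comments appear to mirror); the stack wiring and field list are illustrative, not an existing stack:

```ts
import { Table, StackContext } from 'sst/constructs'

export function DbStack ({ stack }: StackContext) {
  // cargo table acting as the priority queue
  const cargoTable = new Table(stack, 'cargo', {
    fields: {
      link: 'string',
      pieceLink: 'string',
      pieceSize: 'number',
      stat: 'string',
      priority: 'string',
      queuedAt: 'string',
      aggregateLink: 'string'
    },
    primaryIndex: { partitionKey: 'link' },
    globalIndexes: {
      // projection settings left out - see the TODOs above
      indexStat: { partitionKey: 'stat', sortKey: 'priority' },
      indexAggregate: { partitionKey: 'aggregateLink', sortKey: 'pieceSize' }
    }
  })

  return { cargoTable }
}
```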
State Machine
Cargo State Machine might have the following state changes:
* `QUEUED` -> `OFFERING` - when cargo item is associated with an aggregate to offer for storage
* `OFFERING` -> `SUCCEED` - end state as cargo is already available in Storage Providers
* `OFFERING` -> `FAILED` - cargo could not make it to the Storage Provider because this specific cargo failed (e.g. wrong commP, or the CAR could not be fetched)
* `OFFERING` -> `QUEUED` - cargo could not make it to the Storage Provider because other cargo in the same aggregate failed, but no issue was reported for this specific cargo. Therefore, it can be queued for inclusion in another aggregate
* `FAILED` -> `SUCCEED` - cargo previously failed, but the reason behind the failure has since been resolved
Ferry State Machine might have the following state changes:
* `QUEUED` -> `ARRANGING` - when a given ferry was included in an `aggregate/offer` invocation to the Storage Broker
* `ARRANGING` -> `SUCCEED` - when `aggregate/offer` for ferry succeeded
* `ARRANGING` -> `FAILED` - when `aggregate/offer` for ferry failed
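For reference, a minimal sketch of how these transitions could be enforced in code. This is purely illustrative; the transition maps and the `canTransition` helper are not part of any existing codebase:

```ts
type CARGO_STAT = 'QUEUED' | 'OFFERING' | 'SUCCEED' | 'FAILED'
type FERRY_STAT = 'QUEUED' | 'ARRANGING' | 'SUCCEED' | 'FAILED'

// Allowed transitions, mirroring the lists above
const cargoTransitions: Record<CARGO_STAT, CARGO_STAT[]> = {
  QUEUED: ['OFFERING'],
  OFFERING: ['SUCCEED', 'FAILED', 'QUEUED'],
  FAILED: ['SUCCEED'],
  SUCCEED: []
}

const ferryTransitions: Record<FERRY_STAT, FERRY_STAT[]> = {
  QUEUED: ['ARRANGING'],
  ARRANGING: ['SUCCEED', 'FAILED'],
  SUCCEED: [],
  FAILED: []
}

// Returns true if moving `from` -> `to` is a legal state change
function canTransition<S extends string> (
  transitions: Record<S, S[]>,
  from: S,
  to: S
): boolean {
  return transitions[from].includes(to)
}

// e.g. canTransition(cargoTransitions, 'OFFERING', 'QUEUED') === true
```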
Flow
1. CAR files get inserted into the `cargo` table once both the `R2 write` AND `commP write` events happen (Consumer stack context)
2. The `cargo` table stream consumer `attemptFerryLoad` lambda is triggered once 100 inserts OR 15 minutes pass (or maybe a CRON job?). The lambda:
   1. queries the DB for a page of cargo with stat `QUEUED` via the `indexStat` index
   2. sorts the page results by size and attempts to build an aggregate of a compatible size from them. If the size is not enough, it fetches more pages until it either has enough cargo or stops until the next call
   3. performs a DB transaction that updates `stat` to `OFFERING` and sets `aggregateLink` on each cargo item, AND writes an entry to the `ferry` table with the aggregate information (with condition checks to guarantee the previous state is unchanged) - see the sketch after this flow. TODO: figure out batching transaction size limitations
3. The `ferry` table stream consumer `invokeAggregateOffer` lambda is triggered once an `INSERT` operation happens in the table:
   1. invokes `aggregate/offer`
   2. mutates `stat` to `ARRANGING` (in case of failure it will be retried, which is fine given the first operation is idempotent)
4. A CRON keeps triggering a lambda function to check for receipts for ferries with stat `ARRANGING`
   1. Once a receipt is available, `stat` is mutated to either `SUCCEED` or `FAILED`. In the `FAILED` case, the cargo should also have `aggregateFailedCode` updated
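A minimal sketch of the transaction in step 2.3, assuming the AWS SDK v3 document client and the table names above; the `offerCargo` helper is illustrative, and the batching needed once an aggregate spans more than 100 items is intentionally left out, per the TODO:

```ts
import { DynamoDBClient } from '@aws-sdk/client-dynamodb'
import { DynamoDBDocumentClient, TransactWriteCommand } from '@aws-sdk/lib-dynamodb'

const client = DynamoDBDocumentClient.from(new DynamoDBClient({}))

/**
 * Move a set of queued cargo into OFFERING and record the new ferry,
 * all in one conditional transaction (limited to 100 items in total).
 */
export async function offerCargo (cargoLinks: string[], aggregateLink: string, aggregateSize: number) {
  const now = new Date().toISOString()
  await client.send(new TransactWriteCommand({
    TransactItems: [
      ...cargoLinks.map(link => ({
        Update: {
          TableName: 'cargo',
          Key: { link },
          UpdateExpression: 'SET #stat = :offering, aggregateLink = :aggregate',
          // Guarantee the previous state is still QUEUED
          ConditionExpression: '#stat = :queued',
          ExpressionAttributeNames: { '#stat': 'stat' },
          ExpressionAttributeValues: {
            ':offering': 'OFFERING',
            ':queued': 'QUEUED',
            ':aggregate': aggregateLink
          }
        }
      })),
      {
        Put: {
          TableName: 'ferry',
          Item: {
            link: aggregateLink,
            size: aggregateSize,
            stat: 'QUEUED',
            priority: now,
            queuedAt: now
          },
          // Do not overwrite an existing ferry with the same aggregate CID
          ConditionExpression: 'attribute_not_exists(#link)',
          ExpressionAttributeNames: { '#link': 'link' }
        }
      }
    ]
  }))
}
```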
What are we missing?
* Flow 2.3 might be problematic given `TransactWriteItems` has a limit of 100 items
  * This is also a concurrency challenge: multiple concurrent lambdas can try to use the same cargo, each taking parts of the same batch and writing in different orders, leaving both aggregates incompletely written (considering we would be required to call `TransactWriteItems` multiple times)
  * A workaround for this is to introduce a pessimistic locking approach (see the sketch below):
    * Create a lock table where each row is a lock. The primary key is the lock key and should be computed from the key to lock (e.g. `${tableName}#${primaryKey}`)
    * Rows added to the lock table should also auto-delete after a certain TTL, to guarantee that if the locking party does not unlock the entry (it can crash), the entry becomes available again. A heartbeat might be needed to keep the lock held over time
    * Before starting a transaction, locks must be acquired by putting a record in the lock table with a conditional check that fails if the row already exists
    * Locks do not need to be explicitly released, as they will expire
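A minimal sketch of that lock acquisition, assuming a hypothetical `cargo-lock` table whose partition key is `lockKey` and whose TTL attribute is `expiresAt` (both names are assumptions for illustration):

```ts
import { DynamoDBClient } from '@aws-sdk/client-dynamodb'
import { DynamoDBDocumentClient, PutCommand } from '@aws-sdk/lib-dynamodb'

const client = DynamoDBDocumentClient.from(new DynamoDBClient({}))

/**
 * Try to acquire a lock for a given table row.
 * Returns true if the lock was acquired, false if someone else holds it.
 */
export async function acquireLock (tableName: string, primaryKey: string, ttlSeconds = 300): Promise<boolean> {
  const lockKey = `${tableName}#${primaryKey}`
  try {
    await client.send(new PutCommand({
      TableName: 'cargo-lock',
      Item: {
        lockKey,
        // DynamoDB TTL expects an epoch timestamp in seconds
        expiresAt: Math.floor(Date.now() / 1000) + ttlSeconds
      },
      // Fail if a lock row already exists
      ConditionExpression: 'attribute_not_exists(lockKey)'
    }))
    return true
  } catch (err: any) {
    if (err.name === 'ConditionalCheckFailedException') return false
    throw err
  }
}
```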
* A drawback is that a partial failure continues to be problematic
  * We would need to validate the aggregate commPs in Flow 3 to make sure all of them made it. If they did not, a fault tolerance mechanism is required to put them back in the queue
* Alternatively, we can make this flow run in a CRON job with a strong guarantee that it only runs once at a time. In other words, Flow 2 should be a CRON instead
  * We can scale by sharding the DB in the future, as noted in the Notes below
  * A partial failure continues to be problematic if we trigger multiple `TransactWriteItems` calls in the CRON job, and we would need a similar solution as stated above…
* Maybe the primary key can actually be `pieceLink`
  * this could bring us closer to spade naming, and still enable us to expose an API here to request URLs for CARs based on commP as the mapping
Notes
* We need a trigger to check whether we have enough cargo for an aggregate. In the current w3filecoin implementation we do this relying on DynamoDB Streams on the insert event; however, here it might trigger too many lambdas that end up as NOPs
  * If there are too many calls, perhaps we can rely on a CRON job that triggers every X minutes, performs a paginated query against the `cargo` table and tries to create an aggregate as it goes. Once it creates an aggregate, `stat` is updated as a batch update for all the CARs that should now be part of the aggregate
* The time-based solution also has drawbacks:
  * We might have a backlog that increases over time as the write throughput in w3s increases
  * A 15-minute CRON could be good, given a lambda can run for up to 15 minutes. Therefore, while more aggregates can be created, the CRON job can keep going
  * We have no parallel processing with multiple lambdas at the same time. Perhaps this is not a problem for now, and once it is an issue we can look into patterns like DynamoDB sharding to split the load
* `queuedAt` as the sort key is good for avoiding starvation, since we try to add things in the order we get them. However, we also want to sort items by size. We need to see how it looks, but perhaps we can do a Query with the maximum page size for items with stat `QUEUED`, sort all of them by size and try to create an aggregate from the sorted list (see the sketch below). It is a balanced solution: we can still sort, but it might still leave the smaller pieces pending. Further optimizations can come later, where we keep a list and then try to fit small pieces at the end (from this page). If the page is not enough to create an aggregate, we can get the next page and try with more until we have enough.
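A minimal sketch of that page-level packing step, assuming the `Cargo` shape above; the size constants and the `buildAggregate` helper are illustrative assumptions, not real deal parameters:

```ts
// Hypothetical aggregate bounds - the real limits come from the deal
// parameters used by spade
const MAX_AGGREGATE_SIZE = 2 ** 35 // 32 GiB, illustrative only
const MIN_AGGREGATE_SIZE = 2 ** 34 // only offer once at least this full

interface QueuedCargo {
  link: string
  pieceLink: string
  pieceSize: number
}

/**
 * Greedily pack a page of queued cargo (largest pieces first) into a single
 * aggregate. Returns the selected cargo, or undefined if the page does not
 * yet hold enough bytes and a further page should be fetched.
 */
function buildAggregate (page: QueuedCargo[]): QueuedCargo[] | undefined {
  const sorted = [...page].sort((a, b) => b.pieceSize - a.pieceSize)
  const selected: QueuedCargo[] = []
  let total = 0

  for (const cargo of sorted) {
    if (total + cargo.pieceSize <= MAX_AGGREGATE_SIZE) {
      selected.push(cargo)
      total += cargo.pieceSize
    }
    // smaller pieces later in the sorted list may still fit, so keep scanning
  }

  return total >= MIN_AGGREGATE_SIZE ? selected : undefined
}
```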
2. Priority Queue based on a table per State
There are a few tables within the DB Stack acting as a State Machine Queue:
CargoQueued - keeps track of queued Cargo
CargoOffered - keeps track of offered Cargo
CargoProcessed - keeps track of processed Cargo
FerryQueued - keeps track of queued ferries
FerryArranged - keeps track of ferries arranged
Schema
interface CargoQueued {
  // CAR file CID
  link: Link
  // Filecoin Piece CID - commP of CAR file
  pieceLink: Link
  // Filecoin Piece Size
  pieceSize: number
  // Priority in the queue - for now likely same as insertedAt
  priority: string
  // Timestamp
  insertedAt: string
  // INDEXES
  // primaryIndex: { partitionKey: link, sortKey: priority },
  // globalIndexes: {
  //   indexPiece: {
  //     partitionKey: 'pieceLink',
  //     sortKey: 'priority',
  //     projection: 'all' // TODO: see https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_Projection.html
  //   }
  // }
  // Note that there are no strong guarantees of unique CAR so that we
  // can add failed Cargo again
}
interface CargoOffered extends CargoQueued {
  // Filecoin Aggregate CID - commP of commPs
  aggregateLink: Link
  // INDEXES
  // globalIndexes: {
  //   indexAggregate: {
  //     partitionKey: 'aggregateLink',
  //     sortKey: 'pieceSize',
  //     projection: 'keys_only' // TODO: see https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_Projection.html
  //   }
  // }
  // Note that there are no strong guarantees of unique CAR so that we
  // can add failed Cargo again
}
interface CargoProcessed extends CargoOffered {
  // Failed to add into aggregate code
  failedCode?: string
}
interface FerryQueued {
  // Filecoin Aggregate CID - commP of commPs
  link: Link
  // Aggregate size in bytes - TODO: maybe nice to have for metrics
  size: number
  // Priority in the queue - for now likely same as queuedAt
  priority: string
  // Timestamp
  queuedAt: string
  // INDEXES
  // primaryIndex: { partitionKey: link },
}
interface FerryArranged extends FerryQueued {
  // Ferry was stored in SP
  succeed: boolean
}
Flow
TODO
Notes
advantages
* enables tables to be insert-only, and consequently enables easy migration to a different DB in the future (example: Pail), given there won't be any mutations of rows
disadvantages
* Extra cost to persist all the tables, plus more DB operations
open challenges - DynamoDB makes it difficult to operate in this way
* How to identify what is already in the following state?
  * When we look for cargo in the QUEUED_TO_AGGREGATE table, we need to make sure it is not yet in READY_TO_AGGREGATE
  * This means extra queries per item, which introduces a lot of complexity to avoid concurrency issues on updates
  * A cursor state and logic would be required so that we know where we are
    * In fact, reading and writing the cursor creates the same concurrency issue as above
    * Moreover, when we read a page of items from the table, we will need to sort them by size to create the aggregate. Skipped items mean that a cursor ordering (by date, for instance) might not be enough
* How to handle retries on aggregate failures?
  * A new row would need to be added to the QUEUED_TO_AGGREGATE table, which means we could not rely on the CAR file `link` as the primary key. Consequently, we would lose the guarantee of not having the same CAR present in multiple aggregates
* DynamoDB transactions allow writing/updating tables in batch, but it is not possible to read and write in the same transaction
  * It is also prone to the exact same transaction limitations as stated above
3. Priority Queue based on a table per State with Reads from Aggregated Queue from 1
Notes
Solves some of the challenges from option 2 around knowing the state of each item, but creates even more need for locks.
DynamoDB Streams would trigger per write, and the resulting number of lambda executions does not make this cheap to operate at scale.
4. Maybe SQL is our best bet…
Schema
-- State of Cargo state machine
CREATE TYPE CARGO_STAT AS ENUM
(
  'QUEUED',
  'OFFERING',
  'SUCCEED',
  'FAILED'
);
-- State of Ferry state machine
CREATE TYPE FERRY_STAT AS ENUM
(
  'QUEUED',
  'ARRANGING',
  'SUCCEED',
  'FAILED'
);
-- ferry is declared before cargo so that cargo can reference it
CREATE TABLE ferry
(
  -- Filecoin Aggregate CID - commP of commPs
  link TEXT PRIMARY KEY,
  -- Aggregate size in bytes - TODO: maybe nice to have for metrics
  size BIGINT NOT NULL,
  -- State of the ferry in the pipeline
  stat FERRY_STAT NOT NULL,
  -- Priority in the queue - for now likely same as queuedAt
  priority TEXT NOT NULL,
  -- Timestamp
  queuedAt TIMESTAMP WITH TIME ZONE DEFAULT timezone('utc'::text, now()) NOT NULL
);
CREATE TABLE cargo
(
  -- CAR file CID
  link TEXT PRIMARY KEY, -- perhaps pieceLink should be the primary key
  -- Filecoin Piece CID - commP of CAR file
  pieceLink TEXT NOT NULL,
  -- Filecoin Piece Size
  pieceSize BIGINT NOT NULL,
  -- State of the cargo in the pipeline
  stat CARGO_STAT NOT NULL,
  -- Priority in the queue - for now likely same as queuedAt
  priority TEXT NOT NULL,
  -- Timestamp
  queuedAt TIMESTAMP WITH TIME ZONE DEFAULT timezone('utc'::text, now()) NOT NULL,
  -- TODO: Maybe timestamps for other stats?
  -- Filecoin Aggregate CID - commP of commPs
  aggregateLink TEXT REFERENCES ferry(link),
  -- Failed to add into aggregate code
  aggregateFailedCode TEXT
);
CREATE INDEX cargo_stat_idx ON cargo (stat);
State Machine
Basically the same as suggestion 1
Flow
1. CAR files get inserted into the `cargo` table once both the `R2 write` AND `commP write` events happen (Consumer stack context)
2. A CRON job triggers a lambda function over time. The lambda:
   1. queries the `cargo` table for a page of cargo with stat `QUEUED`
   2. sorts the page results by size and attempts to build an aggregate of a compatible size from them. If the size is not enough, it fetches more pages until it either has enough cargo or stops until the next call
   3. performs a DB transaction updating `stat` to `OFFERING` and setting `aggregateLink`, AND inserting an entry into the `ferry` table with the aggregate information (it is required to guarantee that the previous state is the same and that no concurrent job added the cargo to another aggregate in the meantime) - see the sketch after this flow
3. A CRON job triggers a lambda function over time. The lambda:
   1. queries the `ferry` table for an entry with stat `QUEUED`
   2. invokes `aggregate/offer` to spade-proxy (must be idempotent!!)
   3. mutates `stat` to `ARRANGING`; a partial failure in this second write can be retried, given the first step (the offer invocation) is idempotent
4. A CRON keeps triggering a lambda function to check for receipts for ferries with stat `ARRANGING`
   1. Once a receipt is available, `stat` is mutated to either `SUCCEED` or `FAILED`. In the `FAILED` case, the cargo should also have `aggregateFailedCode` updated
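A minimal sketch of the transaction in step 2.3, assuming PostgreSQL accessed via node-postgres (`pg`); the `offerCargo` helper is illustrative, and the conditional UPDATE plus row-count check is one way to guarantee that no concurrent job grabbed the same cargo in the meantime:

```ts
import { Client } from 'pg'

/**
 * Associate a set of queued cargo with a new aggregate in a single
 * transaction. Rolls back if any cargo was concurrently taken by
 * another aggregate (its stat is no longer QUEUED).
 */
export async function offerCargo (
  client: Client,
  cargoLinks: string[],
  aggregateLink: string,
  aggregateSize: number
) {
  try {
    await client.query('BEGIN')

    // Record the new ferry first so cargo rows can reference it
    await client.query(
      `INSERT INTO ferry (link, size, stat, priority)
       VALUES ($1, $2, 'QUEUED', $3)`,
      [aggregateLink, aggregateSize, new Date().toISOString()]
    )

    // Only move cargo that is still QUEUED and unassigned
    const res = await client.query(
      `UPDATE cargo
       SET stat = 'OFFERING', aggregateLink = $1
       WHERE link = ANY($2) AND stat = 'QUEUED' AND aggregateLink IS NULL`,
      [aggregateLink, cargoLinks]
    )

    // A concurrent job took some of this cargo - abort and retry later
    if (res.rowCount !== cargoLinks.length) {
      throw new Error('cargo state changed concurrently')
    }

    await client.query('COMMIT')
  } catch (err) {
    await client.query('ROLLBACK')
    throw err
  }
}
```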
Notes
The main drawback is using SQL instead of DynamoDB, which we use everywhere else.
We also do not have DynamoDB Streams, which fit nicely for things like Flow 3.
5. Maybe SQL is our best bet with Table per State
It is possible to implement idea 2 quite easily thanks to the extra query capabilities that SQL has. There are still a few drawbacks worth flagging:
* Extra cost to persist all the tables, plus more DB operations
* How to handle retries on aggregate failures?
  * the same issue with the primary key / entry uniqueness as in option 2
Conclusions
Based on all the above ideas and their drawbacks, we need to make a decision.
Not being tied to DynamoDB and being able to change easily can be good for the future. But what if we are looking at replicating state the wrong way?
* We have UCAN services and the UCAN Log Stream with 1 year of persistence (which we can write, ordered, to a DB if we want the logs)
* With the intended receipts, we should be able to automatically populate a new DB as easily as with the table-per-state option
* It is also not clear whether recreating a DB with all the state machine changes would ever be a requirement. Currently, the state machine is nothing more than an internal enabler to aggregate cargo into a ferry. The information queryable from the outside world is not impacted by it. Would we want more granular receipts in the future?
The write-only table option in particular would be great, but the added complexity of not having an out-of-the-box guarantee of unique items in the cargo table is a bigger drawback.
Option 1 would work fine if we make sure to re-run the commP of commPs in Flow 3 before submitting the offer. This would require invalidating bad state that was partially written by the CRON if, for some reason, only part of the `TransactWriteItems` batches made it.
Option 4 requires more time-based operations (CRON jobs) and SQL usage, but looks like the option with the fewest gotchas.
Reading https://dynobase.dev/dynamodb-vs-aurora/, it looks like the cost should not be much different, and therefore relying on Aurora is probably our best option.