streamingfast / substreams-sink-kv

Substreams KV sink
Apache License 2.0

Define a new fork handling mode that can be real-time #25

Open maoueh opened 1 year ago


Right now, we have a fork handling mode that writes to the store with a delay of X blocks, where X is configurable by the user. This introduces a latency that is unacceptable for some applications.

This is a proposed fork handling method, inspired by what we do in sql-sink (stepd's amazing work).

When in LIVE mode, we'll take all the keys that are about to be written (by either a SET or a DELETE) and fetch their current values. We'll pack those values into operations that reverse the block's KVOperations. We will store that new UndoKVOperations bundle under a single key in the key/value store, within the same transaction as the block's writes and the cursor update.
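A minimal sketch of how the undo bundle could be computed, assuming a simplified in-memory model. `KVOperation`, `OpType`, `Getter`, `BuildUndo` and `mapStore` are hypothetical names, not the sink's actual protobuf types or API. For each key touched by the block, the pre-block value becomes a SET, and a key that did not exist becomes a DELETE; only the first operation on a key matters, since later operations in the same block do not change what the pre-block value was.

```go
package main

import "fmt"

// OpType models the two operation kinds named in the issue (hypothetical type).
type OpType int

const (
	OpSet OpType = iota
	OpDelete
)

func (t OpType) String() string {
	if t == OpDelete {
		return "DELETE"
	}
	return "SET"
}

// KVOperation is a hypothetical, simplified entry of KVOperations.
type KVOperation struct {
	Type  OpType
	Key   string
	Value []byte
}

// Getter is the minimal read access needed to capture previous values.
type Getter interface {
	Get(key string) (value []byte, found bool)
}

// BuildUndo returns the operations that restore every key touched by ops to
// the value it had before the block: SET the previous value if the key
// existed, DELETE it otherwise. Only the first operation per key is used.
func BuildUndo(store Getter, ops []KVOperation) []KVOperation {
	seen := make(map[string]bool, len(ops))
	undo := make([]KVOperation, 0, len(ops))
	for _, op := range ops {
		if seen[op.Key] {
			continue
		}
		seen[op.Key] = true
		if prev, found := store.Get(op.Key); found {
			undo = append(undo, KVOperation{Type: OpSet, Key: op.Key, Value: prev})
		} else {
			undo = append(undo, KVOperation{Type: OpDelete, Key: op.Key})
		}
	}
	return undo
}

// mapStore is an in-memory Getter used for the example.
type mapStore map[string][]byte

func (m mapStore) Get(key string) ([]byte, bool) {
	v, ok := m[key]
	return v, ok
}

func main() {
	store := mapStore{"alice": []byte("10")}
	ops := []KVOperation{
		{Type: OpSet, Key: "alice", Value: []byte("20")},
		{Type: OpSet, Key: "bob", Value: []byte("5")},
		{Type: OpDelete, Key: "alice"},
	}
	for _, u := range BuildUndo(store, ops) {
		fmt.Println(u.Type, u.Key, string(u.Value))
	}
}
```

In the real sink, the reads would need to happen inside the same transaction that applies the block, so that the captured values cannot change between the read and the write.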

Upon receiving an undo signal, we'll fetch the undo entries of every block above the block named in the signal, get the corresponding UndoKVOperations, and apply them directly (without fetching previous values this time), from the newest block to the oldest, all within the same transaction. We'll also delete the undo:-prefixed key of each block that we reverted.
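A sketch of the revert step, assuming the undo entries have already been loaded into memory. `UndoEntry`, `Revert` and the simplified `KVOperation` type are hypothetical. Entries are applied newest first, so when several reverted blocks touched the same key, the value captured by the oldest reverted block (the value before the fork) is the one left in the store.

```go
package main

import (
	"fmt"
	"sort"
)

type OpType int

const (
	OpSet OpType = iota
	OpDelete
)

// KVOperation is a hypothetical, simplified entry of (Undo)KVOperations.
type KVOperation struct {
	Type  OpType
	Key   string
	Value []byte
}

// UndoEntry pairs a block number with the undo operations recorded for it
// (hypothetical type; the proposal stores these under undo-prefixed keys).
type UndoEntry struct {
	BlockNum uint64
	Ops      []KVOperation
}

// Revert undoes every block above lastValidBlock, newest block first, and
// returns the reverted block numbers so their undo keys can be deleted in
// the same transaction.
func Revert(store map[string][]byte, entries []UndoEntry, lastValidBlock uint64) []uint64 {
	sorted := append([]UndoEntry(nil), entries...)
	sort.Slice(sorted, func(i, j int) bool { return sorted[i].BlockNum > sorted[j].BlockNum })

	var reverted []uint64
	for _, e := range sorted {
		if e.BlockNum <= lastValidBlock {
			break // remaining entries belong to blocks that are still valid
		}
		for _, op := range e.Ops {
			switch op.Type {
			case OpSet:
				store[op.Key] = op.Value
			case OpDelete:
				delete(store, op.Key)
			}
		}
		reverted = append(reverted, e.BlockNum)
	}
	return reverted
}

func main() {
	// State after block 102: alice was set at 100, 101 and 102; bob at 101.
	store := map[string][]byte{"alice": []byte("3"), "bob": []byte("7")}
	entries := []UndoEntry{
		{BlockNum: 100, Ops: []KVOperation{{Type: OpDelete, Key: "alice"}}},
		{BlockNum: 101, Ops: []KVOperation{{Type: OpSet, Key: "alice", Value: []byte("1")}, {Type: OpDelete, Key: "bob"}}},
		{BlockNum: 102, Ops: []KVOperation{{Type: OpSet, Key: "alice", Value: []byte("2")}}},
	}
	reverted := Revert(store, entries, 100)
	fmt.Println(reverted, string(store["alice"]), len(store))
}
```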

Each time the LIB (last irreversible block) advances, we'll remove the undo:-prefixed entries of blocks below it, since those blocks are final and can no longer be undone.

The current keyspace of the KV database behind a sink-kv is as follows:

xc => last cursor
k[user defined keys] => [user-defined values]

When you receive a NewIrreversible step within the BlockScopedData, you don't need to consider this new flow. Just apply the KVOperations blindly and update the cursor (as is currently done).
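A sketch of the transaction contents for an irreversible block, assuming the `k` prefix is simply concatenated with the user key and the cursor is written under `xc`, as in the keyspace above. `Write`, `IrreversibleBatch` and the simplified `KVOperation` type are hypothetical. The point is what is absent: no undo entry is written, because a final block can never be reverted.

```go
package main

import "fmt"

type OpType int

const (
	OpSet OpType = iota
	OpDelete
)

// KVOperation is a hypothetical, simplified entry of KVOperations.
type KVOperation struct {
	Type  OpType
	Key   string
	Value []byte
}

// Write is one mutation in a transaction (hypothetical type).
type Write struct {
	Key    string
	Value  []byte
	Delete bool
}

const (
	cursorKey  = "xc" // last cursor
	userPrefix = "k"  // prefix for user-defined keys (assumed plain concatenation)
)

// IrreversibleBatch applies the user operations as-is and then updates the
// cursor; it captures no undo information.
func IrreversibleBatch(ops []KVOperation, cursor string) []Write {
	batch := make([]Write, 0, len(ops)+1)
	for _, op := range ops {
		w := Write{Key: userPrefix + op.Key}
		if op.Type == OpDelete {
			w.Delete = true
		} else {
			w.Value = op.Value
		}
		batch = append(batch, w)
	}
	return append(batch, Write{Key: cursorKey, Value: []byte(cursor)})
}

func main() {
	batch := IrreversibleBatch([]KVOperation{
		{Type: OpSet, Key: "alice", Value: []byte("10")},
		{Type: OpDelete, Key: "bob"},
	}, "cursor-abc")
	for _, w := range batch {
		fmt.Printf("%s delete=%v value=%q\n", w.Key, w.Delete, w.Value)
	}
}
```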

We could add the undo entries under a new xu: prefix, keyed by the reversed block number (for example, as reversed hex), so that newer blocks sort first:

xu:ffffff123098 => UndoKVOperations-protobuf-message
xu:ffffff123097 => UndoKVOperations-protobuf-message
xu:ffffff123094 => UndoKVOperations-protobuf-message

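Upon receiving a message to undo up to block 123094, I would add 1 to get the first invalid block (123095) and revert every block from there upward, which here means the entries for 123098 and 123097; the entry for 123094 stays.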
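One possible concrete encoding, offered as an assumption since the issue only says "reversed block_num or reversed hex": store `MaxUint64 - blockNum` as fixed-width, zero-padded hex after `xu:`. Fixed width makes lexicographic order match numeric order, and the subtraction makes newer blocks sort first. `UndoKey` and `RevertScanRange` are hypothetical helpers; the range is half-open `[start, end)`, and the keys that sort before `UndoKey(lastValidBlock)` are exactly the blocks from `lastValidBlock + 1` upward.

```go
package main

import (
	"fmt"
	"math"
)

const undoPrefix = "xu:"

// UndoKey encodes blockNum so that higher block numbers sort first.
func UndoKey(blockNum uint64) string {
	return fmt.Sprintf("%s%016x", undoPrefix, uint64(math.MaxUint64)-blockNum)
}

// RevertScanRange returns the half-open key range [start, end) holding the
// undo entries of every block above lastValidBlock, newest first. The bare
// prefix sorts before every key that starts with it.
func RevertScanRange(lastValidBlock uint64) (start, end string) {
	return undoPrefix, UndoKey(lastValidBlock)
}

func main() {
	for _, n := range []uint64{123098, 123097, 123094} {
		fmt.Println(n, UndoKey(n))
	}
	start, end := RevertScanRange(123094)
	fmt.Printf("scan [%s, %s)\n", start, end)
}
```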

When we receive a New step, we need to keep track of final_block_height and clean up any xu: key of a block below it, using the reversed block number to bound the Scan() call.
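A sketch of the clean-up range, under the same assumed encoding (`xu:` plus 16-digit hex of `MaxUint64 - blockNum`). `UndoKey`, `PruneScanRange` and `prefixEnd` are hypothetical helpers. Blocks below `final_block_height` have the largest reversed values, so they occupy the tail of the `xu:` keyspace: the scan starts at the key of `final_block_height - 1` and ends at the first key past the prefix.

```go
package main

import (
	"fmt"
	"math"
)

const undoPrefix = "xu:"

// UndoKey encodes blockNum so that higher block numbers sort first.
func UndoKey(blockNum uint64) string {
	return fmt.Sprintf("%s%016x", undoPrefix, uint64(math.MaxUint64)-blockNum)
}

// prefixEnd returns the smallest string greater than every string starting
// with prefix, assuming the last byte of prefix is not 0xff ("xu:" -> "xu;").
func prefixEnd(prefix string) string {
	b := []byte(prefix)
	b[len(b)-1]++
	return string(b)
}

// PruneScanRange returns the half-open range [start, end) of undo keys for
// blocks strictly below finalBlockHeight; ok is false when nothing can match.
func PruneScanRange(finalBlockHeight uint64) (start, end string, ok bool) {
	if finalBlockHeight == 0 {
		return "", "", false // no block lies below height 0
	}
	return UndoKey(finalBlockHeight - 1), prefixEnd(undoPrefix), true
}

func main() {
	start, end, ok := PruneScanRange(123097)
	fmt.Printf("prune [%s, %s) ok=%v\n", start, end, ok)
}
```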


(also please rename this field: https://github.com/streamingfast/substreams-sink-kv/blob/develop/proto/substreams/sink/kv/v1/kv.proto#L16)