streamingfast / substreams

Powerful Blockchain streaming data engine, based on StreamingFast Firehose technology.

IPFS/Arweave/Filecoin/off-chain fetching in Substreams [DESIGNS] #284

Open abourget opened 10 months ago

abourget commented 10 months ago

Goal: bridge Substreams and off-chain data.

Requested by: Masterfile, Geo, Bitski, skullnbones, TopLedger, Helium, to name a few

Problem

We need to draft an example of what the integration points and interfaces would look like: what the substreams.yaml spec is, what processes need to exist, how to run them, and how a consumer would use this.

Here are two design ideas that we have worked on in different contexts. Please comment.

Design A

Here's the latest design idea, worked out with stepd:

This design allows for a window of references (where a reference is an IPFS file, a Filecoin file, an Arweave file, a URL, etc.), or a window of blocks (which represents time rather than a number of files to keep track of). This allows the resolver engine to be kept completely stateless; the cursor is what holds the state between the consumer and the server.

WARN: we haven't thought through reorgs/undo support here yet.

So:

  - name: ipfs_refs
    kind: map
    inputs:
      - params: string
      - source: sf.substreams.v1.test.Block
    output:
      type: proto:sf.substreams.offchain.v1.References

  - name: ipfs_resolver
    kind: uri_resolver  # or `offchain` or `resolver` or `url_resolver`?
    inputs:
      - map: ipfs_refs
    output:
      type: proto:sf.substreams.database.v1.EntityChanges

the sf.substreams.offchain.v1.References could look like:

message References {
  repeated Reference refs = 1;
}
message Reference {
  google.protobuf.Any meta = 1;
  int32 block_index = 2;
  reserved 3 to 4;
  oneof target {
    IPFS ipfs = 5;
    Arweave arweave = 6;
    Filecoin filecoin = 7;
    URL url = 8;
  }

  message IPFS {
    string cid = 1;
  }
  message Arweave {
  }
  message Filecoin {
  }
  message URL {
    string url = 1;
  }
}

meta in there would be any metadata that can be used by the WASM code when the IPFS ref is resolved downstream, for example, keeping track of a primary key for where to write the output when we obtain the file and process it. This way, we can produce an EntityChanges with some stuff we grasped from the transaction. This is a negotiation between the sf.substreams.offchain.v1.References and the resolver module code. Maybe it could be fixed and declared in the manifest, but we'll see. This would accommodate anything.
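
As a sketch of that negotiation, here is roughly what a map module could emit (in Rust, since that's what Substreams modules are written in). The struct definitions are simplified stand-ins for the prost-generated types (meta is shown as raw bytes rather than a google.protobuf.Any), and the primary-key convention is purely hypothetical:

// Simplified stand-ins for the generated sf.substreams.offchain.v1 types.
#[derive(Debug, Default)]
struct Reference {
    meta: Vec<u8>,     // opaque metadata the resolver module agrees to decode
    block_index: i32,
    ipfs_cid: String,  // simplification of the `oneof` target
}

#[derive(Debug, Default)]
struct References {
    refs: Vec<Reference>,
}

// Hypothetical mapper logic: for each (CID, token_id) found in the block, attach
// the primary key the resolver should write the output under once the file is fetched.
fn map_ipfs_refs(cids_in_block: Vec<(String, u64)>) -> References {
    References {
        refs: cids_in_block
            .into_iter()
            .enumerate()
            .map(|(i, (cid, token_id))| Reference {
                meta: token_id.to_be_bytes().to_vec(), // e.g. the DB primary key
                block_index: i as i32,
                ipfs_cid: cid,
            })
            .collect(),
    }
}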

Process

When handling an incoming request, the resolver-sink would do:


type URLType int

type Reference struct {
  // UniversalID is block number * 1,000,000 + index in this block. This could be passed
  // to the WASM code if we want to be able to use a universal reference as a primary key
  // in the DB, for instance. It's not as nice as a SeqNo but will do the job.
  UniversalID  uint64
  Metadata     []byte
  URL          string
  Type         URLType
  BlockNum     uint64
  IndexInBlock uint64
}

// and, somewhere in the resolver-sink's state:
type resolverState struct {
  unresolved []Reference
}

and from this we can compute the cursor, so it can be picked up and the Resolver can reload all the metadata and references from that window back into memory (by querying, say, ~1000 blocks in the past, or ~10000 references back from the block it refers to). This way, we don't need a database, because the mapper holds that data, and the cursor transports the references.

The goal here is that the resolver doesn't need state.

NOTE: the UniversalID isn't as slick as a globally resolvable URL like ipfs:///cid1 either, because the ID could be universal but full of duplicates (3 references to the same URL would hold 3 things), but maybe that's desired if you have transactional metadata to interpret the right parts of the reference. So... still needs thought.
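
As a tiny sketch of the encoding described above (block number * 1,000,000 + index in the block), which also shows why duplicates are possible: two references to the same CID in the same block still get two distinct IDs:

// Assumes fewer than 1,000,000 references per block.
fn universal_id(block_num: u64, index_in_block: u64) -> u64 {
    block_num * 1_000_000 + index_in_block
}

fn main() {
    // Two references to the same ipfs CID in block 1500 get distinct IDs.
    assert_eq!(universal_id(1500, 0), 1_500_000_000);
    assert_eq!(universal_id(1500, 1), 1_500_000_001);
}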

Cursor

The cursor contains:

type Cursor struct {
  SubstreamsCursor         string
  UnresolvedRefs           *roaring.Bitmap
  LowestBlockInBitmap      uint64
  LowestSeqNoInLowestBlock uint64
}

Upon receiving the cursor, the server can recompute the SeqNo references and resolve the ones marked in the roaring bitmap back to the exact results in the map.

The SeqNo is an incrementing number for all references starting from the first time you do a startBlock with no cursor. For a given cursor, the SeqNo would be incrementing all the time. (REVIEW behavior with undos and all)
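
As a rough sketch of that resolution step (using a std BTreeSet as a stand-in for the roaring bitmap, and assuming the replayed window starts at the reference holding LowestSeqNoInLowestBlock):

use std::collections::BTreeSet;

// Stand-in for the cursor above; the set replaces the roaring bitmap of unresolved SeqNos.
struct ResolverCursor {
    lowest_seq_no_in_lowest_block: u64,
    unresolved_refs: BTreeSet<u64>,
}

// Replay the window of references (re-queried from the mapper), assign SeqNos in
// order, and keep only the references the cursor still marks as unresolved.
fn pending_work(cursor: &ResolverCursor, window: &[String]) -> Vec<(u64, String)> {
    window
        .iter()
        .enumerate()
        .filter_map(|(i, url)| {
            let seq_no = cursor.lowest_seq_no_in_lowest_block + i as u64;
            if cursor.unresolved_refs.contains(&seq_no) {
                Some((seq_no, url.clone()))
            } else {
                None
            }
        })
        .collect()
}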

WASM Execution

Once a reference is resolved, it is passed through wazero to execute a piece of code referenced in the resolver module, just like a mapper module, with the bytes, in a fixed manner (except for the output type). The result is then output downstream by the resolver, in the same response pattern as the Substreams engine, so the postgres-sink won't see a difference :)
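
On the module side, the handler could look roughly like a map handler whose input is the fetched payload plus the meta carried over from the References message. Everything below (names, types, the hex key convention) is a hypothetical stand-in, not a settled interface:

// Hypothetical input to the resolver module: the resolved off-chain bytes plus
// the metadata that was attached when the reference was emitted.
struct ResolvedFile {
    meta: Vec<u8>,
    contents: Vec<u8>,
}

// Simplified stand-in for sf.substreams.database.v1.EntityChanges.
#[derive(Default)]
struct EntityChanges {
    changes: Vec<(String, String)>, // (entity key, payload), greatly simplified
}

// Same shape as a mapper, except the input is a file instead of a block.
fn resolve_ipfs(file: ResolvedFile) -> Result<EntityChanges, String> {
    let key: String = file.meta.iter().map(|b| format!("{:02x}", b)).collect();
    let body = String::from_utf8(file.contents).map_err(|e| e.to_string())?;
    Ok(EntityChanges { changes: vec![(key, body)] })
}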

IPFS feeding

The resolver turns around and queries an IPFS node, which can be configured to poll peers, fetch from Pinata, etc. Connecting to the different providers, other indexers, etc. to share IPFS data is an out-of-band concern.

We need to explore whether Arweave has a similar model of fetching through a bunch of nodes, doing discovery, etc., and the same for Filecoin. We can figure out if a sidecar is best for sharing caches, reaching out to those providers, or hitting URLs directly.


Design B

Here's a summary of some different requirements, which would yield a different implementation, stemming from the needs at Geo:

This method would allow IPFS data to be used within stores and mappers, using the same model we currently have. It would be easier to roll out. It does change the nature of the Tier2 and Tier1 services in Substreams, as they now need to be plugged into some sort of resolver service. Perhaps that resolver service is aware of banned content, availability-oracle blacklistings, etc., so it could answer with more determinism. However, if you've processed part of the history, and that resolver service only later learns that some content should be removed, you'll start having divergences. Determinism seems quite hard to achieve in those conditions.

Of note too: Leo's feedback is that it should be a callback-based API (start the download, get back a reference, then fetch the results with that reference). Otherwise, you're forced to linearly start a download and wait for it to complete before you start the next. A similar pattern was used in the KV Sink (linear and blocking) and it made things synchronous and slow. A callback-based API would allow the runtime to parallelize downloads if there are many.
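
To make that concrete, here's a hypothetical shape for such an API from the module's point of view (all names invented for illustration): starting a download returns a handle immediately, and results are collected later, so the runtime is free to run many fetches in parallel:

// Hypothetical callback/handle-style fetch API exposed by the runtime.
trait OffchainFetcher {
    fn start(&mut self, url: &str) -> u64;               // returns a handle right away
    fn poll(&mut self, handle: u64) -> Option<Vec<u8>>;  // None = not ready yet
}

fn fetch_all(fetcher: &mut dyn OffchainFetcher, urls: &[String]) -> Vec<Vec<u8>> {
    // Kick off every download up front instead of blocking on each one in turn.
    let handles: Vec<u64> = urls.iter().map(|u| fetcher.start(u)).collect();

    // Collect results as they become ready (a real module would do other work meanwhile).
    let mut results = Vec::new();
    for handle in handles {
        loop {
            if let Some(bytes) = fetcher.poll(handle) {
                results.push(bytes);
                break;
            }
        }
    }
    results
}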

yanivtal commented 10 months ago

Hey thanks for posting this Alex.

First, to clarify our use case: we have proposals with data offchain that we anchor onchain. This way we can have onchain governance of offchain data. Each proposal has a set of actions. They're stored as triples, but I'll just describe them as entity updates here for simplicity.

Let's say that we're building an app for organizing music.

Example

Proposal 1
- Create Artist Radiohead
- Create Artist Monolink
- Update Radiohead to add album OK Cuter
- Update Monolink to add a DJ Set

Proposal 2
- Update Radiohead to change album name to OK Computer
- Create Artist Pink Floyd

This way all the data lives offchain but the hashes for the proposals are onchain. We can see the hash for Proposal 1 and then the hash for Proposal 2 in sequence onchain. Order here matters because at the end of processing both proposals, we want to make sure that the name of Radiohead's album is OK Computer and not OK Cuter.
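
To illustrate the ordering requirement with a small sketch (types and action names invented for illustration): proposals are applied strictly in the order their hashes appear onchain, so the later update to OK Computer wins over OK Cuter:

use std::collections::HashMap;

// Simplified stand-ins for the offchain proposal contents.
enum Action {
    CreateEntity { name: String },
    SetAlbum { entity: String, album: String }, // covers both "add album" and "rename"
}

struct Proposal {
    actions: Vec<Action>,
}

// Apply proposals in onchain order; later actions overwrite earlier ones,
// so Radiohead ends up with "OK Computer", not "OK Cuter".
fn apply(proposals: &[Proposal]) -> HashMap<String, Option<String>> {
    let mut entities: HashMap<String, Option<String>> = HashMap::new();
    for proposal in proposals {
        for action in &proposal.actions {
            match action {
                Action::CreateEntity { name } => {
                    entities.entry(name.clone()).or_insert(None);
                }
                Action::SetAlbum { entity, album } => {
                    entities.insert(entity.clone(), Some(album.clone()));
                }
            }
        }
    }
    entities
}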

A few open questions:

YaroShkvorets commented 10 months ago

With Design A, what does it mean for the end sink? Does it have to consume two modules now: the primary map module from the substreams endpoint, and a trailing resolver-type module from the resolver-sink? I guess they can both live behind the same endpoint.

MercuricChloride commented 10 months ago

Super excited to play with this soon. I have a couple thoughts.

1. Conceptual thoughts on what the ideal flow for ipfs is

I think that the way graph-node handles this with file data sources is pretty smart: basically, it just puts some restrictions on what these entities can do and treats them as two separate worlds. This feels similar to the idea of the resolver modules to me.

However, I think that to make this feel really powerful, it would be really nice to have resolver modules be able to interact with the other substreams modules that make sense (maps and stores), but maybe just keep them in two different runtimes: the traditional substreams onchain-data -> protobuf runtime, and the "dirty" resolver -> protobuf runtime.

2. Actual ipfs implementation thoughts

I wonder to what extent we need to extend the substreams runtime to handle ipfs and network stuff within a substream. (This obviously depends on choosing either Design A or B.) But if we just have whatever non-deterministic ipfs fetching return an enum with the appropriate data, we can leave it up to the substreams creator to decide what acceptable error recovery looks like.

An example would be: maybe your IPFS data isn't integral to your substream, like image sources for an NFT, let's say. Maybe in this case you can just try to unwrap() your data, and if it's not there, oh well, it's not integral to what you are building.

Or in the case of it being a requirement for your data output, you can implement some more advanced error recovery such as retries.

An example of a clever error recovery scheme could be: if we don't find the contents of a CID, we could pop it into a store and basically just attempt to refetch the data later down the line depending on how it failed.
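
A rough sketch of what that enum-based approach could look like on the module side (all names invented for illustration): non-critical data just falls back to a default, while required data is pushed into a retry store to be refetched later:

// Hypothetical result of a non-deterministic IPFS fetch.
enum FetchResult {
    Found(Vec<u8>),
    NotFound,        // no provider had the CID right now
    Failed(String),  // timeout, transport error, etc.
}

// Non-critical case: an NFT image we can live without.
fn image_or_default(result: FetchResult) -> String {
    match result {
        FetchResult::Found(bytes) => String::from_utf8(bytes).unwrap_or_default(),
        _ => String::new(), // oh well, not integral to what we're building
    }
}

// Critical case: stash the CID so a later block can attempt the fetch again.
fn handle_required(cid: &str, result: FetchResult, retry_store: &mut Vec<String>) -> Option<Vec<u8>> {
    match result {
        FetchResult::Found(bytes) => Some(bytes),
        FetchResult::NotFound | FetchResult::Failed(_) => {
            retry_store.push(cid.to_string()); // refetch later down the line
            None
        }
    }
}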