graphprotocol / graph-node

Graph Node indexes data from blockchains such as Ethereum and serves it over GraphQL
https://thegraph.com
Apache License 2.0

Block stream pipelining #2581

Closed leoyvens closed 2 years ago

leoyvens commented 3 years ago

The subgraph processing loop is currently entirely sequential. Pipelining is a way to parallelize it. The first target for this is the block stream, so that while a block is being processed the next blocks are already being scanned for triggers. This could reduce indexing time by up to 50%, though for most subgraphs that number is more like 20%. A few points to consider:

The block stream must be made independent from the subgraph store

Currently the block stream depends on a subgraph_store https://github.com/graphprotocol/graph-node/blob/381f22e07607c66affa18b39a76d4c6003405c8e/graph/src/blockchain/block_stream.rs#L196-L196 for two reasons. First, for keeping track of the current block pointer for the subgraph https://github.com/graphprotocol/graph-node/blob/381f22e07607c66affa18b39a76d4c6003405c8e/graph/src/blockchain/block_stream.rs#L351-L351 which can be solved simply by the block stream keeping track of the last block pointer it emitted. Second, for updating the synced status on the subgraph https://github.com/graphprotocol/graph-node/blob/381f22e07607c66affa18b39a76d4c6003405c8e/graph/src/blockchain/block_stream.rs#L591-L591 and this responsibility should be moved to the instance manager. Maybe BlockStreamEvent should gain a Done variant to signal that the synced status should be checked.
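A minimal sketch of both ideas, assuming heavily simplified types: `BlockPtr`, `TrackingBlockStream` and the payload of `BlockStreamEvent::Block` are hypothetical stand-ins, not graph-node's actual definitions, and a plain `Iterator` stands in for the real async stream. Exhaustion of the source is used here to model "caught up", which is also an assumption.

```rust
/// Hypothetical, simplified block pointer (the real type differs).
#[derive(Clone, Debug, PartialEq)]
pub struct BlockPtr {
    pub number: u64,
    pub hash: String,
}

/// Simplified event enum. `Done` is the variant proposed above; it tells the
/// consumer (e.g. the instance manager) to check the synced status.
#[derive(Debug, PartialEq)]
pub enum BlockStreamEvent {
    Block(BlockPtr),
    Done,
}

/// A stream that remembers the last pointer it emitted, so it does not need
/// to read the current block pointer back from a subgraph store.
pub struct TrackingBlockStream<I: Iterator<Item = BlockPtr>> {
    source: I,
    last_emitted: Option<BlockPtr>,
    done_sent: bool,
}

impl<I: Iterator<Item = BlockPtr>> TrackingBlockStream<I> {
    /// `start` is the pointer the subgraph is at when the stream is created.
    pub fn new(source: I, start: Option<BlockPtr>) -> Self {
        Self { source, last_emitted: start, done_sent: false }
    }

    pub fn last_emitted(&self) -> Option<&BlockPtr> {
        self.last_emitted.as_ref()
    }
}

impl<I: Iterator<Item = BlockPtr>> Iterator for TrackingBlockStream<I> {
    type Item = BlockStreamEvent;

    fn next(&mut self) -> Option<BlockStreamEvent> {
        match self.source.next() {
            Some(ptr) => {
                // Track progress locally instead of querying the store.
                self.last_emitted = Some(ptr.clone());
                Some(BlockStreamEvent::Block(ptr))
            }
            // Source exhausted: emit `Done` exactly once, then end.
            None if !self.done_sent => {
                self.done_sent = true;
                Some(BlockStreamEvent::Done)
            }
            None => None,
        }
    }
}
```

With this shape the stream never writes to storage; the consumer reacts to `Done` and decides whether to mark the subgraph as synced.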

The pipelining mechanism

An attractive solution is to implement this as an adapter to the existing block stream. This would be a tokio task that pulls blocks from the block stream and puts them in a channel. Buffering a single range will likely be enough for the optimization to be effective. However, the fact that the block stream scans blocks in batches (ranges) but emits them as individual blocks will require some async cleverness in this adapter. Possibly the block stream itself should emit batches (a stream of Vec<BlockWithTriggers>) and the adapter would expose it as a stream of individual blocks.
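The batch-to-block adapter could look roughly like the sketch below. To stay dependency-free it uses a standard-library thread and a bounded `sync_channel` in place of a tokio task and an async channel, so it only illustrates the shape of the idea. The `pipeline` function and the fields of `BlockWithTriggers` are hypothetical.

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

/// Hypothetical stand-in for a scanned block together with its triggers.
#[derive(Clone, Debug, PartialEq)]
pub struct BlockWithTriggers {
    pub number: u64,
    pub triggers: Vec<String>,
}

/// Runs `batches` on a background thread and exposes the result as a stream
/// of individual blocks. At most `buffered_batches` scanned batches wait in
/// the channel, so scanning runs ahead of processing by a bounded amount.
pub fn pipeline<S>(
    batches: S,
    buffered_batches: usize,
) -> impl Iterator<Item = BlockWithTriggers>
where
    S: Iterator<Item = Vec<BlockWithTriggers>> + Send + 'static,
{
    let (tx, rx) = sync_channel(buffered_batches);

    thread::spawn(move || {
        for batch in batches {
            // `send` blocks while the buffer is full. An error means the
            // consumer was dropped, so scanning can stop.
            if tx.send(batch).is_err() {
                break;
            }
        }
    });

    // Flatten the buffered batches back into single blocks, in order.
    rx.into_iter().flatten()
}
```

Calling `pipeline(batches, 1)` buffers a single range, which matches the buffer size suggested above. The consumer keeps iterating block by block and is unaware of the batching.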

Stopwatch metrics

Right now the block stream uses only the "scan_blocks" stopwatch section. We could keep the same name for backwards compatibility with the dashboards, and just make "scan_blocks" mean time spent waiting for the block stream to emit a block.
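As a rough illustration of that reinterpretation, the sketch below times only the wait for the next block under "scan_blocks". The `Stopwatch` type, the `run_loop` function and the "process_block" section name are hypothetical and much simpler than graph-node's real stopwatch metrics.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Hypothetical, minimal stopwatch that accumulates time per named section.
#[derive(Default)]
pub struct Stopwatch {
    sections: HashMap<&'static str, Duration>,
}

impl Stopwatch {
    /// Runs `f` and adds its wall-clock duration to `section`.
    pub fn time<T>(&mut self, section: &'static str, f: impl FnOnce() -> T) -> T {
        let start = Instant::now();
        let out = f();
        *self.sections.entry(section).or_default() += start.elapsed();
        out
    }

    /// Total time recorded for `section` (zero if it was never timed).
    pub fn total(&self, section: &str) -> Duration {
        self.sections.get(section).copied().unwrap_or_default()
    }
}

/// Processing loop: "scan_blocks" now covers only the time spent waiting for
/// the stream to hand over a block, not the scanning work itself.
pub fn run_loop<I: Iterator<Item = u64>>(mut stream: I, stopwatch: &mut Stopwatch) -> u64 {
    let mut processed = 0;
    while let Some(_block) = stopwatch.time("scan_blocks", || stream.next()) {
        // Placeholder for the real block processing.
        stopwatch.time("process_block", || processed += 1);
    }
    processed
}
```

With a pipelined stream, a "scan_blocks" total near zero would indicate that scanning keeps up with processing.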

azf20 commented 3 years ago

@leoyvens I know you discussed this previously with @maoueh - is this part of the current StreamingFast work or is it still waiting to be picked up by someone?

maoueh commented 3 years ago

We worked on it a little bit, mainly investigating and spending some time understanding the code. We have a half-baked implementation, but we drifted towards updating Tokio so this stalled a bit.

Right now we are on Firehose, so if anybody else is ready to dig, we could sync a bit if required.

azf20 commented 3 years ago

OK thanks - with the Firehose implementation, would this optimisation still be applicable to the polling and the firehose approaches?

maoueh commented 3 years ago

Yes, we think it would still make sense to have this optimization for both, probably with different "buffer" sizes.

We used a pre-processor in our own code to perform reception + protobuf decoding on another thread, ensuring there is always at least one more fully decoded block ready to be sent in the pipeline.

The design we had in mind was a composition struct BufferedBlockStream that would take a downstream dyn BlockStream and would buffer N blocks from it in advance. This BufferedBlockStream would be able to work with both implementations.
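A minimal sketch of that composition, under simplifying assumptions: the `BlockStream` trait below is a hypothetical synchronous stand-in for the real async trait, `VecStream` is a toy downstream stream for the example, and a thread plus a bounded channel replaces a tokio task.

```rust
use std::sync::mpsc::{sync_channel, Receiver};
use std::thread;

/// Hypothetical, synchronous stand-in for the real `BlockStream` trait.
pub trait BlockStream: Send {
    type Block: Send + 'static;
    fn next_block(&mut self) -> Option<Self::Block>;
}

/// Wraps any block stream and keeps up to `size` blocks ready in advance.
pub struct BufferedBlockStream<B> {
    rx: Receiver<B>,
}

impl<B: Send + 'static> BufferedBlockStream<B> {
    pub fn new(mut inner: Box<dyn BlockStream<Block = B>>, size: usize) -> Self {
        let (tx, rx) = sync_channel(size);
        thread::spawn(move || {
            // Pull from the wrapped stream until it ends or the consumer
            // goes away; `send` blocks while the buffer is full.
            while let Some(block) = inner.next_block() {
                if tx.send(block).is_err() {
                    break;
                }
            }
        });
        Self { rx }
    }
}

/// The wrapper is itself a `BlockStream`, so callers need no changes.
impl<B: Send + 'static> BlockStream for BufferedBlockStream<B> {
    type Block = B;

    fn next_block(&mut self) -> Option<B> {
        // `recv` fails once the producer has finished and the buffer is empty.
        self.rx.recv().ok()
    }
}

/// Toy downstream stream used only for the example.
pub struct VecStream(pub std::vec::IntoIter<u64>);

impl BlockStream for VecStream {
    type Block = u64;

    fn next_block(&mut self) -> Option<u64> {
        self.0.next()
    }
}
```

Because the wrapper only depends on the trait, the same struct could sit in front of a polling or a Firehose stream, with `size` chosen per implementation.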

But for it to work, responsibilities like updating the subgraph store and the actual chain store must be moved somewhere else, because buffering would cause issues. All in all, it's mainly the dependencies listed by Leo above.

Making the BlockStream really lean in its contract and pushing the responsibility for updating storage outside of it would allow a BufferedBlockStream struct.

That said, a simpler approach directly inside PollingBlockStream could still be a good, faster solution and should be evaluated.

leoyvens commented 3 years ago

@maoueh I'm on board with the BufferedBlockStream adapter approach. I don't see buffer size as an issue; a buffer size of 1 should do it.

Edit: A buffer size of 1 might work for the firehose, but we probably want a larger buffer size for the polling block stream since it fetches blocks in batches.

leoyvens commented 2 years ago

Done in #3095