ponder-sh / ponder

A backend framework for crypto apps
https://ponder.sh
MIT License
585 stars 84 forks source link

feat: checkpoint selecting strategy on sync-store getEvents #907

Closed erensanlier closed 3 months ago

0xOlias commented 3 months ago

Hi @erensanlier, thanks for opening. This is a promising idea, but it's worth mentioning that on many apps, the getEvents query that we currently have performs well enough that the bottleneck during a fully sync indexing run is database round trips, not getEvents execution time. It's possible that going from 1 -> 4 queries might introduce a slowdown for some apps.

If you are able, could you provide a comment here including a before/after query plan?

erensanlier commented 3 months ago

Hey @0xOlias, sorry for not following up on this.

Opened this PR based on the idea of optimizing checkpoint selection in getEvents. Valid point that this implementation might slow down most apps. Possibly checkpoint selection might be added to the main query to address that concern.

Sharing the query plans for the following specific case:

getEvents tries to get 1000 events (ordered by checkpoint) from ponder_sync.logs table, which has +5M rows.

Query plan looks like this with the original query: QUERY PLAN
Limit (cost=518590.84..542595.53 rows=1001 width=3192)
-> Nested Loop Left Join (cost=518590.84..814105.15 rows=12323 width=3192)
-> Nested Loop Left Join (cost=518590.29..713330.23 rows=12323 width=1965)
-> Nested Loop (cost=518589.73..612904.50 rows=12323 width=1346)
-> Gather Merge (cost=518589.18..520024.39 rows=12323 width=313)
Workers Planned: 2
-> Sort (cost=517589.15..517601.99 rows=5135 width=313)
Sort Key: "*SELECT* 1".checkpoint
-> Parallel Append (cost=0.00..517272.68 rows=5135 width=313)
-> Subquery Scan on "*SELECT* 1" (cost=0.00..517272.68 rows=5135 width=313)
-> Nested Loop (cost=0.00..517221.33 rows=5135 width=345)
Join Filter: ((((logs_1."chainId")::numeric = '10'::numeric(16,0)) AND ((logs_1.address)::text = '0xdc6ff44d5d932cbd77b52e5612ba0529dc6226f1'::text) AND (logs_1."blockNumber" >= '107087966'::numeric) AND (logs_1."blockNumber" <= '109400000'::numeric) AND ("*VALUES*".column1 = 'log_WLD_Optimism_optimism'::text)) OR (((logs_1."chainId")::numeric = '1'::numeric(16,0)) AND ((logs_1.address)::text = '0x163f8c2467924be0ae7b5347228cabf260318753'::text) AND (logs_1."blockNumber" >= '17714705'::numeric) AND ("*VALUES*".column1 = 'log_WLD_Ethereum_mainnet'::text)))
-> Parallel Seq Scan on logs logs_1 (cost=0.00..516478.78 rows=10242 width=340)
Filter: (((checkpoint)::text > '169015438700000000000000100000000107277805000000000000000250000000000000058'::text) AND ((checkpoint)::text <= '171589431500000000000000010000000019885172999999999999999999999999999999'::text) AND ((topic0)::text = '0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef'::text) AND (((("chainId")::numeric = '10'::numeric(16,0)) AND ((address)::text = '0xdc6ff44d5d932cbd77b52e5612ba0529dc6226f1'::text) AND ("blockNumber" >= '107087966'::numeric) AND ("blockNumber" <= '109400000'::numeric)) OR ((("chainId")::numeric = '1'::numeric(16,0)) AND ((address)::text = '0x163f8c2467924be0ae7b5347228cabf260318753'::text) AND ("blockNumber" >= '17714705'::numeric))))
-> Values Scan on "*VALUES*" (cost=0.00..0.03 rows=1 width=32)
Filter: ((column1 = 'log_WLD_Optimism_optimism'::text) OR (column1 = 'log_WLD_Ethereum_mainnet'::text))
-> Index Scan using blocks_pkey on blocks (cost=0.55..7.54 rows=1 width=1100)
Index Cond: ((hash)::text = ("*SELECT* 1"."blockHash")::text)
-> Index Scan using logs_pkey on logs (cost=0.56..8.15 rows=1 width=690)
Index Cond: (id = "*SELECT* 1".log_id)
-> Index Scan using transactions_pkey on transactions (cost=0.56..8.18 rows=1 width=1294)
Index Cond: ((hash)::text = ("*SELECT* 1"."transactionHash")::text)

When checkpoint is selected near the cursor:

QUERY PLAN
Limit (cost=35.47..35.47 rows=1 width=3192)
-> Sort (cost=35.47..35.47 rows=1 width=3192)
Sort Key: "*SELECT* 1".checkpoint
-> Nested Loop Left Join (cost=2.22..35.46 rows=1 width=3192)
-> Nested Loop Left Join (cost=1.67..26.88 rows=1 width=1965)
-> Nested Loop (cost=1.11..18.31 rows=1 width=1346)
-> Subquery Scan on "*SELECT* 1" (cost=0.56..9.74 rows=1 width=313)
-> Nested Loop (cost=0.56..9.73 rows=1 width=345)
Join Filter: ((((logs_1."chainId")::numeric = '10'::numeric(16,0)) AND ((logs_1.address)::text = '0xdc6ff44d5d932cbd77b52e5612ba0529dc6226f1'::text) AND (logs_1."blockNumber" >= '107087966'::numeric) AND (logs_1."blockNumber" <= '109400000'::numeric) AND ("*VALUES*".column1 = 'log_WLD_Optimism_optimism'::text)) OR (((logs_1."chainId")::numeric = '1'::numeric(16,0)) AND ((logs_1.address)::text = '0x163f8c2467924be0ae7b5347228cabf260318753'::text) AND (logs_1."blockNumber" >= '17714705'::numeric) AND ("*VALUES*".column1 = 'log_WLD_Ethereum_mainnet'::text)))
-> Index Scan using logs_checkpoint_index on logs logs_1 (cost=0.56..9.66 rows=1 width=340)
Index Cond: (((checkpoint)::text > '169015438700000000000000100000000107277805000000000000000250000000000000058'::text) AND ((checkpoint)::text <= '169015446100000000000000100000000107277842000000000000000250000000000000013'::text))
Filter: (((topic0)::text = '0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef'::text) AND (((("chainId")::numeric = '10'::numeric(16,0)) AND ((address)::text = '0xdc6ff44d5d932cbd77b52e5612ba0529dc6226f1'::text) AND ("blockNumber" >= '107087966'::numeric) AND ("blockNumber" <= '109400000'::numeric)) OR ((("chainId")::numeric = '1'::numeric(16,0)) AND ((address)::text = '0x163f8c2467924be0ae7b5347228cabf260318753'::text) AND ("blockNumber" >= '17714705'::numeric))))
-> Values Scan on "*VALUES*" (cost=0.00..0.03 rows=1 width=32)
Filter: ((column1 = 'log_WLD_Optimism_optimism'::text) OR (column1 = 'log_WLD_Ethereum_mainnet'::text))
-> Index Scan using blocks_pkey on blocks (cost=0.55..8.57 rows=1 width=1100)
Index Cond: ((hash)::text = ("*SELECT* 1"."blockHash")::text)
-> Index Scan using logs_pkey on logs (cost=0.56..8.57 rows=1 width=690)
Index Cond: (id = "*SELECT* 1".log_id)
-> Index Scan using transactions_pkey on transactions (cost=0.56..8.57 rows=1 width=1294)
Index Cond: ((hash)::text = ("*SELECT* 1"."transactionHash")::text)
0xOlias commented 3 months ago

Thanks, this is great. Seems pretty definitive. How could we add the checkpoint selection to the main query - additional CTEs perhaps?

Also while you're running this, I'd find it valuable to see the query plan for the logs checkpoint selection query.

erensanlier commented 3 months ago

Thanks, this is great. Seems pretty definitive. How could we add the checkpoint selection to the main query - additional CTEs perhaps?

Also while you're running this, I'd find it valuable to see the query plan for the logs checkpoint selection query.

Yes, can be done with additional CTEs. This is the log selection query plan

QUERY PLAN
Subquery Scan on checkpoints (cost=219.20..219.43 rows=1 width=32) (actual time=1.010..1.011 rows=1 loops=1)
-> Limit (cost=219.20..219.42 rows=1 width=76) (actual time=1.008..1.008 rows=1 loops=1)
-> Index Only Scan using logs_checkpoint_index on logs (cost=0.56..1117980.68 rows=5113210 width=76) (actual time=0.093..0.968 rows=1001 loops=1)
Index Cond: ((checkpoint > '169015438700000000000000100000000107277805000000000000000250000000000000058'::text) AND (checkpoint <= '171589431500000000000000010000000019885172999999999999999999999999999999'::text))
Heap Fetches: 1001
Planning Time: 0.218 ms
Execution Time: 1.038 ms

It is super fast.

Also the following is the original query with max checkpoint selection using CTEs (it was 50s before, now its 2.6 s):

QUERY PLAN
Limit (cost=30897.58..30903.65 rows=52 width=3192) (actual time=2629.155..2634.761 rows=1000 loops=1)
InitPlan 1 (returns $0)
-> Aggregate (cost=372.94..372.95 rows=1 width=32) (actual time=1.679..1.684 rows=1 loops=1)
-> Append (cost=219.20..372.93 rows=3 width=107) (actual time=1.043..1.674 rows=2 loops=1)
-> Limit (cost=219.20..219.42 rows=1 width=76) (actual time=1.042..1.043 rows=1 loops=1)
-> Index Only Scan using logs_checkpoint_index on logs logs_2 (cost=0.56..1117980.68 rows=5113210 width=76) (actual time=0.102..1.003 rows=1001 loops=1)
Index Cond: ((checkpoint > '169015438700000000000000100000000107277805000000000000000250000000000000058'::text) AND (checkpoint <= '171589431500000000000000010000000019885172999999999999999999999999999999'::text))
Heap Fetches: 1001
-> Limit (cost=137.14..137.28 rows=1 width=76) (actual time=0.617..0.619 rows=1 loops=1)
-> Index Only Scan using "blockCheckpointIndex" on blocks blocks_1 (cost=0.55..181539.37 rows=1329066 width=76) (actual time=0.029..0.578 rows=1001 loops=1)
Index Cond: ((checkpoint > '169015438700000000000000100000000107277805000000000000000250000000000000058'::text) AND (checkpoint <= '171589431500000000000000010000000019885172999999999999999999999999999999'::text))
Heap Fetches: 0
-> Limit (cost=8.16..16.18 rows=1 width=168) (actual time=0.008..0.009 rows=0 loops=1)
-> Index Only Scan using "callTracesCheckpointIndex" on "callTraces" (cost=0.14..8.16 rows=1 width=168) (actual time=0.008..0.008 rows=0 loops=1)
Index Cond: ((checkpoint > '169015438700000000000000100000000107277805000000000000000250000000000000058'::text) AND (checkpoint <= '171589431500000000000000010000000019885172999999999999999999999999999999'::text))
Heap Fetches: 0
-> Gather Merge (cost=30524.62..30530.69 rows=52 width=3192) (actual time=2629.153..2634.671 rows=1000 loops=1)
Workers Planned: 2
Params Evaluated: $0
Workers Launched: 2
-> Sort (cost=29524.60..29524.66 rows=26 width=3192) (actual time=2608.176..2608.230 rows=355 loops=3)
Sort Key: "*SELECT* 1".checkpoint
Sort Method: top-N heapsort Memory: 4040kB
Worker 0: Sort Method: top-N heapsort Memory: 4040kB
Worker 1: Sort Method: top-N heapsort Memory: 4040kB
-> Nested Loop Left Join (cost=2.22..29523.99 rows=26 width=3192) (actual time=0.195..2513.069 rows=25257 loops=3)
-> Nested Loop Left Join (cost=1.67..29301.10 rows=26 width=1965) (actual time=0.167..1950.606 rows=25257 loops=3)
-> Nested Loop (cost=1.11..29078.15 rows=26 width=1346) (actual time=0.131..826.908 rows=25257 loops=3)
-> Parallel Append (cost=0.56..28855.33 rows=26 width=313) (actual time=0.104..285.600 rows=25257 loops=3)
-> Subquery Scan on "*SELECT* 1" (cost=0.56..28855.33 rows=26 width=313) (actual time=0.103..282.752 rows=25257 loops=3)
-> Nested Loop (cost=0.56..28855.07 rows=26 width=345) (actual time=0.102..266.802 rows=25257 loops=3)
Join Filter: ((((logs_1."chainId")::numeric = '10'::numeric(16,0)) AND ((logs_1.address)::text = '0xdc6ff44d5d932cbd77b52e5612ba0529dc6226f1'::text) AND (logs_1."blockNumber" >= '107087966'::numeric) AND (logs_1."blockNumber" <= '109400000'::numeric) AND ("*VALUES*".column1 = 'log_WLD_Optimism_optimism'::text)) OR (((logs_1."chainId")::numeric = '1'::numeric(16,0)) AND ((logs_1.address)::text = '0x163f8c2467924be0ae7b5347228cabf260318753'::text) AND (logs_1."blockNumber" >= '17714705'::numeric) AND ("*VALUES*".column1 = 'log_WLD_Ethereum_mainnet'::text)))
Rows Removed by Join Filter: 25257
-> Parallel Index Scan using logs_checkpoint_index on logs logs_1 (cost=0.56..28851.30 rows=52 width=340) (actual time=0.094..138.082 rows=25257 loops=3)
Index Cond: (((checkpoint)::text > '169015438700000000000000100000000107277805000000000000000250000000000000058'::text) AND ((checkpoint)::text <= $0))
Filter: (((topic0)::text = '0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef'::text) AND (((("chainId")::numeric = '10'::numeric(16,0)) AND ((address)::text = '0xdc6ff44d5d932cbd77b52e5612ba0529dc6226f1'::text) AND ("blockNumber" >= '107087966'::numeric) AND ("blockNumber" <= '109400000'::numeric)) OR ((("chainId")::numeric = '1'::numeric(16,0)) AND ((address)::text = '0x163f8c2467924be0ae7b5347228cabf260318753'::text) AND ("blockNumber" >= '17714705'::numeric))))
-> Values Scan on "*VALUES*" (cost=0.00..0.03 rows=1 width=32) (actual time=0.001..0.002 rows=2 loops=75772)
Filter: ((column1 = 'log_WLD_Optimism_optimism'::text) OR (column1 = 'log_WLD_Ethereum_mainnet'::text))
-> Index Scan using blocks_pkey on blocks (cost=0.55..8.57 rows=1 width=1100) (actual time=0.019..0.019 rows=1 loops=75772)
Index Cond: ((hash)::text = ("*SELECT* 1"."blockHash")::text)
-> Index Scan using logs_pkey on logs (cost=0.56..8.57 rows=1 width=690) (actual time=0.042..0.042 rows=1 loops=75772)
Index Cond: (id = "*SELECT* 1".log_id)
-> Index Scan using transactions_pkey on transactions (cost=0.56..8.57 rows=1 width=1294) (actual time=0.019..0.019 rows=1 loops=75772)
Index Cond: ((hash)::text = ("*SELECT* 1"."transactionHash")::text)
Planning Time: 2.354 ms
Execution Time: 2635.314 ms
erensanlier commented 3 months ago

Maybe this draft implementation is way to complicated. @0xOlias Would you be able to measure performance metrics if I implement just max checkpoint selection as a part of the main query? We can progress from there for additional optimizations.

erensanlier commented 3 months ago

With this implementation I couldn't spot any difference in performance for normal size apps. Almost 20x improvement in the query plan I mentioned above.

0xOlias commented 3 months ago

Excellent work, the latest commit makes sense to me - very clever. Thinking through edge case conditions around the max and coalesce operators.

It would also be good to confirm that this works for SQLite.

erensanlier commented 3 months ago

@0xOlias don't really understand why SQLite check fails. Commit with exact same code (cde720333351ca046e7990c8cd04c0522927fd10) passed before. Would appreciate if you can look into it!

0xOlias commented 3 months ago

Sure, I'll take care of SQLite.

Here's my understanding when using .offset(limit).limit(1) for the log_checkpoints query.

If there are more than limit rows in the logs table that have a checkpoint greater than our fromCheckpoint, the log_checkpoints table will have one row containing the limit + 1th checkpoint. We can safely use this value as an upper bound for the checkpoint in our overall events query.

checkpoint
'1690154387...'

If there are fewer than limit rows in the logs table that have a checkpoint greater than our fromCheckpoint, the log_checkpoints table will have one row containing null. This means that there is no "hint" to be used, so we must fall back to the toCheckpoint in our overall events query.

checkpoint
null

Now, we can apply the same logic for blocks and callTraces. The rule for composing these results is a simple max, excluding nulls (which is the default behavior for SQL max). If all of the hints are null, the max across them will be null, and we must fall back to the toCheckpoint. The coalesce function works great for that logic.

Continuing to mull over the specifics of the query construction, but this makes sense to me. Why did you choose to remove the offset approach in favor of a limit? It seems like that will lead to an unnecessarily large all_checkpoints table.

erensanlier commented 3 months ago

I was trying to understand why SQLite fails, because first time I have tried without .offset it passed the tests.

Would prefer using offset over limit.

erensanlier commented 3 months ago

@0xOlias ready for merge? need anything further from my side?