erigontech / erigon

Ethereum implementation on the efficiency frontier https://erigon.gitbook.io
GNU Lesser General Public License v3.0

Added RPC filter limits, metrics, and better memory management. (#10718) #11894

Closed taratorio closed 1 week ago

taratorio commented 1 week ago

Relates to https://github.com/erigontech/erigon/issues/11890
Cherry-picked from E3: https://github.com/erigontech/erigon/commit/adf3f438d8aae4c749e5ddacb3fe08afd6e695e7



RPC nodes would frequently OOM. Based on metrics and pprof, I observed high memory utilization in the rpchelper:

File: erigon
Type: inuse_space
Entering interactive mode (type "help" for commands, "o" for options)
(pprof) top
Showing nodes accounting for 69696.67MB, 95.37% of 73083.31MB total
Dropped 1081 nodes (cum <= 365.42MB)
Showing top 10 nodes out of 43
      flat  flat%   sum%        cum   cum%
65925.80MB 90.21% 90.21% 65925.80MB 90.21%  github.com/ledgerwatch/erigon/turbo/rpchelper.(*LogsFilterAggregator).distributeLog.func1
 1944.80MB  2.66% 92.87%  1944.80MB  2.66%  github.com/ledgerwatch/erigon/turbo/rpchelper.(*Filters).AddLogs.func1
 1055.97MB  1.44% 94.31%  1055.97MB  1.44%  github.com/ledgerwatch/erigon/turbo/rpchelper.(*Filters).AddPendingTxs.func1
  362.04MB   0.5% 94.81%   572.20MB  0.78%  github.com/ledgerwatch/erigon/core/state.(*stateObject).GetCommittedState
  257.73MB  0.35% 95.16%   587.85MB   0.8%  encoding/json.Marshal
   57.85MB 0.079% 95.24%  1658.66MB  2.27%  github.com/ledgerwatch/erigon/core/vm.(*EVMInterpreter).Run
   51.48MB  0.07% 95.31%  2014.50MB  2.76%  github.com/ledgerwatch/erigon/rpc.(*handler).handleCallMsg
   23.01MB 0.031% 95.34%  1127.21MB  1.54%  github.com/ledgerwatch/erigon/core/vm.opCall
   11.50MB 0.016% 95.36%  1677.68MB  2.30%  github.com/ledgerwatch/erigon/core/vm.(*EVM).call
    6.50MB 0.0089% 95.37%   685.70MB  0.94%  github.com/ledgerwatch/erigon/turbo/transactions.DoCall
(pprof) list distributeLog.func1
Total: 71.37GB
ROUTINE ======================== github.com/ledgerwatch/erigon/turbo/rpchelper.(*LogsFilterAggregator).distributeLog.func1 in github.com/ledgerwatch/erigon/turbo/rpchelper/logsfilter.go
   64.38GB    64.38GB (flat, cum) 90.21% of Total
         .          .    206:   a.logsFilters.Range(func(k LogsSubID, filter *LogsFilter) error {
         .          .    207:           if filter.allAddrs == 0 {
         .          .    208:                   _, addrOk := filter.addrs.Get(gointerfaces.ConvertH160toAddress(eventLog.Address))
         .          .    209:                   if !addrOk {
         .          .    210:                           return nil
         .          .    211:                   }
         .          .    212:           }
         .          .    213:           var topics []libcommon.Hash
         .          .    214:           for _, topic := range eventLog.Topics {
   27.01GB    27.01GB    215:                   topics = append(topics, gointerfaces.ConvertH256ToHash(topic))
         .          .    216:           }
         .          .    217:           if filter.allTopics == 0 {
         .          .    218:                   if !a.chooseTopics(filter, topics) {
         .          .    219:                           return nil
         .          .    220:                   }
         .          .    221:           }
   37.37GB    37.37GB    222:           lg := &types2.Log{
         .          .    223:                   Address:     gointerfaces.ConvertH160toAddress(eventLog.Address),
         .          .    224:                   Topics:      topics,
         .          .    225:                   Data:        eventLog.Data,
         .          .    226:                   BlockNumber: eventLog.BlockNumber,
         .          .    227:                   TxHash:      gointerfaces.ConvertH256ToHash(eventLog.TransactionHash),
(pprof) %

Three solutions were considered:
  1. Architecture Design Changes
    • Implement architectural changes to pin clients to RPC nodes, ensuring that subscribers requesting updates by subscription ID hit the same node, which cleans up the objects on each request.
    • Reason for not choosing: This still relies on clients with subscriptions requesting updates. If they never do, and never call the unsubscribe function, it's an unbounded memory leak. The RPC node operator has no control over the behavior of the clients.
  2. Implementing Timeouts
    • Introduce timeouts for subscriptions to automatically clean up unresponsive or inactive subscriptions.
    • Reason for not choosing: Implementing timeouts would inadvertently stop websocket subscriptions due to the current design of the code, leading to a potential loss of data for active users. This solution would be good but requires a lot more work.
  3. Configurable Limits (Chosen Solution)
    • Set configurable limits for various subscription parameters (e.g., logs, headers, transactions, addresses, topics) to manage memory utilization effectively.
    • Reason for choosing: This approach provides flexibility to RPC node operators to configure limits as per their requirements. The default behavior remains unchanged, making it a non-breaking change. Additionally, it ensures current data is always available by pruning the oldest data first.
    --rpc.subscription.filters.maxlogs=60
    --rpc.subscription.filters.maxheaders=60
    --rpc.subscription.filters.maxtxs=10_000
    --rpc.subscription.filters.maxaddresses=1_000
    --rpc.subscription.filters.maxtopics=1_000
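
The "prune the oldest data first" behavior of the chosen solution can be sketched as follows. This is a minimal illustration, not the code from this PR: the `Log` type and the `appendBounded` function are hypothetical names, and the sketch assumes that a limit of 0 means "unlimited" (as the `maxTxs > 0` check in the PR's pending-transaction code suggests).

```go
package main

import "fmt"

// Log is a hypothetical stand-in for the log entries a subscription buffers.
type Log struct {
	BlockNumber uint64
}

// appendBounded appends incoming to stored. When limit > 0 it keeps only the
// newest limit entries; limit == 0 is assumed to mean "no limit".
func appendBounded(stored, incoming []Log, limit int) []Log {
	stored = append(stored, incoming...)
	if limit > 0 && len(stored) > limit {
		// Copy the tail into a fresh slice so the pruned entries are no
		// longer reachable through the old backing array.
		kept := make([]Log, limit)
		copy(kept, stored[len(stored)-limit:])
		return kept
	}
	return stored
}

func main() {
	var buf []Log
	for n := uint64(1); n <= 5; n++ {
		buf = appendBounded(buf, []Log{{BlockNumber: n}}, 3)
	}
	// Only the three newest entries (blocks 3, 4, 5) remain.
	fmt.Println(len(buf), buf[0].BlockNumber, buf[2].BlockNumber) // prints "3 3 5"
}
```

With a bound like this, a subscriber that never polls and never unsubscribes holds at most `limit` entries per store, which is what makes memory use predictable for the operator.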

Heap profile after the change:

File: erigon
Type: inuse_space
Entering interactive mode (type "help" for commands, "o" for options)
(pprof) top
Showing nodes accounting for 1146.51MB, 89.28% of 1284.17MB total
Dropped 403 nodes (cum <= 6.42MB)
Showing top 10 nodes out of 124
      flat  flat%   sum%        cum   cum%
  923.64MB 71.92% 71.92%   923.64MB 71.92%  github.com/ledgerwatch/erigon/turbo/rpchelper.(*Filters).AddPendingTxs.func1
   54.59MB  4.25% 76.18%    54.59MB  4.25%  github.com/ledgerwatch/erigon/rlp.(*Stream).Bytes
   38.21MB  2.98% 79.15%    38.21MB  2.98%  bytes.growSlice
   29.34MB  2.28% 81.44%    29.34MB  2.28%  github.com/ledgerwatch/erigon/p2p/rlpx.growslice
   26.44MB  2.06% 83.49%    34.16MB  2.66%  compress/flate.NewWriter
   18.66MB  1.45% 84.95%    21.16MB  1.65%  github.com/ledgerwatch/erigon/core/state.(*stateObject).GetCommittedState
   17.50MB  1.36% 86.31%    64.58MB  5.03%  github.com/ledgerwatch/erigon/core/types.(*DynamicFeeTransaction).DecodeRLP
      15MB  1.17% 87.48%    82.08MB  6.39%  github.com/ledgerwatch/erigon/core/types.UnmarshalTransactionFromBinary
   13.58MB  1.06% 88.54%    13.58MB  1.06%  github.com/ledgerwatch/erigon/turbo/stages/headerdownload.(*HeaderDownload).addHeaderAsLink
    9.55MB  0.74% 89.28%     9.55MB  0.74%  github.com/ledgerwatch/erigon/turbo/rpchelper.newChanSub[...]
(pprof) list AddPendingTxs.func1
Total: 1.25GB
ROUTINE ======================== github.com/ledgerwatch/erigon/turbo/rpchelper.(*Filters).AddPendingTxs.func1 in github.com/ledgerwatch/erigon/turbo/rpchelper/filters.go
  923.64MB   923.64MB (flat, cum) 71.92% of Total
         .          .    698:   ff.pendingTxsStores.DoAndStore(id, func(st [][]types.Transaction, ok bool) [][]types.Transaction {
         .          .    699:           if !ok {
         .          .    700:                   st = make([][]types.Transaction, 0)
         .          .    701:           }
         .          .    702:
         .          .    703:           // Calculate the total number of transactions in st
         .          .    704:           totalTxs := 0
         .          .    705:           for _, txBatch := range st {
         .          .    706:                   totalTxs += len(txBatch)
         .          .    707:           }
         .          .    708:
         .          .    709:           maxTxs := ff.config.RpcSubscriptionFiltersMaxTxs
         .          .    710:           // If adding the new transactions would exceed maxTxs, remove oldest transactions
         .          .    711:           if maxTxs > 0 && totalTxs+len(txs) > maxTxs {
         .          .    712:                   // Flatten st to a single slice
  918.69MB   918.69MB    713:                   flatSt := make([]types.Transaction, 0, totalTxs)
         .          .    714:                   for _, txBatch := range st {
         .          .    715:                           flatSt = append(flatSt, txBatch...)
         .          .    716:                   }
         .          .    717:
         .          .    718:                   // Remove the oldest transactions to make space for new ones
         .          .    719:                   if len(flatSt)+len(txs) > maxTxs {
         .          .    720:                           flatSt = flatSt[len(flatSt)+len(txs)-maxTxs:]
         .          .    721:                   }
         .          .    722:
         .          .    723:                   // Convert flatSt back to [][]types.Transaction with a single batch
         .          .    724:                   st = [][]types.Transaction{flatSt}
         .          .    725:           }
         .          .    726:
         .          .    727:           // Append the new transactions as a new batch
    4.95MB     4.95MB    728:           st = append(st, txs)
         .          .    729:           return st
         .          .    730:   })
         .          .    731:}
         .          .    732:
         .          .    733:// ReadPendingTxs reads pending transactions from the store associated with the given subscription ID.
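
In the listing above, most of the remaining allocation comes from flattening every batch into a new slice once the limit is reached. One possible variation, sketched here under assumptions and not part of this PR, trims the oldest batches in place instead. `Tx` and `pruneAndAppend` are hypothetical names; `Tx` stands in for `types.Transaction`. The sketch also covers the case where a single incoming batch is larger than `maxTxs`.

```go
package main

import "fmt"

// Tx is a hypothetical placeholder for types.Transaction.
type Tx string

// pruneAndAppend adds txs as a new batch. When maxTxs > 0 it drops the oldest
// transactions so that at most maxTxs remain across all batches.
func pruneAndAppend(st [][]Tx, txs []Tx, maxTxs int) [][]Tx {
	if maxTxs > 0 && len(txs) > maxTxs {
		// The incoming batch alone exceeds the limit: keep its newest part.
		txs = txs[len(txs)-maxTxs:]
	}
	st = append(st, txs)
	if maxTxs <= 0 {
		return st // no limit configured
	}
	total := 0
	for _, batch := range st {
		total += len(batch)
	}
	// Drop from the front (oldest first) until the total fits the limit.
	excess := total - maxTxs
	for excess > 0 && len(st) > 0 {
		if len(st[0]) <= excess {
			excess -= len(st[0])
			st[0] = nil // release the whole batch
			st = st[1:]
		} else {
			st[0] = st[0][excess:] // trim the oldest part of this batch
			excess = 0
		}
	}
	return st
}

func main() {
	var st [][]Tx
	st = pruneAndAppend(st, []Tx{"a", "b"}, 3)
	st = pruneAndAppend(st, []Tx{"c", "d"}, 3)
	fmt.Println(st) // prints "[[b] [c d]]"
}
```

Note that re-slicing keeps the trimmed batch's backing array alive until the batch is dropped entirely, so this trades one large allocation for slightly delayed reclamation. Whether that is a net win would need to be confirmed with a profile.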

With these changes, total in-use heap in the profiles above dropped from about 71.37GB to about 1.25GB, and the system is now more stable and predictable.


<img width="1567" alt="image" src="https://github.com/ledgerwatch/erigon/assets/787344/264c9757-93cf-4be8-9883-5ca3187acd73">

The blue line indicates a deployment. A sharp drop of the green line is an OOM.