pinax-network / substreams-sink

Substreams Sink library
MIT License
4 stars · 1 fork · source link

Add `progress` Prometheus metrics #30

Status: Open · DenisCarriere opened this issue 6 months ago

DenisCarriere commented 6 months ago
/**
 * Wires Prometheus metric handlers onto a BlockEmitter.
 *
 * Proposed change from this issue: subscribe `handleProgress` to the
 * emitter's "progress" event so that progress messages can be turned into
 * Prometheus metrics.
 *
 * @param emitter - the BlockEmitter whose events are observed.
 */
export function onPrometheusMetrics(emitter: BlockEmitter) {
   // (other code elided in the issue sketch)
   ...
   // New: route every "progress" event to handleProgress.
   emitter.on("progress", handleProgress);
}

/**
 * Handles the payload of a "progress" event (registered by
 * onPrometheusMetrics).
 *
 * TODO: body elided in the issue sketch. Presumably it should translate the
 * ModulesProgress fields (running_jobs, modules_stats, stages,
 * processed_bytes) into Prometheus metrics — confirm when implementing.
 *
 * @param progress - the ModulesProgress message delivered with the event.
 */
export function handleProgress(progress: ModulesProgress) {
    ...
}

References

https://github.com/streamingfast/substreams/blob/47ddf079aa7fcd89c03eb99771cda6749b56c20e/proto/sf/substreams/rpc/v2/service.proto#L132-L145


// ModulesProgress is a message that is sent every 500ms.
// It carries the jobs currently running on tier2 servers, per-module
// execution statistics, the stage definitions with their completed block
// ranges, and byte-count totals.
message ModulesProgress {
  // previously: repeated ModuleProgress modules = 1;
  // these previous `modules` messages were sent in bursts and are not sent
  // anymore. Both the number and the name are reserved so that neither can
  // be reused by a new field (reuse would misread data from older peers on
  // the wire, or in JSON / text format).
  reserved 1;
  reserved "modules";

  // List of jobs running on tier2 servers
  repeated Job running_jobs = 2;
  // Execution statistics for each module
  repeated ModuleStats modules_stats = 3;
  // Stages definition and completed block ranges
  repeated Stage stages = 4;

  // Totals of bytes read and written (see ProcessedBytes).
  ProcessedBytes processed_bytes = 5;
}

// ProcessedBytes holds byte-count totals, as carried by
// ModulesProgress.processed_bytes.
// NOTE(review): what is being read/written (e.g. store or cache files) is not
// stated here — TODO confirm against the server implementation.
message ProcessedBytes {
  // Total number of bytes read.
  uint64 total_bytes_read = 1;

  // Total number of bytes written.
  uint64 total_bytes_written = 2;
}

// Error reports a failure attributed to a module, together with its reason
// and the logs that were captured.
message Error {
  // Module the error is attributed to (presumably the module name).
  string module = 1;
  // Description of why the error occurred.
  string reason = 2;
  // Log lines captured for the module; may be incomplete, see logs_truncated.
  repeated string logs = 3;
  // logs_truncated is a flag that tells you if you received all the logs or
  // if they were truncated because you logged too much (fixed limit currently
  // is set to 128 KiB).
  bool logs_truncated = 4;
}

// Job describes one unit of work running on a tier2 server, as listed in
// ModulesProgress.running_jobs.
message Job {
  // Stage this job executes; presumably an index into ModulesProgress.stages
  // — TODO confirm.
  uint32 stage = 1;

  // Block range covered by the job. Whether stop_block is inclusive or
  // exclusive is not stated here — TODO confirm.
  uint64 start_block = 2;
  uint64 stop_block = 3;

  // Number of blocks this job has processed.
  uint64 processed_blocks = 4;

  // Job duration in milliseconds (presumably elapsed time so far — confirm).
  uint64 duration_ms = 5;
}

// Stage groups a set of modules and records the block ranges that have been
// completed for them, as listed in ModulesProgress.stages.
message Stage {
  // Modules belonging to this stage (by name; presumably matching
  // ModuleStats.name).
  repeated string modules = 1;

  // Block ranges already completed for this stage.
  repeated BlockRange completed_ranges = 2;
}

// ModuleStats gathers metrics and statistics from each module, running on
// tier1 or tier2.
// All the 'count' and 'time_ms' values may include duplicates for each stage
// going over that module.
message ModuleStats {
  // name of the module
  string name = 1;

  // total_processed_block_count is the sum of blocks sent to that module code
  uint64 total_processed_block_count = 2;
  // total_processing_time_ms is the sum of all time spent running that module
  // code
  uint64 total_processing_time_ms = 3;

  // external_call_metrics are chain-specific intrinsics, like
  // "Ethereum RPC calls".
  repeated ExternalCallMetric external_call_metrics = 4;

  // total_store_operation_time_ms is the sum of all time spent running that
  // module code waiting for a store operation (ex: read, write, delete...)
  uint64 total_store_operation_time_ms = 5;
  // total_store_read_count is the sum of all the store Read operations called
  // from that module code
  uint64 total_store_read_count = 6;

  // NOTE(review): field numbers 7-9 are unused in this message. If they were
  // ever assigned in a published version, declare them `reserved` — TODO
  // confirm against the upstream history.

  // total_store_write_count is the sum of all store Write operations called
  // from that module code (store-only)
  uint64 total_store_write_count = 10;

  // total_store_deleteprefix_count is the sum of all store DeletePrefix
  // operations called from that module code (store-only)
  // note that DeletePrefix can be a costly operation on large stores
  uint64 total_store_deleteprefix_count = 11;

  // store_size_bytes is the uncompressed size of the full KV store for that
  // module, from the last 'merge' operation (store-only)
  uint64 store_size_bytes = 12;

  // total_store_merging_time_ms is the time spent merging partial stores into
  // a full KV store for that module (store-only)
  uint64 total_store_merging_time_ms = 13;

  // store_currently_merging is true if there is a merging operation (partial
  // store to full KV store) under way.
  bool store_currently_merging = 14;

  // highest_contiguous_block is the highest block in the highest merged full
  // KV store of that module (store-only)
  uint64 highest_contiguous_block = 15;
}