DefiantLabs / cosmos-upgrades

a tool to search for scheduled cosmos upgrades
MIT License
4 stars 2 forks source link

Add support for block-height to UTC time #3

Closed danbryan closed 1 year ago

danbryan commented 1 year ago

Based on the speed of the network and the unbonding time we can determine how many blocks per second happen.

Here is another script i have written in the past which calculates this . This functionality should be integrated and create a new field in the json dump with the UTC time of the upgrade based on the prediction function

"""cosmos script to calculate chain info"""

import requests
import typer
from dateutil import parser

# How many blocks to go back to calculate seconds per block.
# The higher the number, the more accurate the calculation
DELTA_BLOCKS = 35000
PRUNING_PADDING = 1000

def get_latest_height_time(peer):
    """get_latest_height_time returns the latest block height and time"""
    try:
        res = requests.get(f"{peer}/status", timeout=10)
        res.raise_for_status()
        data = res.json()
    except requests.exceptions.RequestException as e:
        raise SystemExit(f"Error getting latest block height and time: {e}\n{e.response.content}")

    try:
        latest_height = int(data["result"]["sync_info"]["latest_block_height"])
        latest_time_seconds = int(parser.isoparse(data["result"]["sync_info"]["latest_block_time"]).timestamp())
    except (ValueError, KeyError) as e:
        raise SystemExit(f"Error getting latest block height and time: {e}")

    return latest_height, latest_time_seconds

def get_delta_time(peer, delta_blocks, latest_height):
    """get_delta_time returns the time of the block at the given delta blocks"""
    delta_height = latest_height - delta_blocks
    try:
        res = requests.get(f"{peer}/block?height={delta_height}", timeout=10)
        res.raise_for_status()
        data = res.json()
    except requests.exceptions.RequestException as e:
        raise SystemExit(f"Error getting delta time: {e}\n{e.response.content}")

    try:
        delta_time_seconds = int(parser.isoparse(data["result"]["block"]["header"]["time"]).timestamp())
    except (ValueError, KeyError) as e:
        raise SystemExit(f"Error calculating delta_time_seconds: {e}")

    return delta_time_seconds

def main(rpc_peer: str, unbonding_days: int):
    latest_height, latest_time_seconds = get_latest_height_time(rpc_peer)
    delta_time_seconds = get_delta_time(rpc_peer, DELTA_BLOCKS, latest_height)

    # Calculate seconds per block
    seconds_per_block = round((latest_time_seconds - delta_time_seconds) / DELTA_BLOCKS, 2)
    print(f"Seconds per block: {seconds_per_block}")

    # Calculate unbonding period in blocks
    unbonding_period_blocks = round((unbonding_days * 86400) / seconds_per_block)
    print(f"Unbonding period blocks: {unbonding_period_blocks}")

    # Calculate minimum blocks to retain
    blocks_to_keep = int(round(((unbonding_period_blocks + PRUNING_PADDING) / DELTA_BLOCKS) + 0.5) * DELTA_BLOCKS)
    print(f"Set min-retain-blocks = {blocks_to_keep} in app.toml")

if __name__ == "__main__":
    typer.run(main)