ethereum / web3.py

A python interface for interacting with the Ethereum blockchain and ecosystem.
http://web3py.readthedocs.io
MIT License

Pythonic class generator from contract's ABI #3016

Open F4ever opened 1 year ago

F4ever commented 1 year ago

General

Currently, web3.py generates contract functions dynamically at runtime. However, code completion, type inference, and other intelligent IDE features don't work seamlessly with dynamically generated code.

It's easier to interact with smart contracts when the interface and documentation are in a human-readable form: simple Pythonic classes generated via a CLI.
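To illustrate the point above, here is a toy sketch (not web3.py's actual implementation) comparing attributes attached at runtime with methods declared in source. The names `DynamicFunctions` and `StaticFunctions` are hypothetical and exist only for this example. Both objects behave the same at runtime, but only the static class exposes `balanceOf` in its class definition, which is what static analysis tools inspect.

```python
from typing import Any, Callable, Dict, List


class DynamicFunctions:
    """Toy stand-in: attaches one attribute per ABI function at runtime."""

    def __init__(self, abi: List[Dict[str, Any]]) -> None:
        for entry in abi:
            if entry.get("type") == "function":
                setattr(self, entry["name"], self._make_stub(entry["name"]))

    @staticmethod
    def _make_stub(name: str) -> Callable[..., str]:
        def stub(*args: Any) -> str:
            # Just echo the call; a real library would encode and send it.
            return f"{name}{args}"

        return stub


class StaticFunctions:
    """Toy stand-in for a generated class: the method exists in source."""

    def balanceOf(self, owner: str) -> str:
        return f"balanceOf{(owner,)}"


abi = [{"type": "function", "name": "balanceOf"}]
dynamic = DynamicFunctions(abi)
static = StaticFunctions()

# Names visible on the class itself, without running __init__.
dynamic_names = [n for n in vars(DynamicFunctions) if not n.startswith("_")]
static_names = [n for n in vars(StaticFunctions) if not n.startswith("_")]
```

Here `dynamic_names` is empty while `static_names` contains `balanceOf`, even though both instances answer the same call.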

Benefits

Pseudocode generation example

MyContract.json

  {
    "inputs": [
      {
        "internalType": "bytes32",
        "name": "role",
        "type": "bytes32"
      } 
    ],
    "name": "contractFunctionName",
    "outputs": [
      {
        "components": [
          {
            "internalType": "uint256",
            "name": "firstInt",
            "type": "uint256"
          },
          {
            "internalType": "bytes32",
            "name": "secondByte",
            "type": "bytes32"
          }
        ],
        "internalType": "struct MyContract.SimpleStruct",
        "name": "inputStructName",
        "type": "tuple"
      }
    ],
    "stateMutability": "view",
    "type": "function"
  },

python3 web3 generate_class_from_abi ./MyContract.json

Generation example my_contract.py

from dataclasses import dataclass

# Could be a dataclass or a pydantic class
@dataclass
class SimpleStruct:
    first_int: int
    second_byte: bytes

class MyContract(Contract):
    def __init__(self, w3, address):
        # Load abi from source json
        abi = self._load_abi()

        # Some contract initialization and generation from ContractFactoryClass
        ....

    def contract_function_name(self, role: bytes, block_identifier) -> SimpleStruct:
        # Here some validation that role is 32 bytes only
        # ...
        result = self.functions.contractFunctionName(role).call(block_identifier=block_identifier)

        # The call returns the struct as a tuple, so unpack it into the fields
        return SimpleStruct(*result)
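As a rough sketch of how the struct part of the pseudocode above might be generated, the snippet below renders dataclass source from one tuple-typed ABI output. The helper names (`to_snake_case`, `dataclass_source`) and the tiny `PYTHON_TYPES` table are assumptions made for illustration; they are not part of web3.py.

```python
import re
from dataclasses import dataclass
from typing import Any, Dict

# Hypothetical, deliberately tiny type table covering only this example.
PYTHON_TYPES = {"uint256": "int", "bytes32": "bytes"}


def to_snake_case(name: str) -> str:
    """Convert a camelCase ABI name such as 'firstInt' to 'first_int'."""
    return re.sub(r"(?<=[a-z0-9])(?=[A-Z])", "_", name).lower()


def dataclass_source(output: Dict[str, Any]) -> str:
    """Render dataclass source code for one tuple-typed ABI output."""
    # "struct MyContract.SimpleStruct" -> "SimpleStruct"
    class_name = output["internalType"].split(".")[-1].split(" ")[-1]
    lines = ["@dataclass", f"class {class_name}:"]
    for component in output["components"]:
        field_name = to_snake_case(component["name"])
        # Unknown Solidity types fall back to the builtin `object`.
        field_type = PYTHON_TYPES.get(component["type"], "object")
        lines.append(f"    {field_name}: {field_type}")
    return "\n".join(lines)


abi_output = {
    "components": [
        {"internalType": "uint256", "name": "firstInt", "type": "uint256"},
        {"internalType": "bytes32", "name": "secondByte", "type": "bytes32"},
    ],
    "internalType": "struct MyContract.SimpleStruct",
    "name": "inputStructName",
    "type": "tuple",
}
source = dataclass_source(abi_output)

# Execute the generated source, then build an instance from a decoded tuple
# shaped like the one a contract call would return for this struct.
namespace = {"dataclass": dataclass}
exec(source, namespace)
SimpleStruct = namespace["SimpleStruct"]
decoded = SimpleStruct(*(7, b"\x01" * 32))
```

In a real generator the source would be written to a file such as my_contract.py rather than executed in place; `exec` is used here only to show that the rendered code is usable.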

Possible features

Challenges

F4ever commented 1 year ago

I saw something similar in JS. Any suggestions? Would this be helpful? I'm ready to build an MVP.

fselmo commented 1 year ago

Interesting thought @F4ever. This isn't something that we currently plan on building out but I'm happy to leave this open and continue the conversation if it generates interest and would prove useful to users.

sentilesdal commented 1 year ago

I just started working on this actually, to make our own dev experience better. I'd be happy to share what I come up with and possibly collaborate with web3.py to get it into the repo, or at least have it linked as a tool.

abdelaziz-mahdy commented 8 months ago

I was searching for the same thing and couldn't find it, so this is the code I ended up with:

import json
import warnings
import keyword

# Define type mappings from Solidity to Python
type_mapping = {
    'address': 'str',  # Solidity address to Python string
    'uint': 'int',     # Solidity unsigned integer to Python int
    'uint256': 'int',  # Solidity unsigned integer (256-bit) to Python int
    'int': 'int',      # Solidity integer to Python int
    'bool': 'bool',    # Solidity boolean to Python bool
    # Add more mappings as needed
}

def map_type(solidity_type):
    """
    Map a Solidity type to a Python type. If mapping is not found, return None.
    """
    python_type = type_mapping.get(solidity_type)
    if python_type is None:
        warnings.warn(f"No Python type mapping found for Solidity type '{solidity_type}'. Type annotation will be omitted.")
    return python_type


def adjust_param_name(name):
    """
    Adjust the parameter name if it conflicts with Python keywords.
    """
    return "_" + name if keyword.iskeyword(name) else name

class ContractGenerator:
    def __init__(self, file_path):
        with open(file_path) as file:
            contract_data = json.load(file)
            if 'abi' not in contract_data:
                raise ValueError('ABI not found in contract data')
            if 'contractName' not in contract_data:
                raise ValueError('Contract name not found in contract data')
            self.abi = contract_data['abi']
            self.contract_name = contract_data.get('contractName', 'UnknownContract')
        self.views = []
        self.calls = []
        self.events = []
        self._categorize_abi()

    def _categorize_abi(self):
        for function in self.abi:
            if function['type'] == 'function':
                # 'pure' functions are read-only too, so treat them like views
                if function['stateMutability'] in ['view', 'pure']:
                    self.views.append(function)
                elif function['stateMutability'] in ['nonpayable', 'payable']:
                    self.calls.append(function)
            elif function['type'] == 'event':
                self.events.append(function)

    def _generate_function_doc(self, function):
        """
        Generate a documentation string for a function based on its inputs.
        """
        if not function['inputs']:
            return ""
        doc = "Parameters:\n"
        for inp in function['inputs']:
            python_type = map_type(inp['type'])
            param_name = adjust_param_name(inp['name'])
            doc += f"    {param_name} ({python_type if python_type else 'No type mapping'}): {inp['type']} type in Solidity\n"
        return doc

    def _generate_event_filter_method(self, event):
        params = []
        doc_params = []
        for inp in event['inputs']:
            if inp['indexed']:  # Consider only indexed inputs for filters
                python_type = map_type(inp['type'])
                param_name = inp['name']
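                # Omit the annotation when no Python type mapping exists
                annotation = f": {python_type}" if python_type else ""
                params.append(f"{param_name}_filter{annotation} = None")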
                doc_params.append(f"        {param_name}_filter ({python_type if python_type else 'No type mapping'}): Filter for '{inp['name']}' ({inp['type']})")
        params_signature = ", ".join(params)

        method = f"    def get_{event['name']}_events(self, fromBlock='latest', {params_signature}):\n"
        # This line must be an f-string so the event name is interpolated
        method += f"        \"\"\"Fetch events of type {event['name']}.\n"
        method += "        Parameters:\n"
        method += "        fromBlock (str): The starting block from which to fetch events. Defaults to 'latest'.\n"
        method += "\n".join(doc_params) + "\n"
        method += "        \"\"\"\n"
        method += "        filter_params = {}\n"
        for inp in event['inputs']:
            if inp['indexed']:
                param_name = inp['name']
                method += f"        if {param_name}_filter is not None:\n"
                method += f"            filter_params['{param_name}'] = {param_name}_filter\n"
        method += "        return self.contract.events." + event['name'] + f".create_filter(fromBlock=fromBlock, argument_filters=filter_params)\n\n"
        return method

    def generate_class(self):
        class_code = f"class {self.contract_name}:\n"
        # The generated class has no `abi` attribute of its own, so take the ABI as an argument
        class_code += "    def __init__(self, w3, address, abi):\n"
        class_code += "        \"\"\"Initialize the contract with a Web3 instance, address, and ABI.\"\"\"\n"
        class_code += "        self.contract = w3.eth.contract(address=address, abi=abi)\n\n"

        # Generate view methods
        for view in self.views:
            method_name = view['name']
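            # Omit the annotation when no Python type mapping exists (requires Python 3.8+)
            params = ", ".join([adjust_param_name(inp['name']) + (f": {t}" if (t := map_type(inp['type'])) else "") for inp in view['inputs']])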
            method_params = ", ".join([adjust_param_name(inp['name']) for inp in view['inputs']])
            class_code += f"    def {method_name}(self{', ' if params else ''}{params}):\n"
            docstring = self._generate_function_doc(view).strip()
            if docstring:
                class_code += f"        \"\"\"{docstring}\"\"\"\n"
            class_code += f"        return self.contract.functions.{method_name}({method_params}).call()\n\n"

        # Generate call methods
        for call in self.calls:
            method_name = call['name']
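            # Omit the annotation when no Python type mapping exists (requires Python 3.8+)
            params = ", ".join([adjust_param_name(inp['name']) + (f": {t}" if (t := map_type(inp['type'])) else "") for inp in call['inputs']])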
            method_params = ", ".join([adjust_param_name(inp['name']) for inp in call['inputs']])
            class_code += f"    def {method_name}(self{', ' if params else ''}{params}):\n"
            docstring = self._generate_function_doc(call).strip()
            if docstring:
                class_code += f"        \"\"\"{docstring}\"\"\"\n"
            class_code += f"        return self.contract.functions.{method_name}({method_params})\n\n"

        # Generate event methods with filters
        for event in self.events:
            class_code += self._generate_event_filter_method(event)

        return class_code

# Example usage
file_path = 'test.json'
generator = ContractGenerator(file_path)
class_code = generator.generate_class()

print(class_code)

For input like this:

{
"contractName": "ERC20",
"abi": [

    {
        "constant": true,
        "inputs": [],
        "name": "name",
        "outputs": [
            {
                "name": "",
                "type": "string"
            }
        ],
        "payable": false,
        "stateMutability": "view",
        "type": "function"
    },
    {
        "constant": false,
        "inputs": [
            {
                "name": "_spender",
                "type": "address"
            },
            {
                "name": "_value",
                "type": "uint256"
            }
        ],
        "name": "approve",
        "outputs": [
            {
                "name": "",
                "type": "bool"
            }
        ],
        "payable": false,
        "stateMutability": "nonpayable",
        "type": "function"
    },
    {
        "constant": true,
        "inputs": [],
        "name": "totalSupply",
        "outputs": [
            {
                "name": "",
                "type": "uint256"
            }
        ],
        "payable": false,
        "stateMutability": "view",
        "type": "function"
    },
    {
        "constant": false,
        "inputs": [
            {
                "name": "_from",
                "type": "address"
            },
            {
                "name": "_to",
                "type": "address"
            },
            {
                "name": "_value",
                "type": "uint256"
            }
        ],
        "name": "transferFrom",
        "outputs": [
            {
                "name": "",
                "type": "bool"
            }
        ],
        "payable": false,
        "stateMutability": "nonpayable",
        "type": "function"
    },
    {
        "constant": true,
        "inputs": [],
        "name": "decimals",
        "outputs": [
            {
                "name": "",
                "type": "uint8"
            }
        ],
        "payable": false,
        "stateMutability": "view",
        "type": "function"
    },
    {
        "constant": true,
        "inputs": [
            {
                "name": "_owner",
                "type": "address"
            }
        ],
        "name": "balanceOf",
        "outputs": [
            {
                "name": "balance",
                "type": "uint256"
            }
        ],
        "payable": false,
        "stateMutability": "view",
        "type": "function"
    },
    {
        "constant": true,
        "inputs": [],
        "name": "symbol",
        "outputs": [
            {
                "name": "",
                "type": "string"
            }
        ],
        "payable": false,
        "stateMutability": "view",
        "type": "function"
    },
    {
        "constant": false,
        "inputs": [
            {
                "name": "_to",
                "type": "address"
            },
            {
                "name": "_value",
                "type": "uint256"
            }
        ],
        "name": "transfer",
        "outputs": [
            {
                "name": "",
                "type": "bool"
            }
        ],
        "payable": false,
        "stateMutability": "nonpayable",
        "type": "function"
    },
    {
        "constant": true,
        "inputs": [
            {
                "name": "_owner",
                "type": "address"
            },
            {
                "name": "_spender",
                "type": "address"
            }
        ],
        "name": "allowance",
        "outputs": [
            {
                "name": "",
                "type": "uint256"
            }
        ],
        "payable": false,
        "stateMutability": "view",
        "type": "function"
    },
    {
        "payable": true,
        "stateMutability": "payable",
        "type": "fallback"
    },
    {
        "anonymous": false,
        "inputs": [
            {
                "indexed": true,
                "name": "owner",
                "type": "address"
            },
            {
                "indexed": true,
                "name": "spender",
                "type": "address"
            },
            {
                "indexed": false,
                "name": "value",
                "type": "uint256"
            }
        ],
        "name": "Approval",
        "type": "event"
    },
    {
        "anonymous": false,
        "inputs": [
            {
                "indexed": true,
                "name": "from",
                "type": "address"
            },
            {
                "indexed": true,
                "name": "to",
                "type": "address"
            },
            {
                "indexed": false,
                "name": "value",
                "type": "uint256"
            }
        ],
        "name": "Transfer",
        "type": "event"
    }
]}

the output class will be:

class ERC20:
    def __init__(self, w3, address, abi):
        """Initialize the contract with a Web3 instance, address, and ABI."""
        self.contract = w3.eth.contract(address=address, abi=abi)

    def name(self):
        return self.contract.functions.name().call()

    def totalSupply(self):
        return self.contract.functions.totalSupply().call()

    def decimals(self):
        return self.contract.functions.decimals().call()

    def balanceOf(self, _owner: str):
        """Parameters:
    _owner (str): address type in Solidity"""
        return self.contract.functions.balanceOf(_owner).call()

    def symbol(self):
        return self.contract.functions.symbol().call()

    def allowance(self, _owner: str, _spender: str):
        """Parameters:
    _owner (str): address type in Solidity
    _spender (str): address type in Solidity"""
        return self.contract.functions.allowance(_owner, _spender).call()

    def approve(self, _spender: str, _value: int):
        """Parameters:
    _spender (str): address type in Solidity
    _value (int): uint256 type in Solidity"""
        return self.contract.functions.approve(_spender, _value)

    def transferFrom(self, _from: str, _to: str, _value: int):
        """Parameters:
    _from (str): address type in Solidity
    _to (str): address type in Solidity
    _value (int): uint256 type in Solidity"""
        return self.contract.functions.transferFrom(_from, _to, _value)

    def transfer(self, _to: str, _value: int):
        """Parameters:
    _to (str): address type in Solidity
    _value (int): uint256 type in Solidity"""
        return self.contract.functions.transfer(_to, _value)

    def get_Approval_events(self, fromBlock='latest', owner_filter: str = None, spender_filter: str = None):
        """Fetch events of type Approval.
        Parameters:
        fromBlock (str): The starting block from which to fetch events. Defaults to 'latest'.
        owner_filter (str): Filter for 'owner' (address)
        spender_filter (str): Filter for 'spender' (address)
        """
        filter_params = {}
        if owner_filter is not None:
            filter_params['owner'] = owner_filter
        if spender_filter is not None:
            filter_params['spender'] = spender_filter
        return self.contract.events.Approval.create_filter(fromBlock=fromBlock, argument_filters=filter_params)

    def get_Transfer_events(self, fromBlock='latest', from_filter: str = None, to_filter: str = None):
        """Fetch events of type Transfer.
        Parameters:
        fromBlock (str): The starting block from which to fetch events. Defaults to 'latest'.
        from_filter (str): Filter for 'from' (address)
        to_filter (str): Filter for 'to' (address)
        """
        filter_params = {}
        if from_filter is not None:
            filter_params['from'] = from_filter
        if to_filter is not None:
            filter_params['to'] = to_filter
        return self.contract.events.Transfer.create_filter(fromBlock=fromBlock, argument_filters=filter_params)
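
Since the generator builds Python source by string concatenation, one inexpensive sanity check is to parse the result before writing it to disk. The sketch below uses only the standard-library `ast` module; the helper name `public_method_names` is hypothetical, and `sample_source` is a shortened excerpt of a generated class rather than the full output.

```python
import ast
from typing import List


def public_method_names(source: str, class_name: str) -> List[str]:
    """Parse generated source and list the public methods of one class.

    ast.parse raises SyntaxError if the generated code is not valid Python.
    """
    tree = ast.parse(source)
    for node in tree.body:
        if isinstance(node, ast.ClassDef) and node.name == class_name:
            return [
                item.name
                for item in node.body
                if isinstance(item, ast.FunctionDef)
                and not item.name.startswith("_")
            ]
    raise ValueError(f"class {class_name!r} not found in generated source")


# Shortened excerpt of a generated class, used only for this check.
sample_source = '''
class ERC20:
    def name(self):
        return self.contract.functions.name().call()

    def balanceOf(self, _owner: str):
        return self.contract.functions.balanceOf(_owner).call()
'''

methods = public_method_names(sample_source, "ERC20")
```

Parsing does not execute the code, so this check needs neither web3.py nor a node connection.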

Sadly, I can't integrate it into web3.py myself, so I'm providing it here in case it helps other people. I'd love for it to help get this feature added to web3.py; if there is a way to submit it as a PR without the CLI part, I would be glad to do so.
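
The `type_mapping` table in the generator above covers only a handful of Solidity types. One possible way to extend it, sketched here under the assumption that sized integers map to `int`, fixed and dynamic byte types to `bytes`, and arrays to `List[...]`, is to match type names by pattern. The function name `map_solidity_type` is hypothetical, and tuples (structs) are left as `Any`.

```python
import re

# Patterns for Solidity type families; the optional digits are the bit or byte size.
_INT_RE = re.compile(r"^u?int(\d*)$")
_BYTES_RE = re.compile(r"^bytes(\d*)$")
# Trailing "[]" or "[N]" marks a dynamic or fixed-size array.
_ARRAY_RE = re.compile(r"^(.*)\[\d*\]$")


def map_solidity_type(solidity_type: str) -> str:
    """Return a Python annotation string for a Solidity ABI type name."""
    array_match = _ARRAY_RE.match(solidity_type)
    if array_match:
        # Strip one array level and recurse on the element type.
        return f"List[{map_solidity_type(array_match.group(1))}]"
    if _INT_RE.match(solidity_type):
        return "int"
    if _BYTES_RE.match(solidity_type):
        return "bytes"
    if solidity_type == "bool":
        return "bool"
    if solidity_type in ("address", "string"):
        return "str"
    # Tuples and anything unrecognized are left loosely typed.
    return "Any"


examples = {
    name: map_solidity_type(name)
    for name in ("uint8", "bytes32", "address[]", "uint256[][2]", "tuple")
}
```

Generated code that uses these annotations would also need `from typing import Any, List` at the top of the output file.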