ApeWorX / ape

The smart contract development tool for Pythonistas, Data Scientists, and Security Professionals
https://apeworx.io
Apache License 2.0

RFC: daskify #883

Open spinoch opened 2 years ago

spinoch commented 2 years ago

Overview

For example, say I want to get all transactions made by a specific address (map), filter them on some attributes, and then reduce the result by adding up some values of those attributes. This would be trivial for an address with a few thousand transactions, but with multiple addresses that each have millions of transactions, a single compute node won't return results in a timely manner.
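
To make the map/filter/reduce shape concrete, here is a minimal single-machine sketch. The `fetch_transactions` helper, the `FAKE_CHAIN` data, and the record fields (`sender`, `value`) are hypothetical stand-ins, not ape APIs, and the thread pool only plays the role that a cluster of workers would play.

```python
from concurrent.futures import ThreadPoolExecutor

# Hypothetical in-memory "chain": address -> list of transaction records.
FAKE_CHAIN = {
    "0xaaa": [{"sender": "0xaaa", "value": 5}, {"sender": "0xaaa", "value": 0}],
    "0xbbb": [{"sender": "0xbbb", "value": 7}, {"sender": "0xccc", "value": 3}],
}


def fetch_transactions(address):
    """Map step: stand-in for an expensive per-address query."""
    return FAKE_CHAIN.get(address, [])


def partial_sum(address):
    """Fetch, filter, and partially reduce one address on one worker."""
    txs = fetch_transactions(address)
    kept = (tx for tx in txs if tx["sender"] == address and tx["value"] > 0)
    return sum(tx["value"] for tx in kept)


def total_value(addresses, max_workers=4):
    """Combine the per-address partial sums into the final reduction."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return sum(pool.map(partial_sum, addresses))


if __name__ == "__main__":
    print(total_value(["0xaaa", "0xbbb"]))
```

Each address is reduced to one number before anything is combined, which is the property that lets the same computation spread across many nodes.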

Specification

TBD

This may be in some kind of plug-in, separated from core ape functionality

There are several approaches to this. Perhaps the most intuitive for the end user is to construct dask data structures, such as dask.dataframe or dask.bag objects, for the map part of the example above. The filter and reduction can then be applied directly to the dask data structure.

e.g.

from ape_dask import get_transactions  # proposed plugin, does not exist yet
from distributed import Client

client = Client(...)  # connect to a distributed cluster
df = get_transactions(address)  # dask.DataFrame

# filter: combine boolean masks with `&` and parenthesize each comparison
nonzero_tx_values = df[(df.tx_from == "0x...") & (df.tx_value > 0)].tx_value
total_value = nonzero_tx_values.sum().compute()  # reduce

Dependencies

dask

fubuloubu commented 2 years ago

We have several supported query types (transactions for an account is one of them), and we typically add a .query(*param_names, **filter_args) -> pd.DataFrame method to all our query endpoints. You can do a basic form of map/filter/reduce with this functionality: .query is the map function call (I prefer this name as it's more explicit than .map), **filter_args are the arguments applied to filter the dataset (usually a block range, or in this case a range of transaction nonce values), and *param_names is a primitive reduce that lets you pull only a subset of data from each row type (e.g. each block, transaction, or event) to reduce the overall load on the query endpoint.
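
As a toy illustration of that calling convention (not ape's implementation), the sketch below applies the filter arguments as an inclusive nonce range and `*param_names` as a column projection over in-memory rows. The `start_nonce`/`stop_nonce` argument names and the row fields are assumptions made for the example, and it returns plain dicts rather than a `pd.DataFrame` to stay dependency-free.

```python
# Hypothetical rows standing in for the transactions of one account.
ROWS = [
    {"nonce": 0, "value": 5, "gas_used": 21000},
    {"nonce": 1, "value": 0, "gas_used": 50000},
    {"nonce": 2, "value": 9, "gas_used": 21000},
]


def query(*param_names, start_nonce=0, stop_nonce=None, rows=ROWS):
    """Filter rows by nonce range, then keep only the requested fields."""
    selected = [
        row
        for row in rows
        if row["nonce"] >= start_nonce
        and (stop_nonce is None or row["nonce"] <= stop_nonce)
    ]
    # Projecting early means less data has to leave the query endpoint.
    return [{name: row[name] for name in param_names} for row in selected]


if __name__ == "__main__":
    print(query("nonce", "value", start_nonce=1))
```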

Internally, ape has a plugin system that lets different query engines fulfill each query request: each engine gives a time estimate in response to a request and, if selected, performs it, with a final step in which the data is passed to the other engines for updates. That all being said, we'd like to make the query system as efficient as possible, and being smart about how the queries are performed would be really nice. We want to support common use cases like the ones you described.
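
The selection flow described here can be sketched roughly as follows. The class and method names (`Engine`, `estimate`, `perform`, `update`) are hypothetical and only mirror the description; they are not claimed to match ape's actual plugin interface. The sketch also reads the final step as the result being handed to the non-selected engines so they can update, which is one interpretation of the description.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class Engine:
    """Hypothetical query engine; cost_ms=None means it declines the query."""

    name: str
    cost_ms: Optional[int]
    cache: list = field(default_factory=list)

    def estimate(self, query):
        return self.cost_ms

    def perform(self, query):
        return [f"{self.name}:{query}"]

    def update(self, query, result):
        self.cache.append((query, result))


def run_query(engines, query):
    # Ask every engine for a time estimate and drop the ones that decline.
    estimates = {e.name: e.estimate(query) for e in engines}
    capable = [e for e in engines if estimates[e.name] is not None]
    if not capable:
        raise LookupError(f"no engine can handle {query!r}")
    # Pick the lowest estimate and let that engine do the work.
    chosen = min(capable, key=lambda e: estimates[e.name])
    result = chosen.perform(query)
    # Final step: hand the result to the other engines so they can update.
    for engine in engines:
        if engine is not chosen:
            engine.update(query, result)
    return chosen.name, result
```

For instance, given `Engine("rpc", 500)`, `Engine("cache", 5)`, and `Engine("offline", None)`, the `cache` engine is selected and the other two receive the result through `update`.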

With that overall picture of the architecture, can we refine this request anymore? Or does ape provide the basic skeleton that you are thinking of? (minus the ability to query transactions for a given account which is a work in progress).

I think one interesting point I pulled from this is that there are many "dataframe-like" libraries out there. Where we currently enforce a particular one (pandas), we should perhaps find a more general interface for type checking.
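
One way to express such a general interface is structural typing via `typing.Protocol`, so that any object with the right members passes the check regardless of which library it comes from. The protocol name and the choice of required members below are assumptions for illustration, not an existing ape type.

```python
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class DataFrameLike(Protocol):
    """Hypothetical minimal surface shared by pandas/dask-style frames."""

    @property
    def columns(self) -> Any: ...

    def __getitem__(self, key: Any) -> Any: ...

    def head(self, n: int = 5) -> Any: ...


class TinyFrame:
    """Toy frame used only to show that no inheritance is needed."""

    def __init__(self, data):
        self._data = data

    @property
    def columns(self):
        return list(self._data)

    def __getitem__(self, key):
        return self._data[key]

    def head(self, n=5):
        return TinyFrame({k: v[:n] for k, v in self._data.items()})


def describe(frame: DataFrameLike) -> str:
    # The runtime check only verifies that the members exist, not their signatures.
    if not isinstance(frame, DataFrameLike):
        raise TypeError("expected a dataframe-like object")
    return ", ".join(map(str, frame.columns))
```

A static type checker would enforce the full signatures, while the `isinstance` check at runtime is a coarser guard that only looks for the member names.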

banteg commented 2 years ago

you can daskify ape like this, and then client.submit/map any function which calls ape:

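from ape import networks
from distributed import WorkerPlugin

class WorkerConnection(WorkerPlugin):
    def setup(self, worker):
        # Runs once on each worker: connect to mainnet and leave the connection open
        networks.ethereum.mainnet.use_default_provider().__enter__()

# `client` is an already-connected distributed.Client
client.register_worker_plugin(WorkerConnection())

fubuloubu commented 2 years ago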

> you can daskify ape like this, and then client.submit/map any function which calls ape:
>
> class WorkerConnection(WorkerPlugin):
>     def setup(self, worker):
>         networks.ethereum.mainnet.use_default_provider().__enter__()
>
> client.register_worker_plugin(WorkerConnection())

It would be nice to make this more general and handle it internally :thinking:

spinoch commented 2 years ago

> With that overall picture of the architecture, can we refine this request anymore? Or does ape provide the basic skeleton that you are thinking of? (minus the ability to query transactions for a given account which is a work in progress).

tbh I have never even used ape for more than a few minutes. To give you a qualified answer I'd have to look at the code a bit and try it out for some days. A non-qualified answer:

ape does provide some basic skeletons, but there is some additional setup required to run distributed workloads (and out-of-core computing?). From the interface you mention (.query(*param_names, **filter_args) -> pd.DataFrame), I was thinking about something like:

import dask
import dask.dataframe as dd

def [...]_dquery(param_names_map, filter_args_map) -> dd.DataFrame:
    return dd.from_delayed(
        [
            # Set up a bunch of tasks that will perform the map + filter queries.
            # dask.delayed wraps the callable first; the arguments go in the second call.
            dask.delayed([...].query)(*param_names, **filter_args)
            for param_names, filter_args in zip(param_names_map, filter_args_map)
        ],
        meta=expected_pandas_metadata,  # should be known beforehand
    )

> we should perhaps find a more general interface for type checking

If we want to deal with types properly, yes. But dealing with this in Python is unpleasant, to put it mildly. You might want to take a look at Apache Arrow and how it integrates with pandas in this regard, but I don't think there is so much value in going down this road as pandas is already a de facto standard in Python (dask's dataframes are just distributed pandas dataframes with some metadata)

> you can daskify ape like this, and then client.submit/map any function which calls ape:
>
> class WorkerConnection(WorkerPlugin):
>     def setup(self, worker):
>         networks.ethereum.mainnet.use_default_provider().__enter__()
>
> client.register_worker_plugin(WorkerConnection())

that's cute :smile: