citp / BlockSci

A high-performance tool for blockchain science and exploration
https://citp.github.io/BlockSci/
GNU General Public License v3.0

Parallel processing with BlockSci in Python #301

Open boshmaf opened 5 years ago

boshmaf commented 5 years ago

System Information

Using AMI: No
BlockSci version: 0.5.0
Blockchain: Bitcoin
Parser: Disk
Total memory: 256 GB
CPU count: 96

I have a general question about the best way to run a function that consumes BlockSci objects in parallel. We use BlockSci in our research, which is mostly off/on-chain analytics for security and privacy applications. I know about the Map/Reduce interface, but it can be restricting when you need to pass along other data or use a different computation model (e.g., a graph).
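
For concreteness, by the Map/Reduce interface I mean the chain.map_blocks style of usage, roughly as sketched below (from memory, so the exact signature may differ): the mapped function only sees the block (or tx) itself, which makes it awkward to thread side data like A, X, and Y through the computation.

import blocksci

chain = blocksci.Blockchain("/path/to/BlockSci/data/")

# Runs the lambda over all blocks in parallel, but the lambda only
# receives the block; there's no clean way to pass extra state in.
tx_counts = chain.map_blocks(lambda block: len(block.txes))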

Example

Given a set of Bitcoin addresses A and two lists of txes, X and Y, find all txes in X that happen before every tx in Y, or happen after some of them but are not linked to those earlier txes in Y by any address in A.

In other words, we want to find all txes in X such that each tx either (1) has a block time earlier than that of every tx in Y, or otherwise (2) shares no input or output address in A with any tx in Y whose block time is at or before its own.
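
Equivalently, as a predicate over a single tx in X (a sketch matching the code below; addresses is a hypothetical helper returning a tx's combined input and output address set):

# Keep X_tx iff no tx in Y with a block time at or before X_tx's
# shares an address in A with X_tx.
def keep(X_tx, Y, A, addresses):
    common = A & addresses(X_tx)
    return not any(
        Y_tx.block_time <= X_tx.block_time and common & addresses(Y_tx)
        for Y_tx in Y)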

Here's one way to do this in parallel:

import blocksci
import io
import multiprocessing

from functools import partial

def pickle(data):
    # Serialize BlockSci objects into an in-memory buffer with BlockSci's
    # own pickler; unpickling them later requires a chain handle.
    data_file = io.BytesIO()
    pickler = blocksci.pickler.Pickler(data_file)
    pickler.dump(data)
    return data_file

def unpickle(data_file, chain):
    # Deserialize BlockSci objects from a path or a file-like object,
    # resolving them against the given chain.
    if isinstance(data_file, str):
        data_file = open(data_file, "rb")
    data_file.seek(0)
    pickler = blocksci.pickler.Unpickler(data_file, chain)
    data = pickler.load()
    data_file.close()
    return data

def filter_txes(X_file, Y_file, blocks_dir, addresses_file):
    # Worker: each process opens its own chain handle and unpickles its
    # inputs locally, so no BlockSci objects are shared across processes.
    chain = blocksci.Blockchain(blocks_dir)
    X = unpickle(X_file, chain)
    Y = unpickle(Y_file, chain)
    A = unpickle(addresses_file, chain)

    # `result` first collects the txes in X that are linked to an
    # earlier-or-same-time tx in Y through an address in A; we invert it below.
    result = set()
    for X_tx in X:
        common = A & (set(X_tx.inputs.address.all) | set(X_tx.outputs.address.all))
        for Y_tx in Y:
            if Y_tx.block_time > X_tx.block_time:
                continue
            if common & set(Y_tx.inputs.address.all):
                result.add(X_tx)
                break
            if common & set(Y_tx.outputs.address.all):
                result.add(X_tx)
                break

    result = set(X) - result
    return pickle(result)

def filter_txes_parallel(blocks_dir, txes_file, addresses_file, segment_size=10**3):
    chain = blocksci.Blockchain(blocks_dir)
    txes = unpickle(txes_file, chain)

    X = txes["X"]
    Y = txes["Y"]

    # Split X into fixed-size segments and pickle each one, so every
    # worker receives a self-contained chunk of work.
    indices = ((n, min(n + segment_size, len(X))) for n in range(0, len(X), segment_size))
    X_segments = (X[i:j] for (i, j) in indices)
    X_files = (pickle(segment) for segment in X_segments)
    Y_file = pickle(Y)

    result = set()
    with multiprocessing.Pool(multiprocessing.cpu_count()-1) as pool:
        task_func = partial(
            filter_txes,
            Y_file=Y_file,
            blocks_dir=blocks_dir,
            addresses_file=addresses_file)
        # Merge the pickled subresults as the workers finish, in any order.
        for subresult_file in pool.imap_unordered(task_func, X_files):
            subresult = unpickle(subresult_file, chain)
            result |= subresult

    return result

if __name__ == "__main__":
    result = filter_txes_parallel(
        blocks_dir="/path/to/BlockSci/data/",
        txes_file="/path/to/txes.pkl",
        addresses_file="/path/to/addresses.pkl")

On the described system, this took about 1h30m to run on 92 cores, with len(A)=15K addresses, len(X)=200K txes, and len(Y)=350K txes.

You will notice that I'm unpickling all BlockSci objects locally in each process to avoid sharing them across processes. This of course consumes more memory, which is becoming the bottleneck in this case (usually, our computations are CPU-bound).
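
One alternative I've been considering is to ship plain tx indices between processes instead of BlockSci-pickled objects, and rehydrate them inside each worker, roughly like this (a sketch; it assumes tx.index and chain.tx_with_index, or this version's equivalents, are available):

def filter_txes_by_index(X_indices, Y_indices, blocks_dir, addresses_file):
    # Only plain ints cross the process boundary; each worker rebuilds
    # the BlockSci objects against its own chain handle.
    chain = blocksci.Blockchain(blocks_dir)
    X = [chain.tx_with_index(i) for i in X_indices]
    Y = [chain.tx_with_index(i) for i in Y_indices]
    A = unpickle(addresses_file, chain)
    # ... same filtering as in filter_txes above ...

Since BlockSci memory-maps the parsed chain data, forked workers should share those pages read-only, so the remaining per-process cost would mostly be the rebuilt Python object sets.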

Is this the right way to do this? Is there a more efficient way to do this (in terms of both space and time)? Any general guidelines, especially BlockSci-specific, would be great. Thanks!

maltemoeser commented 5 years ago

Not an answer to your question, but have you considered implementing the analysis in C++? I'd assume that to be a lot faster than doing the analysis using the Python interface.

boshmaf commented 5 years ago

Hello @maltemoeser. First of all, thanks for the wonderful system. BlockSci is useful in so many ways for what we're doing here at CIBR. We're building an open-source stack on top of BlockSci to enable off/on-chain analytics in a more usable and systematic way. Some of the requirements for this stack are tagging, linking, and searching addresses, txes, blocks, and chains, using both a search-engine-like style and a lower-level query language that directly uses the BlockSci lib.

As a proof of concept, we used the Python interface to meet some of these requirements; we met all the functional requirements except the performance-related ones. We're migrating our design to C++ and Go, but for the sake of the PoC, I was wondering if there's a design pattern that's generally recommended for data-heavy workloads like the one in the example, on a single machine.

As for this question, I'd like to keep it focused on the Python interface, because I know many people use it for rapid prototyping or for running experiments as part of their academic research (as we do too). I'd love to chat with the BlockSci team to share more details about what we're doing, as it might be interesting to you or already part of your roadmap. Thanks!