jarrodmillman opened this issue 4 years ago
@stefanv also, I think that if multiple people will be working on parallelizing different parts, then an implementation guide will be helpful.
@adnanmuttaleb Yes, we would like to work out the higher-level patterns for implementing parallelism before jumping in. Ray, dask, etc. are all suitable implementations of execution engines. The hope is that we can make task generation agnostic to the engine, but we haven't put much thought into it yet.
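To make that idea concrete, here is a loose, purely illustrative sketch (stdlib only; none of these names are a proposal) of task generation that stays agnostic to the execution engine:

```python
from concurrent.futures import ThreadPoolExecutor

def generate_chunks(nodes, chunk_size):
    """Yield fixed-size chunks of nodes, with no knowledge of the engine."""
    chunk = []
    for n in nodes:
        chunk.append(n)
        if len(chunk) == chunk_size:
            yield chunk
            chunk = []
    if chunk:
        yield chunk

def run_tasks(func, chunks, executor):
    # Any engine exposing a map-like interface slots in here: stdlib
    # executors, dask's Client (map + gather), or Ray wrappers.
    return list(executor.map(func, chunks))

# Same task generation, thread engine swapped in:
with ThreadPoolExecutor() as ex:
    print(run_tasks(len, generate_chunks(range(10), 3), ex))  # [3, 3, 3, 1]
```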
Have you thought about using Cython's parallelism for this?
Hi all, we are very interested in seeing graph algorithms run in a parallel and distributed way, so that we can use this wonderful project as a production-ready platform. Ray, which was mentioned by @adnanmuttaleb, is a great platform for achieving this goal. Is this still planned, and have there been any updates?
We haven't had time to work on this yet. When we do, we will update this issue. As @stefanv mentioned, we will focus on the high-level interface and will likely support several backends such as Ray, dask, etc.
I am bringing this up since I am a big fan of parallelism. What about MPI and CUDA? Are we interested in those as well?
@kpetridis24 for networkx we need to work under the constraints of pure Python plus not too many dependencies for the parallelization plan. We need to come up with a design where we keep the ethos of the project (easy-to-read code) AND make these speedups possible. So this may involve something like joblib, or a high-level interface through which we can dispatch the computations.
For MPI there is graph-tool, and for CUDA there is cuGraph.
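For a minimal taste of the joblib-style dispatch mentioned above (a sketch only, assuming joblib is installed; the graph, task granularity, and `n_jobs` are arbitrary):

```python
import networkx as nx
from joblib import Parallel, delayed

G = nx.erdos_renyi_graph(200, 0.05, seed=42)

def sssp_from(node):
    # One independent task per source node.
    return node, nx.single_source_shortest_path_length(G, node)

# Fan the per-node tasks out over 4 worker processes and merge the results.
lengths = dict(Parallel(n_jobs=4)(delayed(sssp_from)(n) for n in G))
```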
If there's going to be an off-GitHub discussion on this, could you kindly post a link here?
@stefanv @jarrodmillman @MridulS @SultanOrazbayev
Not sure where this discussion went, but as an avid user of many parallelizable NetworkX algorithms, I still see a pressing need for this functionality, and would really like to help out.
Per the wish-list from the above comments, below is a prototype for a higher-level, joblib-centric API that provides an easy framework for developers who'd like to parallelize existing (or future) algorithms in the package. The API avoids the need to write redundant `_parallel` variants of each parallelizable algorithm; instead, it just requires passing a single `parallel_callable` kwarg to existing algorithms, which avoids creating major breaking changes across the repo. The API, which is really just a callable-generating class, aspires to be backend-agnostic and verifiably works with multiprocessing, dask, ray, loky, threading, and ipyparallel backends.
```python
from typing import Callable, Iterable, Optional, Tuple

SUPPORTED_BACKENDS = ["multiprocessing", "dask", "ray", "loky", "threading", "ipyparallel"]


class NxParallel:
    """A class to instantiate a callable object for handling parallelization of
    functions in NetworkX. The class is initialized by specifying a backend and the
    number of processes to use, and can then be called with a function object and an
    associated iterable as input. The function will be called on each element of the
    iterable in parallel. The class can be used with the multiprocessing, ipyparallel,
    dask, ray, and other native joblib backends.

    Attributes
    ----------
    backend : str
        The backend to use. Choose from 'multiprocessing', 'dask', 'ray', 'loky',
        'threading', 'ipyparallel'.
    processes : int
        The number of processes to use. If None, the number of processes will be set
        to the number of CPUs on the machine.

    Raises
    ------
    ImportError
        If joblib, or any of the optional backends, is not installed.
    ValueError
        If an invalid backend is specified, or if the number of elements in a tuple of
        the provided iterable does not equal the number of parameters of the provided
        function.
    """

    def __init__(self, backend: str = "multiprocessing", processes: Optional[int] = None, **kwargs):
        try:
            import joblib
        except ImportError:
            raise ImportError("joblib is not installed. Install joblib using 'pip install joblib'.")

        self.backend = backend
        if processes is None:
            from os import cpu_count

            self.processes = cpu_count()
        else:
            self.processes = processes

        if self.backend in SUPPORTED_BACKENDS:
            # Backend-specific setup (and its imports) is restricted to this block.
            if self.backend == "dask":
                try:
                    from dask.distributed import Client
                    from joblib._dask import DaskDistributedBackend
                except ImportError:
                    raise ImportError("dask is not installed. Install dask using 'pip install dask distributed'.")
                client = Client(**kwargs)
                joblib.register_parallel_backend("dask", lambda: DaskDistributedBackend(client=client))
            elif self.backend == "ray":
                try:
                    from ray.util.joblib.ray_backend import RayBackend
                except ImportError:
                    raise ImportError("ray is not installed. Install ray using 'pip install ray'.")
                rb = RayBackend(**kwargs)
                joblib.register_parallel_backend("ray", lambda: rb)
            elif self.backend == "ipyparallel":
                try:
                    from ipyparallel import Client
                    from ipyparallel.joblib import IPythonParallelBackend
                except ImportError:
                    raise ImportError("ipyparallel is not installed. Install ipyparallel using 'pip install ipyparallel'.")
                c = Client(**kwargs)
                bview = c.load_balanced_view()
                joblib.register_parallel_backend("ipyparallel", lambda: IPythonParallelBackend(view=bview))
        else:
            raise ValueError(f"Invalid backend specified. Choose from {SUPPORTED_BACKENDS}.")

    def __call__(self, func: Callable, iterable: Iterable[Tuple], **kwargs):
        """Call the class instance with a function and an iterable.

        The function will be called on each element of the iterable in parallel.
        """
        import inspect

        import joblib

        params = list(inspect.signature(func).parameters.keys())

        def _bind(args):
            # Enforce the ValueError promised in the class docstring: each tuple
            # must supply exactly one value per parameter of `func`.
            if len(args) != len(params):
                raise ValueError(
                    f"Expected {len(params)} elements per tuple for {func.__name__}, got {len(args)}."
                )
            return dict(zip(params, args))

        with joblib.parallel_backend(self.backend):
            return joblib.Parallel(n_jobs=self.processes, **kwargs)(
                joblib.delayed(func)(**_bind(i)) for i in iterable
            )
```
In the spirit of the `betweenness_centrality` example, this would be used as follows:
```python
import itertools
import time

import networkx as nx

# Create an instance of the NxParallel class
nx_parallel = NxParallel(backend="dask", processes=8)


def chunks(l, n):
    """Divide a list of nodes `l` into chunks of size `n`."""
    l_c = iter(l)
    while True:
        x = tuple(itertools.islice(l_c, n))
        if not x:
            return
        yield x


def betweenness_centrality(G, parallel_callable=nx_parallel):
    """Parallel betweenness centrality function"""
    node_divisor = parallel_callable.processes * 4
    node_chunks = list(chunks(G.nodes(), G.order() // node_divisor))
    num_chunks = len(node_chunks)
    # Each tuple maps onto the parameters of nx.betweenness_centrality_subset:
    # (G, sources, targets, normalized, weight).
    iterable = zip(
        [G] * num_chunks,
        node_chunks,
        [list(G)] * num_chunks,
        [True] * num_chunks,
        [None] * num_chunks,
    )
    bt_sc = parallel_callable(nx.betweenness_centrality_subset, iterable)

    # Reduce the partial solutions
    bt_c = bt_sc[0]
    for bt in bt_sc[1:]:
        for n in bt:
            bt_c[n] += bt[n]
    return bt_c


G_ba = nx.barabasi_albert_graph(1000, 3)
G_er = nx.gnp_random_graph(1000, 0.01)
G_ws = nx.connected_watts_strogatz_graph(1000, 4, 0.1)

for G in [G_ba, G_er, G_ws]:
    print("\nComputing betweenness centrality for:")
    print(nx.info(G))
    print("\tParallel version")
    start = time.time()
    bt = betweenness_centrality(G, nx_parallel)
    print(f"\t\tTime: {(time.time() - start):.4F} seconds")
    print(f"\t\tBetweenness centrality for node 0: {bt[0]:.5f}")
    print("\tNon-Parallel version")
    start = time.time()
    bt = nx.betweenness_centrality(G)
    print(f"\t\tTime: {(time.time() - start):.4F} seconds")
    print(f"\t\tBetweenness centrality for node 0: {bt[0]:.5f}\n")
```
I would love to at least start a PR with something like this (including tests), if it's of any interest to the NetworkX team?
@dPys
For folks who do end up on this issue, there are more developments going on in the https://github.com/networkx/nx_parallel repository.
We should develop some helper functions for parallelization and then use them in several places (see the rough sketch at the end of this comment).
There are several comments in the code indicating where we could parallelize things:
There are also several open PRs:
And there is an example in our gallery:
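As a loose sketch of the kind of reusable helper meant above (every name here is hypothetical, and the chunking heuristic is arbitrary), a chunk-and-merge utility built on joblib might look like:

```python
import os

from joblib import Parallel, delayed

def parallel_node_map(G, node_func, n_jobs=-1, chunks_per_job=4):
    """Apply `node_func(G, nodes)` to chunks of G's nodes in parallel and
    merge the resulting per-chunk dicts into one dict."""
    n_workers = os.cpu_count() if n_jobs == -1 else n_jobs
    chunk_size = max(1, G.order() // (n_workers * chunks_per_job))
    nodes = list(G)
    node_chunks = [nodes[i:i + chunk_size] for i in range(0, len(nodes), chunk_size)]
    # Run each chunk as an independent joblib task, then reduce.
    partials = Parallel(n_jobs=n_jobs)(
        delayed(node_func)(G, chunk) for chunk in node_chunks
    )
    merged = {}
    for part in partials:
        merged.update(part)
    return merged
```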