AmineDiro / daskqueue

Distributed Task Queue based on Dask
MIT License

daskqueue


daskqueue is a small Python library built on top of Dask and Dask Distributed that implements a very lightweight distributed task queue.

Think of this library as a simpler version of Celery built entirely on Dask. Running Celery in an HPC environment, for instance, is usually very tricky, whereas spawning a Dask cluster is a lot easier to manage, debug and clean up.

Motivation

Dask is an amazing library for parallel computing written entirely in Python. It is easy to install and offers both a high-level API wrapping common collections (arrays, bags, dataframes) and a low-level API for writing custom code (task graphs with Delayed and Futures).

For all its greatness, Dask implements a central scheduler (basically a simple Tornado event loop) that is involved in every decision, which can sometimes create a central bottleneck. This is a pretty serious limitation when trying to use Dask in high-throughput situations. A simple task queue is usually the best approach when trying to distribute millions of tasks.

The daskqueue Python library leverages Dask Actors to implement distributed queues, with a simple load balancer (QueuePool) and a Consumer class to consume messages from these queues.

We used Actors because:

Note: Dask provides a Queue implementation, but it is mediated by the central scheduler, so it is not ideal for sending large amounts of data (everything you send is routed through a central point) and it adds overhead on the scheduler when trying to put millions of tasks.
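To illustrate what a load balancer over several queues does, here is a rough single-process sketch. It is not daskqueue's code: plain `queue.Queue` objects stand in for the Dask actors, round-robin dispatch is an assumed balancing strategy, and the class name `RoundRobinPool` is hypothetical.

```python
import itertools
import queue
from typing import Any, Callable, List


class RoundRobinPool:
    """Hypothetical, single-process stand-in for a queue pool.

    Plain queue.Queue objects replace the Dask actors that daskqueue uses,
    and round-robin dispatch is an assumed balancing strategy.
    """

    def __init__(self, n_queues: int) -> None:
        self.queues: List[queue.Queue] = [queue.Queue() for _ in range(n_queues)]
        # Cycle over the queue indices forever: 0, 1, ..., n-1, 0, 1, ...
        self._next = itertools.cycle(range(n_queues))

    def submit(self, func: Callable[..., Any], *args: Any) -> int:
        """Enqueue (func, args) on the next queue and return that queue's index."""
        idx = next(self._next)
        self.queues[idx].put((func, args))
        return idx

    def sizes(self) -> List[int]:
        """Pending items per queue (exact here, since nothing is consuming)."""
        return [q.qsize() for q in self.queues]


if __name__ == "__main__":
    pool = RoundRobinPool(n_queues=3)
    for i in range(7):
        pool.submit(pow, i, 2)
    print(pool.sizes())  # [3, 2, 2]
```

Spreading submissions over several queues lets many consumers pull work in parallel instead of funnelling every message through one point; the real QueuePool may balance work differently.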

Install

daskqueue requires Python 3.6 or newer. You can install it with pip:

$ pip install daskqueue

Usage

This simple example shows how to run tasks in parallel on Dask workers through a distributed queue (here, a CPU-bound sum of squares stands in for the real work):

from distributed import Client
from daskqueue import QueuePool, ConsumerPool
from daskqueue.utils import logger

def process_item():
    return sum(i * i for i in range(10**8))

if __name__ == "__main__":
    client = Client(
        n_workers=5,
        # task function doesn't release the GIL
        threads_per_worker=1,
        direct_to_workers=True,
    )

    ## Params
    n_queues = 1
    n_consumers = 5

    queue_pool = QueuePool(client, n_queues=n_queues)

    consumer_pool = ConsumerPool(client, queue_pool, n_consumers=n_consumers)
    consumer_pool.start()

    for i in range(5):
        queue_pool.submit(process_item)

    # Wait for all work to be done
    consumer_pool.join()

    ## Get results
    results = consumer_pool.results()

Take a look at the examples/ folder for more usage examples.

Implementation

You should think of daskqueue as a very simple distributed version of aiomultiprocess. We have these basic classes:
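To make the analogy concrete, the consumer side can be sketched in a single process with asyncio: several consumer coroutines pull work from a shared `asyncio.Queue` until they receive a sentinel, and their results are collected at the end, loosely mirroring the start / submit / join / results flow of the usage example. The names `consume` and `run_pool` and the `None` sentinel are hypothetical; daskqueue itself runs its consumers as Dask actors on the workers. The sketch needs Python 3.7+ for `asyncio.run`.

```python
import asyncio
from typing import Any, Callable, List, Tuple

Item = Tuple[Callable[..., Any], tuple]


async def consume(q: asyncio.Queue, results: List[Any]) -> None:
    """Pull (func, args) items and run them until the None sentinel arrives."""
    while True:
        item = await q.get()
        try:
            if item is None:  # sentinel: this consumer stops
                return
            func, args = item
            results.append(func(*args))
        finally:
            q.task_done()  # runs for work items and for the sentinel


async def run_pool(items: List[Item], n_consumers: int) -> List[Any]:
    q: asyncio.Queue = asyncio.Queue()
    results: List[Any] = []
    # Start the consumers before submitting work (like consumer_pool.start()).
    workers = [asyncio.create_task(consume(q, results)) for _ in range(n_consumers)]
    for item in items:
        await q.put(item)
    # One sentinel per consumer, so every consumer eventually exits.
    for _ in range(n_consumers):
        await q.put(None)
    # Block until every item and sentinel is processed (like consumer_pool.join()).
    await q.join()
    await asyncio.gather(*workers)
    return results


if __name__ == "__main__":
    out = asyncio.run(run_pool([(pow, (i, 2)) for i in range(5)], n_consumers=3))
    print(sorted(out))  # [0, 1, 4, 9, 16]
```

Sending one sentinel per consumer is a common way to shut down a fixed pool cleanly: a consumer that receives a sentinel exits and takes no further items, so each remaining sentinel reaches a different consumer.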

Performance and Limitations

Benchmarks

The daskqueue library is very well suited for IO-bound jobs: by running multiple consumers and queues that communicate asynchronously, we can bypass the Dask scheduler bottleneck and process millions of tasks 🥰 !!

A file-copy version of the example above was run on a cluster of 20 consumers and 5 queues. Each task was a basic file copy between two locations (copying from an NFS filer). We copied 200 000 files (~1.1 TB) without ever breaking a sweat!

We can clearly see the network saturation:

[Figure: network saturation during the file-copy benchmark]

Looking at the scheduler metrics, we observe a mean of 19.3%:

[Figure: scheduler metrics during the file-copy benchmark]
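For scale, the benchmark figures quoted above work out as follows. This assumes decimal units (1 TB = 10**12 bytes) and, for the second figure, that consumers are spread evenly over the queues; neither is stated in the benchmark description.

```python
# Figures quoted in the benchmark paragraph above.
n_files = 200_000
total_bytes = 11 * 10**11  # ~1.1 TB, assuming decimal units (1 TB = 10**12 bytes)
n_consumers, n_queues = 20, 5

avg_file_mb = total_bytes / n_files / 10**6
# Assumes consumers are spread evenly over the queues.
consumers_per_queue = n_consumers // n_queues

print(f"average file size: {avg_file_mb:.1f} MB")  # average file size: 5.5 MB
print(f"consumers per queue: {consumers_per_queue}")  # consumers per queue: 4
```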

You can take a look at the benchmark/ directory for various benchmarks comparing daskqueue with Dask:

Throughput

All files are mmaped, so we don't see any performance degradation for workloads that fit into memory.

Limitations

Given the current implementation, you should be mindful of the following limitations (this list will be updated regularly):

Features roadmap

Contributing

Contributions are what make the open-source community such an amazing place to learn, inspire, and create. This project is still very, very rough! Any contributions you make will benefit everybody else and are greatly appreciated 😍 😍 😍 !

Please try to create bug reports that are:

Releasing

Releases are published automatically when a tag is pushed to GitHub.

git checkout master
git pull
# Set next version number
export RELEASE=x.x.x

# Create tags
git commit --allow-empty -m "Release $RELEASE"
git tag -a $RELEASE -m "Version $RELEASE"

# Push
git push upstream --tags

License

daskqueue is copyright Amine Dirhoussi, and licensed under the MIT license. I am providing code in this repository to you under an open source license. This is my personal repository; the license you receive to my code is from me and not from my employer. See the LICENSE file for details.