alphatwirl / mantichora

A simple interface to Python multiprocessing and threading
BSD 3-Clause "New" or "Revised" License
16 stars 1 forks source link
atpbar multiprocessing threading

PyPI version Anaconda-Server Badge DOI Test Status codecov

Mantichora

A simple interface to multiprocessing and threading


Mantichora provides a simple interface to multiprocessing and threading.

from mantichora import mantichora

with mantichora() as mcore:
    mcore.run(func1)
    mcore.run(func2)
    mcore.run(func3)
    mcore.run(func4)
    results = mcore.returns()
 100.00% :::::::::::::::::::::::::::::::::::::::: |    12559 /    12559 |:  func1
  71.27% ::::::::::::::::::::::::::::             |    28094 /    39421 |:  func2
  30.34% ::::::::::::                             |    28084 /    92558 |:  func3
  35.26% ::::::::::::::                           |    27282 /    77375 |:  func4

You can simply give Mantichora as many functions as you need to run. Mantichora will run them concurrently in background processes by using multiprocessing or in different threads by threading and give you the return values of the functions. The return values are sorted in the order of the functions you have originally given to Mantichora. Progress bars from atpbar can be used in the functions.

The code in this package was originally developed in the sub-package concurrently of alphatwirl.

The examples in this file can be also run on Jupyter Notebook.
Binder



Requirement


Install

You can install with conda from conda-forge:

conda install -c conda-forge mantichora

or with pip:

pip install -U mantichora

User guide

Quick start

I will show here how to use Mantichora by simple examples.

Import libraries

We are going use two python standard libraries time and random in an example task function. In the example task function, we are also going to use atpbar for progress bars. Import these packages and mantichora.

import time, random
from atpbar import atpbar
from mantichora import mantichora

Define a task function

Let us define a simple task function.

def task_loop(name, ret=None):
    n = random.randint(1000, 10000)
    for i in atpbar(range(n), name=name):
        time.sleep(0.0001)
    return ret

The task in this function is to sleep for 0.0001 seconds as many times as the number randomly selected from between 1000 and 10000. atpbar is used to show a progress bar. The function takes two arguments: name, the label on the progress bar, and ret, the return value of the function.

Note: In the multiprocessing mode, the default mode of mantichora, task functions, their arguments, and their return values need to be picklable.

You can just try running this function without using Mantichora.

result = task_loop('task1', 'result1')

This doesn't return immediately. It waits for the function to finish. You will see a progress bar.

 100.00% :::::::::::::::::::::::::::::::::::::::: |    58117 /    58117 |:  task1

The return value is stored in result.

print(result)
 'result1'

Run tasks concurrently with Mantichora

Now, we run multiple tasks concurrently with Mantichora.

with mantichora(nworkers=3) as mcore:
    mcore.run(task_loop, 'task', ret='result1')
    mcore.run(task_loop, 'another task', ret='result2')
    mcore.run(task_loop, 'still another task', ret='result3')
    mcore.run(task_loop, 'yet another task', ret='result4')
    mcore.run(task_loop, 'task again', ret='result5')
    mcore.run(task_loop, 'more task', ret='result6')
    results = mcore.returns()

In the example code above, mantichora is initialized with an optional argument nworkers. The nworkers specifies the number of the workers. It is 3 in the above example. The default is 4. At most as many tasks as nworkers can run concurrently.

The with statement is used in the example. This ensures that mantichora properly ends the workers.

You can give task functions and their arguments to mcore.run(). You can call mcore.run() as many times as you need. In the above example, mcore.run() is called with the same task function with different arguments. You can also use a different function each time. mcore.run() returns immediately; it doesn't wait for the task to finish or even to start. In each call, mcore.run() only puts a task in a queue. The workers in background processes pick up tasks from the queue and run them.

The mcore.returns() waits until all tasks finish and returns their return values, which are sorted in the order of the tasks you have originally given to mcore.run().

Progress bars will be shown by atpbar.

 100.00% :::::::::::::::::::::::::::::::::::::::: |     1415 /     1415 |:  still another task
 100.00% :::::::::::::::::::::::::::::::::::::::: |     7770 /     7770 |:  task again
 100.00% :::::::::::::::::::::::::::::::::::::::: |    18431 /    18431 |:  yet another task
 100.00% :::::::::::::::::::::::::::::::::::::::: |    25641 /    25641 |:  more task
 100.00% :::::::::::::::::::::::::::::::::::::::: |    74669 /    74669 |:  task
 100.00% :::::::::::::::::::::::::::::::::::::::: |    87688 /    87688 |:  another task

The results are sorted in the original order regardless of the order in which the tasks have finished.

print(results)
['result1', 'result2', 'result3', 'result4', 'result5', 'result6']

Features

Multiprocessing or Threading

New in version 0.10.0

From version 0.10.0, you can choose to use threading instead of multiprocessing by setting the option mode to threading (the default is multiprocessing).

with mantichora(mode='threading') as mcore:
    mcore.run(task_loop, 'task', ret='result1')
    mcore.run(task_loop, 'another task', ret='result2')
    mcore.run(task_loop, 'still another task', ret='result3')
    mcore.run(task_loop, 'yet another task', ret='result4')
    mcore.run(task_loop, 'task again', ret='result5')
    mcore.run(task_loop, 'more task', ret='result6')
    results = mcore.returns()

Without the with statement

end()

If you don't use the with statement, you need to call end().

mcore = mantichora()

mcore.run(task_loop, 'task', ret='result1')
mcore.run(task_loop, 'another task', ret='result2')
mcore.run(task_loop, 'still another task', ret='result3')
mcore.run(task_loop, 'yet another task', ret='result4')
mcore.run(task_loop, 'task again', ret='result5')
mcore.run(task_loop, 'more task', ret='result6')

results = mcore.returns()

mcore.end()
print(results)
 100.00% :::::::::::::::::::::::::::::::::::::::: |     4695 /     4695 |:  yet another task
 100.00% :::::::::::::::::::::::::::::::::::::::: |     7535 /     7535 |:  still another task
 100.00% :::::::::::::::::::::::::::::::::::::::: |     9303 /     9303 |:  another task
 100.00% :::::::::::::::::::::::::::::::::::::::: |     9380 /     9380 |:  task
 100.00% :::::::::::::::::::::::::::::::::::::::: |     5812 /     5812 |:  more task
 100.00% :::::::::::::::::::::::::::::::::::::::: |     9437 /     9437 |:  task again
['result1', 'result2', 'result3', 'result4', 'result5', 'result6']
terminate()

In the multiprocessing mode, mantichora can be terminated with terminate(). After terminate() is called, end() still needs to be called. In the example below, terminate() is called after 0.5 seconds of sleep while some tasks are still running.

mcore = mantichora()

mcore.run(task_loop, 'task', ret='result1')
mcore.run(task_loop, 'another task', ret='result2')
mcore.run(task_loop, 'still another task', ret='result3')
mcore.run(task_loop, 'yet another task', ret='result4')
mcore.run(task_loop, 'task again', ret='result5')
mcore.run(task_loop, 'more task', ret='result6')

time.sleep(0.5)

mcore.terminate()
mcore.end()

The progress bars stop when the tasks are terminated.

 100.00% :::::::::::::::::::::::::::::::::::::::: |     2402 /     2402 |:  still another task
 100.00% :::::::::::::::::::::::::::::::::::::::: |     3066 /     3066 |:  another task
  59.28% :::::::::::::::::::::::                  |     2901 /     4894 |:  task
  69.24% :::::::::::::::::::::::::::              |     2919 /     4216 |:  yet another task
   0.00%                                          |        0 /     9552 |:  task again
   0.00%                                          |        0 /     4898 |:  more task

Note:: In the threading mode, terminate() does not do anything. If you initialize mantichora in the threading mode, i.e., mantichora(mode='threading'), in the above example, all tasks run until completion.


Receive results as tasks finish

Instead of waiting for all tasks to finish beofre receiving the reulsts, you can get results as tasks finish with the method receive_one() or receive_receive().

receive_one()

The method receive_one() returns a pair of the run ID and the return value of a task function. If no task has finished, receive_one() waits until one task finishes. receive_one() returns None if no tasks are outstanding. The method run() returns the run ID for the task.

with mantichora() as mcore:
    runids = [ ]
    runids.append(mcore.run(task_loop, 'task1', ret='result1'))
    runids.append(mcore.run(task_loop, 'task2', ret='result2'))
    runids.append(mcore.run(task_loop, 'task3', ret='result3'))
    runids.append(mcore.run(task_loop, 'task4', ret='result4'))
    runids.append(mcore.run(task_loop, 'task5', ret='result5'))
    runids.append(mcore.run(task_loop, 'task6', ret='result6'))
    #
    pairs = [ ]
    for i in range(len(runids)):
        pairs.append(mcore.receive_one())
 100.00% :::::::::::::::::::::::::::::::::::::::: |     1748 /     1748 |:  task3
 100.00% :::::::::::::::::::::::::::::::::::::::: |     4061 /     4061 |:  task1
 100.00% :::::::::::::::::::::::::::::::::::::::: |     2501 /     2501 |:  task5
 100.00% :::::::::::::::::::::::::::::::::::::::: |     2028 /     2028 |:  task6
 100.00% :::::::::::::::::::::::::::::::::::::::: |     8206 /     8206 |:  task4
 100.00% :::::::::::::::::::::::::::::::::::::::: |     9157 /     9157 |:  task2

The runid is the list of the run IDs in the order of the tasks that have been given to run().

print(runids)
[0, 1, 2, 3, 4, 5]

The pairs are in the order in which the tasks have finished.

print(pairs)
[(2, 'result3'), (0, 'result1'), (4, 'result5'), (5, 'result6'), (3, 'result4'), (1, 'result2')]
receive_finished()

The method receive_finished() returns a list of pairs of the run ID and the return value of finished task functions. The method receive_finished() doesn't wait for a task to finish. It returns an empty list if no task has finished.

with mantichora() as mcore:
    runids = [ ]
    runids.append(mcore.run(task_loop, 'task1', ret='result1'))
    runids.append(mcore.run(task_loop, 'task2', ret='result2'))
    runids.append(mcore.run(task_loop, 'task3', ret='result3'))
    runids.append(mcore.run(task_loop, 'task4', ret='result4'))
    runids.append(mcore.run(task_loop, 'task5', ret='result5'))
    runids.append(mcore.run(task_loop, 'task6', ret='result6'))
    #
    pairs = [ ]
    while len(pairs) < len(runids):
        pairs.extend(mcore.receive_finished())
 100.00% :::::::::::::::::::::::::::::::::::::::: |     3979 /     3979 |:  task3
 100.00% :::::::::::::::::::::::::::::::::::::::: |     6243 /     6243 |:  task2
 100.00% :::::::::::::::::::::::::::::::::::::::: |     6640 /     6640 |:  task1
 100.00% :::::::::::::::::::::::::::::::::::::::: |     8632 /     8632 |:  task4
 100.00% :::::::::::::::::::::::::::::::::::::::: |     6235 /     6235 |:  task5
 100.00% :::::::::::::::::::::::::::::::::::::::: |     8325 /     8325 |:  task6

The runid is the list of the run IDs in the order of the tasks that have been given to run().

print(runids)
[0, 1, 2, 3, 4, 5]

The pairs are in the order in which the tasks have finished.

print(pairs)
[(2, 'result3'), (1, 'result2'), (0, 'result1'), (3, 'result4'), (4, 'result5'), (5, 'result6')]

Logging

In the multiprocessing mode, logging in background processes is propagated to the main process. The progagation is implemented in the way described in a section of Logging Cookbook.

Note: In the threading mode, logging just works because it is thread-safe.

Here is a simple example task function that uses logging. The task function does logging just before returning.

import logging

def task_log(name, ret=None):
    n = random.randint(1000, 10000)
    for i in atpbar(range(n), name=name):
        time.sleep(0.0001)
    logging.info('finishing "{}"'.format(name))
    return ret

Set the logging stream to a string stream so that we can later retrieve the logging as a string.

import io
stream = io.StringIO()
logging.basicConfig(level=logging.INFO, stream=stream)

Run the tasks.

with mantichora() as mcore:
    mcore.run(task_log, 'task1', ret='result1')
    mcore.run(task_log, 'task2', ret='result2')
    mcore.run(task_log, 'task3', ret='result3')
    mcore.run(task_log, 'task4', ret='result4')
    results = mcore.returns()
 100.00% :::::::::::::::::::::::::::::::::::::::: |     4217 /     4217 |:  task2
 100.00% :::::::::::::::::::::::::::::::::::::::: |     7691 /     7691 |:  task3
 100.00% :::::::::::::::::::::::::::::::::::::::: |     8140 /     8140 |:  task1
 100.00% :::::::::::::::::::::::::::::::::::::::: |     9814 /     9814 |:  task4

Logging made in the task function in background processes is sent to the main process and written in the string stream.

print(stream.getvalue())
INFO:root:finishing "task2"
INFO:root:finishing "task3"
INFO:root:finishing "task1"
INFO:root:finishing "task4"

Start method of multiprocessing

Updated in version 0.12.0

Python multiprocessing has three start methods: fork, spawn, forkserver. These methods are described in the Python documentation. Mantichora uses by default the fork method on Linux and macOS and the spawn method on Windows. You can change the method by the option mp_start_method.

with mantichora(mp_start_method='spawn') as mcore:
    mcore.run(task_loop, 'task', ret='result1')
    mcore.run(task_loop, 'another task', ret='result2')
    mcore.run(task_loop, 'still another task', ret='result3')
    mcore.run(task_loop, 'yet another task', ret='result4')
    mcore.run(task_loop, 'task again', ret='result5')
    mcore.run(task_loop, 'more task', ret='result6')
    results = mcore.returns()

License


Contact