DynamicsAndNeuralSystems / pyspi

Comparative analysis of pairwise interactions in multivariate time series.
https://time-series-features.gitbook.io/pyspi/
GNU General Public License v3.0

Calculator.compute() hangs indefinitely when run in Multiprocessing pool #53

Closed mesner closed 6 months ago

mesner commented 7 months ago

I'm trying to distribute thousands of pairs of datasets to the calculator, but it hangs indefinitely when run as a child process via Python's multiprocessing (often hanging at dcorr, dcorr_biased, or dtw when those SPIs are included). top shows no CPU usage.

import numpy as np
from pyspi.calculator import Calculator
from multiprocessing import Pool

# Instantiated at module scope -- this turns out to be the cause of the hang
calc = Calculator()

def runCompute(dataset):
    calc.load_dataset(dataset)
    calc.compute()
    return calc.table.copy()

if __name__ == "__main__":
    print("running pool")
    np.random.seed(42)  # seed numpy's RNG (random.seed does not affect np.random)
    M = 2   # number of processes (variables)
    T = 50  # number of time points
    dataset = np.random.randn(M, T)

    # this works
    # rows = [runCompute(dataset)]

    # this hangs
    with Pool(1) as p:
        rows = p.map(runCompute, [dataset])
    print("complete")
    print(rows)
mesner commented 7 months ago

It seems this was caused by instantiating the calculator at the script / global / module scope.

This full example works for me. In my actual script, I rewrote the compute function to not use tqdm.

How do others run this package in a distributed manner?

import os

# Ensure the Octave binaries are on PATH (needed for the Octave-based SPIs)
octave_bin = "C:\\Program Files\\GNU Octave\\Octave-8.3.0\\mingw64\\bin"
if octave_bin not in os.environ["PATH"]:
    os.environ["PATH"] += ";" + octave_bin

import numpy as np
from pyspi.calculator import Calculator
from multiprocessing import Pool

def runCompute(dataset):
    # Lazily create one Calculator per worker process, after the fork/spawn,
    # rather than at module scope
    if not hasattr(runCompute, "calc"):
        runCompute.calc = Calculator()
    calc = runCompute.calc

    calc.load_dataset(dataset)
    calc.compute()
    return calc.table.copy()

if __name__ == "__main__":
    print("running pool")
    np.random.seed(42)  # seed numpy's RNG (random.seed does not affect np.random)
    M = 2   # number of processes (variables)
    T = 50  # number of time points
    dataset = np.random.randn(M, T)

    with Pool(1) as p:
        rows = p.map(runCompute, [dataset])
    print("complete")
    print(rows)
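
An equivalent, more explicit pattern is to construct the Calculator in a Pool initializer, so each worker builds its own instance after it is spawned. A minimal sketch (init_worker and _calc are illustrative names, not part of pyspi):

import numpy as np
from multiprocessing import Pool
from pyspi.calculator import Calculator

_calc = None  # one Calculator per worker process

def init_worker():
    # Runs once in each worker, after the process is spawned/forked,
    # so the Calculator is never pickled or shared across processes
    global _calc
    _calc = Calculator()

def run_compute(dataset):
    _calc.load_dataset(dataset)
    _calc.compute()
    return _calc.table.copy()

if __name__ == "__main__":
    datasets = [np.random.randn(2, 50) for _ in range(4)]
    with Pool(processes=2, initializer=init_worker) as p:
        rows = p.map(run_compute, datasets)

Either way, the key point is the same: the Calculator must be created inside the child process, never at module import time.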
benfulcher commented 7 months ago

I think @jmoo2880 and @anniegbryant have experience running distributed pyspi. cf. https://time-series-features.gitbook.io/pyspi/usage/advanced-usage/distributing-calculations-on-a-cluster

anniegbryant commented 6 months ago

Hi @mesner, sorry for the delayed reply. We recommend checking out the pyspi-distribute repo, which submits jobs through a PBS job scheduler on a high-performance computing cluster, if you have access to one. It includes a simple demo Jupyter notebook walking users through an example workflow. Hope this helps -- let us know if you encounter any issues with that or if it doesn't resolve your problem!
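
For a rough sense of the shape of such a workflow, each scheduler job typically runs a small worker script over one dataset file. A generic sketch, not the actual pyspi-distribute interface (worker.py, the --data/--out arguments, and the .npy layout are all illustrative):

# worker.py -- a generic per-job script; each cluster job processes one dataset file
import argparse
import numpy as np
from pyspi.calculator import Calculator

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--data", required=True, help="path to one (M x T) .npy dataset")
    parser.add_argument("--out", required=True, help="where to write the SPI table")
    args = parser.parse_args()

    calc = Calculator()              # fresh instance inside the job process
    calc.load_dataset(np.load(args.data))
    calc.compute()
    calc.table.to_pickle(args.out)   # calc.table is a pandas DataFrame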