neurospin / pylearn-epac

Embarrassingly Parallel Array Computing: EPAC is a machine learning workflow builder.
BSD 3-Clause "New" or "Revised" License
12 stars 3 forks source link

Multiprocess LocalEngine doesn't work with HDF5 objects #25

Closed duboism closed 11 years ago

duboism commented 11 years ago

The following code (which loads X and y from an HDF5 file) doesn't work.

import tables
import sklearn, sklearn.svm
import epac, epac.map_reduce.engine, epac.configuration

epac.configuration.conf.TRACE_TOPDOWN = True
epac.configuration.debug.DEBUG = True

# Run with Carray
h5file = tables.openFile('test_epac.hdf5', mode='r')
X = h5file.root.X
y = h5file.root.y

# Create a simple EPAC workflow
pipeline    = epac.Pipe(sklearn.svm.LinearSVC(class_weight='auto'))
cv_pipeline = epac.CV(pipeline, random_state=13031981)

local_engine = epac.map_reduce.engine.LocalEngine(cv_pipeline,       num_processes=2)
try:
    cv_pipeline = local_engine.run(X=X, y=y)
except Exception as e:
    print 'There was an exception:', e
finally:
    print 'Closing'
    h5file.close()
    res = cv_pipeline.reduce()
    print res

The error is:

Exception in thread Thread-3:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 551, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.7/threading.py", line 504, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 319, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'weakref'>: attribute lookup __builtin__.weakref failed
JinpengLI commented 11 years ago

Small comment: obviously, the problem comes from hdf5 file cannot be picklized in multiprocessing. I am thinking if it is better to use multi-thread instead of using multi-process so that we don't need to pickle all the data.

duboism commented 11 years ago

It's probably something like that. However the simple example below (involving HDF5 and multiprocessing but not EPAC) works (it computes the mean of the 4 first columns):

import tables
import numpy
import multiprocessing

# Create data & store it
n_features = 10
n_obs      = 100
X = numpy.random.rand(n_obs, n_features)
y = numpy.zeros((n_obs,))
y[0:n_obs/2] = 1

h5file = tables.openFile('test_epac.hdf5', mode='w')
Xatom = tables.Atom.from_dtype(X.dtype)
Xhdf5 = h5file.createCArray(h5file.root, 'X', Xatom, X.shape)
Xhdf5[:] = X
yatom = tables.Atom.from_dtype(y.dtype)
yhdf5 = h5file.createCArray(h5file.root, 'y', yatom, y.shape)
yhdf5[:] = y
h5file.close()

# Reload the data
h5file = tables.openFile('test_epac.hdf5', mode='r')
X = h5file.root.X
y = h5file.root.y

# Use multiprocessing to perform a simple computation (column average)
def f(x):
    return x.mean()
p = multiprocessing.Pool(2)
col_mean = p.map(f, [X[:, 0], X[:, 1], X[:, 2], X[:, 3]])

However:

type(X)
<class 'tables.carray.CArray'>
>>> X
/X (CArray(100, 10)) ''
  atom := Float64Atom(shape=(), dflt=0.0)
  maindim := 0
  flavor := 'numpy'
  byteorder := 'little'
  chunkshape := (819, 10)
>>> x = X[:, 0]
>>> type(x)
<type 'numpy.ndarray'>

In other words, slicing a Carray returns a numpy.ndarray (which works well with EPAC). I will try to investigate more on this issue.

In any case, using threading instead of multiprocessing seems a good solution (same API, no need to pickle). AS jobs in EPAC are embarassingly parallel we should not run into concurrent access problems.

duboism commented 11 years ago

Hum, HDF5 Carray can't be passed to the worker function:

import random
import tables
import numpy
import multiprocessing

# Create data & store it
n_features = 10
n_obs      = 100
X = numpy.random.rand(n_obs, n_features)
y = numpy.zeros((n_obs,))
y[0:n_obs/2] = 1

h5file = tables.openFile('test_epac.hdf5', mode='w')
Xatom = tables.Atom.from_dtype(X.dtype)
Xhdf5 = h5file.createCArray(h5file.root, 'X', Xatom, X.shape)
Xhdf5[:] = X
yatom = tables.Atom.from_dtype(y.dtype)
yhdf5 = h5file.createCArray(h5file.root, 'y', yatom, y.shape)
yhdf5[:] = y
h5file.close()

# Reload the data
h5file = tables.openFile('test_epac.hdf5', mode='r')
X = h5file.root.X
y = h5file.root.y

# Use multiprocessing to perform a simple computation (column average)
def f(X):
    name = multiprocessing.current_process().name
    column = random.randint(0, n_features)
    print '%s use column %i' % (name, column)
    return X[:, column].mean()
p = multiprocessing.Pool(2)
col_mean = p.map(f, [X, X, X])

Running it yields the same weakref error. I will try to investigate on it.

JinpengLI commented 11 years ago

I have tried to replace f function as just pass, and it raises the same weakref error. I think we spend a little bit more on this issue. Maybe we could switch to another issue, and in the meantime we could make a threading branch to test.

def f(X):
    pass

p = multiprocessing.Pool(1)
col_mean = p.map(f, [X])
Exception in thread Thread-3:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 551, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.7/threading.py", line 504, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 319, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'weakref'>: attribute lookup __builtin__.weakref failed
JinpengLI commented 11 years ago

threading in python cannot take advantage of multi-core machine according the link provided by @duboism

http://docs.python.org/2.7/library/threading.html

Therefore, for instant, we only use numpy array. We haven't a better solution yet.