radiome-lab / radiome

Configurable Pipeline for the Analysis of Medical Imaging Data
BSD 3-Clause "New" or "Revised" License

Dask failed to execute some PythonJobs #64

Closed: puorc closed this issue 4 years ago

puorc commented 4 years ago

Tried to execute the UNet workflow in DaskExecutor but it failed, while it succeeded and produced the expected results in Executor.

Code:

https://github.com/radiome-lab/preprocessing/blob/master/tests/test_skullstrip.py

https://github.com/radiome-lab/preprocessing/tree/master/radiome/workflows/preprocessing/skullstrip/unet

Info:

2020-03-23 16:00:20,060-15s radiome.execution.state INFO: Executing with DaskExecution
/Users/pz/workspace/demo/venv/lib/python3.8/site-packages/distributed/worker.py:3337: UserWarning: Large object of size 9.95 MB detected in task graph: 
  {'SG': <networkx.classes.digraph.DiGraph object at 0x12e9012b0>}
Consider scattering large objects ahead of time
with client.scatter to reduce scheduler burden and 
keep data on workers

    future = client.submit(func, big_data)    # bad

    big_future = client.scatter(big_data)     # good
    future = client.submit(func, big_future)  # good
  warnings.warn(
2020-03-23 16:00:20,116-15s radiome.execution.executor INFO: Joining execution of 1 executions: ['execute_subgraph-48621418-0f24-4c25-bf99-74281d0259ec']
Exception ignored in: <function State.__del__ at 0x113c50ee0>
Traceback (most recent call last):
  File "/Users/pz/workspace/radiome/radiome/core/execution/__init__.py", line 52, in __del__
    if not isinstance(self._resource, Job):
AttributeError: 'State' object has no attribute '_resource'
distributed.worker - WARNING - Could not deserialize task
Traceback (most recent call last):
  File "/Users/pz/workspace/demo/venv/lib/python3.8/site-packages/distributed/worker.py", line 2410, in _maybe_deserialize_task
    function, args, kwargs = _deserialize(*self.tasks[key])
  File "/Users/pz/workspace/demo/venv/lib/python3.8/site-packages/distributed/worker.py", line 3247, in _deserialize
    kwargs = pickle.loads(kwargs)
  File "/Users/pz/workspace/demo/venv/lib/python3.8/site-packages/distributed/protocol/pickle.py", line 59, in loads
    return pickle.loads(x)
ModuleNotFoundError: No module named 'radiome_workflow_unet'

Possible causes and analysis

  1. State is not initialized properly. However, this AttributeError only shows up in __del__, so it looks like a secondary symptom rather than the root cause.
  2. The deserialization error most likely originates in this block:

        train_model = UNet2d(dim_in=config.dim_in, num_conv_block=config.num_conv_block, kernel_root=config.kernel_root)
        if config.unet_model.lower().startswith('s3://'):
            unet_path = S3Resource(config.unet_model, working_dir=tempfile.mkdtemp())()
        else:
            unet_path = config.unet_model
        checkpoint = torch.load(unet_path, map_location={'cuda:0': 'cpu'})
        train_model.load_state_dict(checkpoint['state_dict'])
        model = nn.Sequential(train_model, nn.Softmax2d())
    
        # create a node called unet_mask
        unet_mask = PythonJob(function=predict_volumes, reference='unet_mask')
        unet_mask.model = Resource(model)
        unet_mask.cimg_in = anat

    Tested it in Executor and it looks good, so my guess is that something goes wrong while the job and its inputs are serialized/deserialized for the workers (see the sketch below).
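
A minimal sketch of the suspected failure mode, assuming radiome loads the workflow module from its file path under a synthetic name (the path below is hypothetical): pickle serializes functions and class instances by reference, recording only the defining module's name, so the Dask worker process must be able to import that module by the same name when it deserializes the task.

    import importlib.util
    import pickle
    import sys

    # Load the workflow module from a file path, the way the client process does.
    spec = importlib.util.spec_from_file_location(
        'radiome_workflow_unet',                    # the name pickle records
        '/path/to/skullstrip/unet/__init__.py',     # hypothetical path
    )
    module = importlib.util.module_from_spec(spec)
    sys.modules[spec.name] = module                 # lets pickling succeed in the client
    spec.loader.exec_module(module)

    # Serializing a function (or an instance of a class) defined in that module
    # stores only the reference 'radiome_workflow_unet.predict_volumes'.
    payload = pickle.dumps(module.predict_volumes)

    # On a worker, pickle.loads(payload) effectively runs
    #     __import__('radiome_workflow_unet')
    # which raises ModuleNotFoundError, because the path-based import above only
    # ever happened in the client process.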

anibalsolon commented 4 years ago

@puorc you might want to check the develop branch of C-PAC: the model is no longer passed as a parameter, only the path to the model is. The model itself is created during execution, inside the predict_volumes function.
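
A rough sketch of that approach, reusing the names from the snippet quoted above (UNet2d, PythonJob, Resource, unet_path, anat); the exact signature of predict_volumes is an assumption, not the actual C-PAC/radiome API, and the surrounding workflow imports are assumed to be in place. The point is that the job only carries a plain checkpoint path, and all torch objects are built on the worker at execution time:

    import torch
    import torch.nn as nn

    def predict_volumes(cimg_in, model_path, dim_in, num_conv_block, kernel_root):
        # Build the network on the worker, at execution time.
        train_model = UNet2d(dim_in=dim_in,
                             num_conv_block=num_conv_block,
                             kernel_root=kernel_root)
        checkpoint = torch.load(model_path, map_location={'cuda:0': 'cpu'})
        train_model.load_state_dict(checkpoint['state_dict'])
        model = nn.Sequential(train_model, nn.Softmax2d())
        # ... run inference on cimg_in with `model`, as before ...

    # The job then only ships easily picklable inputs:
    unet_mask = PythonJob(function=predict_volumes, reference='unet_mask')
    unet_mask.model_path = Resource(unet_path)   # a plain path, not a torch model
    unet_mask.cimg_in = anat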

puorc commented 4 years ago

Hi Anibal, for the UNet case I assumed it might be a PyTorch issue, but it looks like the same problem happens with ANTs registration.

Code: https://github.com/radiome-lab/registration/blob/master/radiome/workflows/registration/ants/workflow.py

https://github.com/radiome-lab/registration/blob/master/tests/test_ants.py (set linear to false)

Info:

2020-04-02 09:58:48,282-15s radiome.execution.executor INFO: Joining execution of 3 executions: ['execute_subgraph-7ef3a051-78ca-4bfe-ae16-c004f19f8e88', 'execute_subgraph-64ae3c36-bb7b-4feb-9b71-e6064dfa5edb', 'execute_subgraph-ce227b59-a5e3-4ec4-8385-7a01ffbee450']
Exception ignored in: <function State.__del__ at 0x113cdcc10>
Traceback (most recent call last):
  File "/Users/pu/workspace/radiome/radiome/core/execution/__init__.py", line 52, in __del__
    if not isinstance(self._resource, Job):
AttributeError: 'State' object has no attribute '_resource'
distributed.worker - WARNING - Could not deserialize task
Traceback (most recent call last):
  File "/Users/pu/workspace/registration/venv/lib/python3.8/site-packages/distributed/worker.py", line 2410, in _maybe_deserialize_task
    function, args, kwargs = _deserialize(*self.tasks[key])
  File "/Users/pu/workspace/registration/venv/lib/python3.8/site-packages/distributed/worker.py", line 3247, in _deserialize
    kwargs = pickle.loads(kwargs)
  File "/Users/pu/workspace/registration/venv/lib/python3.8/site-packages/distributed/protocol/pickle.py", line 59, in loads
    return pickle.loads(x)
ModuleNotFoundError: No module named 'radiome_workflow_ants'
Exception ignored in: <function State.__del__ at 0x11e20bf70>
Traceback (most recent call last):
  File "/Users/pu/workspace/radiome/radiome/core/execution/__init__.py", line 52, in __del__
    if not isinstance(self._resource, Job):
AttributeError: 'State' object has no attribute '_resource'
distributed.worker - WARNING - Could not deserialize task
Traceback (most recent call last):
  File "/Users/pu/workspace/registration/venv/lib/python3.8/site-packages/distributed/worker.py", line 2410, in _maybe_deserialize_task
    function, args, kwargs = _deserialize(*self.tasks[key])
  File "/Users/pu/workspace/registration/venv/lib/python3.8/site-packages/distributed/worker.py", line 3247, in _deserialize
    kwargs = pickle.loads(kwargs)
  File "/Users/pu/workspace/registration/venv/lib/python3.8/site-packages/distributed/protocol/pickle.py", line 59, in loads
    return pickle.loads(x)
ModuleNotFoundError: No module named 'radiome_workflow_ants'
Exception ignored in: <function State.__del__ at 0x112259f70>
Traceback (most recent call last):
  File "/Users/pu/workspace/radiome/radiome/core/execution/__init__.py", line 52, in __del__
    if not isinstance(self._resource, Job):
AttributeError: 'State' object has no attribute '_resource'
distributed.worker - WARNING - Could not deserialize task
Traceback (most recent call last):
  File "/Users/pu/workspace/registration/venv/lib/python3.8/site-packages/distributed/worker.py", line 2410, in _maybe_deserialize_task
    function, args, kwargs = _deserialize(*self.tasks[key])
  File "/Users/pu/workspace/registration/venv/lib/python3.8/site-packages/distributed/worker.py", line 3247, in _deserialize
    kwargs = pickle.loads(kwargs)
  File "/Users/pu/workspace/registration/venv/lib/python3.8/site-packages/distributed/protocol/pickle.py", line 59, in loads
    return pickle.loads(x)
ModuleNotFoundError: No module named 'radiome_workflow_ants'
2020-04-02 09:58:49,257-15s radiome.execution.state INFO: Gathering resources
2020-04-02 09:58:49,258-15s radiome.execution.state INFO: Wiping out PythonJob(d7f6c670,calc_ants_warp) directory.
2020-04-02 09:58:49,259-15s radiome.execution.state INFO: Wiping out Computed(PythonJob(d7f6c670,calc_ants_warp),warped_image) directory.
2020-04-02 09:58:49,260-15s radiome.execution.state INFO: Wiping out PythonJob(d7f6c670,calc_ants_warp) directory.
2020-04-02 09:58:49,260-15s radiome.execution.state INFO: Wiping out Computed(PythonJob(d7f6c670,calc_ants_warp),warped_image) directory.
anibalsolon commented 4 years ago

Hi @puorc

It actually works for me: I pip-installed the most recent versions of both repos and changed linear = False (I could see it spawned the Dask Nanny, so it is working). You might want to check your installation, since the error is definitely an environment problem (No module named 'radiome_workflow_ants').

Setting up a Travis test config would be helpful for these cases.

anibalsolon commented 4 years ago

Nevertheless, it needs better handling for sure. I cannot reproduce it though, so I will try to handle it blindly :D

puorc commented 4 years ago

It keeps happening on my computer even after I set up a new virtualenv and installed the packages again. I found that the problem is that modules imported by their file path at runtime may not be resolvable in the worker processes when tasks are deserialized. So I made some changes to how the workflow functions are imported, based on some Stack Overflow answers, and it looks good now (see the sketch below).
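
A minimal sketch of the kind of change described above (the dotted module name, helper name, and create_workflow entry point are hypothetical; the actual radiome loader may differ): import the workflow by an importable package name instead of by file path, so that pickled references can be re-imported by Dask workers running the same environment.

    import importlib

    # importlib.import_module registers the module in sys.modules under its real,
    # importable name, so functions and classes pickled from it can be re-imported
    # by a Dask worker that has the same package installed.
    def load_workflow(name: str):
        return importlib.import_module(name)

    # Hypothetical dotted name for the UNet skullstrip workflow:
    workflow = load_workflow('radiome.workflows.preprocessing.skullstrip.unet')
    create_workflow = workflow.create_workflow   # assumed entry-point attribute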