dask / dask-mpi

Deploy Dask using MPI4Py
BSD 3-Clause "New" or "Revised" License

Strange interaction with astropy.coordinates.Angle #76

Open AlecThomson opened 3 years ago

AlecThomson commented 3 years ago

What happened: This may be an issue in astropy, so my apologies if this is the wrong location. However, it appears to happen only when using dask-mpi.

I'm using dask-mpi to distribute a task that uses an astropy.coordinates.Angle object. When I try to convert to the astropy.units.hourangle format, I get the following error:

  File "/group/askap/athomson/miniconda3/envs/spice/lib/python3.8/site-packages/astropy/coordinates/angles.py", line 316, in to_string
    values = self.hour
  File "/group/askap/athomson/miniconda3/envs/spice/lib/python3.8/site-packages/astropy/units/quantity.py", line 861, in __getattr__
    raise AttributeError(
AttributeError: Angle instance has no attribute 'hour'

What you expected to happen: With a LocalCluster or a SLURMCluster, I do not get this error with otherwise identical code. Further, astropy.coordinates.Angle explicitly has an hour property (L161):

    @property
    def hour(self):
        """
        The angle's value in hours (read-only property).
        """
        return self.hourangle

Further, if I do (see MCVE):

print('hour' in dir(coord))

I get True! Something strange seems to be happening when I try to access the property itself. I'll note that something similar happened with coordinates.SkyCoord and its hms property.
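
One possible mechanism for this symptom, offered as a sketch and not a confirmed diagnosis: when a class defines `__getattr__`, any AttributeError raised inside a property getter makes Python fall back to `__getattr__`, which then reports the property itself as missing. The classes below (`Base`, `FakeAngle`) are hypothetical stand-ins, not astropy code; they only mimic the shape of the traceback above, where the error is raised from `__getattr__` in quantity.py.

```python
class Base:
    """Hypothetical stand-in for a class that defines __getattr__."""

    def __getattr__(self, name):
        # Called only after normal attribute lookup raised AttributeError.
        raise AttributeError(
            f"{type(self).__name__} instance has no attribute '{name}'"
        )


class FakeAngle(Base):
    @property
    def hourangle(self):
        # Simulate a failure *inside* the getter (assumed, for illustration).
        raise AttributeError("inner failure while converting units")

    @property
    def hour(self):
        return self.hourangle


coord = FakeAngle()
print("hour" in dir(coord))  # True: the property exists on the class

try:
    coord.hour
except AttributeError as exc:
    message = str(exc)
    print(message)  # FakeAngle instance has no attribute 'hour'
```

If something like this is happening, the reported message hides the real failure. Calling the getter directly, e.g. `type(coord).hour.fget(coord)`, bypasses the outer fallback and may surface an error closer to the underlying cause.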

Minimal Complete Verifiable Example: This is as close as I can get to a minimal version of my working script. Very frustratingly, the MCVE does not produce the same error. Hair pulling abounds.

from distributed import Client
from dask_mpi import initialize
from dask import delayed
from astropy.coordinates import Angle
import astropy.units as u

@delayed
def worker(i):
    coord = Angle(i*u.deg)
    print('coord.to_string()',coord.to_string())
    print('coord.to_string(u.hourangle)',coord.to_string(u.hourangle, sep=':', precision=3))
    return

def main():
    initialize(interface='ipogif0')
    client = Client()
    results = []
    for i in range(90):
        results.append(
            worker(i)
        )
    futures = client.persist(results)
    outputs = [f.compute() for f in futures]


if __name__ == "__main__":
    main()

Anything else we need to know?:

Environment:

jacobtomlinson commented 3 years ago

I would be surprised if something in dask-mpi was causing this directly. My first instinct would be to look at the Python environments, specifically comparing what happens differently between dask-mpi and SLURMCluster.

Perhaps a good first step would be to check the sys.executable on clusters submitted by both methods to ensure it is the same.
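
A minimal sketch of such a check, assuming it is useful to collect the interpreter path and a few package versions in one call. The function name `env_fingerprint` and the default package list are illustrative choices, not part of dask or dask-mpi.

```python
import sys
from importlib import metadata


def env_fingerprint(packages=("dask", "distributed", "astropy", "numpy")):
    """Return the interpreter path and installed versions of the given packages."""
    versions = {}
    for name in packages:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            # Record missing packages explicitly instead of failing.
            versions[name] = None
    return {
        "executable": sys.executable,
        "python": sys.version.split()[0],
        "versions": versions,
    }


if __name__ == "__main__":
    print(env_fingerprint())
```

With a connected `Client`, `client.run(env_fingerprint)` should return one such dictionary per worker, keyed by worker address, and `client.run_on_scheduler(env_fingerprint)` does the same for the scheduler. `client.get_versions(check=True)` is a built-in alternative that compares versions across client, scheduler and workers.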

AlecThomson commented 3 years ago

Thanks, @jacobtomlinson. I added the line:

print('sys.exec', sys.executable)

to the worker function.

Using both dask-mpi and a LocalCluster I get:

sys.exec /group/askap/athomson/miniconda3/envs/spice/bin/python3.8

which is the conda env I'd expect to be called.

ste616 commented 3 years ago

Your example code works perfectly for me, with astropy = 4.3.1, dask_mpi = 2.21.0, dask = 2021.08.0, Python = 3.8.10 and Ubuntu 20.04.2 LTS. I get both the degrees and hour strings out as expected.

I will note that the line numbers for the errors do not correspond to the version of astropy that I have installed.

AlecThomson commented 3 years ago

Thanks, @ste616. The same is true for me as well, actually. The MCVE also runs fine for me, which is part of my confusion. There might be some conflict between that and my full working script, but for the life of me I can't see what it is.

AlecThomson commented 3 years ago

EDIT: I've also found that the above problem (with u.hourangle) persists even when the function is not delayed (but still using dask-mpi).

As an update, it looks like the issue extends to other parts of astropy.units. A script like:

from distributed import Client
from dask_mpi import initialize
from dask import delayed
from astropy.coordinates import Angle
import astropy.units as u
import time
import numpy as np

@delayed
def worker(freq):
    freq_arr = freq.to(u.Hz).value
    return freq_arr

def main():
    initialize(interface='ipogif0')
    client = Client()
    results = []
    for i in range(100):
        freq = np.arange(100) * u.Hz
        results.append(
            worker(freq)
        )
    futures = client.persist(results)
    outputs = [f.compute() for f in futures]
    print('outputs is',outputs)


if __name__ == "__main__":
    main()

Raises astropy.units.core.UnitConversionError: 'Hz' (frequency) and 'Hz' (frequency) are not convertible. Again, I should note that this MCVE doesn't reproduce this Error which occurs in my full script.

I can work around this by doing the unit conversion in main, e.g.

from distributed import Client
from dask_mpi import initialize
from dask import delayed
from astropy.coordinates import Angle
import astropy.units as u
import time
import numpy as np

@delayed
def worker(freq):
    freq_arr = freq
    return freq_arr

def main():
    initialize(interface='ipogif0')
    client = Client()
    results = []
    for i in range(100):
        freq = np.arange(100) * u.Hz
        results.append(
            worker(freq.to(u.Hz).value)
        )
    futures = client.persist(results)
    outputs = [f.compute() for f in futures]
    print('outputs is',outputs)


if __name__ == "__main__":
    main()
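
Dask ships task arguments to workers by serializing them, so one thing worth probing is whether an object still compares as expected after a pickle round trip. The helper below is a hedged diagnostic sketch using only the standard library; `roundtrip_report` is a hypothetical name, and whether serialization is actually involved in this error is unconfirmed.

```python
import pickle
from fractions import Fraction


def roundtrip_report(obj):
    """Pickle and unpickle obj, then compare the copy with the original.

    Intended for objects whose == returns a single truth value
    (for example a unit), not for arrays.
    """
    copy = pickle.loads(pickle.dumps(obj))
    return {
        "same_type": type(copy) is type(obj),
        "equal": bool(copy == obj),
        "same_object": copy is obj,
    }


# Demonstration with a standard-library object.
report = roundtrip_report(Fraction(1, 3))
print(report)
```

In the failing script, the same helper could be run inside the worker function on `freq.unit` and on `u.Hz` to see whether the unit that arrives on the worker still equals the one imported there.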

The full traceback is:

```
Traceback (most recent call last):
  File "/group/askap/athomson/miniconda3/envs/spice/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/group/askap/athomson/miniconda3/envs/spice/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 865, in get_task_run_state
    value = prefect.utilities.executors.run_task_with_timeout(
  File "/group/askap/athomson/miniconda3/envs/spice/lib/python3.8/site-packages/prefect/utilities/executors.py", line 323, in run_task_with_timeout
    return task.run(*args, **kwargs)  # type: ignore
  File "/group/askap/athomson/repos/spiceracs/spiceracs/processSPICE.py", line 64, in frion_task
    return frion.main(
  File "/group/askap/athomson/repos/spiceracs/spiceracs/frion.py", line 218, in main
    updates = [f.compute() for f in futures]
  File "/group/askap/athomson/repos/spiceracs/spiceracs/frion.py", line 218, in <listcomp>
    updates = [f.compute() for f in futures]
  File "/group/askap/athomson/miniconda3/envs/spice/lib/python3.8/site-packages/dask/base.py", line 285, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/group/askap/athomson/miniconda3/envs/spice/lib/python3.8/site-packages/dask/base.py", line 567, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/group/askap/athomson/miniconda3/envs/spice/lib/python3.8/site-packages/distributed/client.py", line 2674, in get
    results = self.gather(packed, asynchronous=asynchronous, direct=direct)
  File "/group/askap/athomson/miniconda3/envs/spice/lib/python3.8/site-packages/distributed/client.py", line 1983, in gather
    return self.sync(
  File "/group/askap/athomson/miniconda3/envs/spice/lib/python3.8/site-packages/distributed/client.py", line 851, in sync
    return sync(
  File "/group/askap/athomson/miniconda3/envs/spice/lib/python3.8/site-packages/distributed/utils.py", line 354, in sync
    raise exc.with_traceback(tb)
  File "/group/askap/athomson/miniconda3/envs/spice/lib/python3.8/site-packages/distributed/utils.py", line 337, in f
    result[0] = yield future
  File "/group/askap/athomson/miniconda3/envs/spice/lib/python3.8/site-packages/tornado/gen.py", line 762, in run
    value = future.result()
  File "/group/askap/athomson/miniconda3/envs/spice/lib/python3.8/site-packages/distributed/client.py", line 1848, in _gather
    raise exception.with_traceback(traceback)
  File "/group/askap/athomson/miniconda3/envs/spice/lib/python3.8/site-packages/dask/utils.py", line 34, in apply
    return func(*args, **kwargs)
  File "/group/askap/athomson/repos/spiceracs/spiceracs/frion.py", line 80, in predict_worker
    freq_array=freq.to(u.Hz).value,
  File "/group/askap/athomson/miniconda3/envs/spice/lib/python3.8/site-packages/astropy/units/quantity.py", line 698, in to
    value = self._to_value(unit, equivalencies)
  File "/group/askap/athomson/miniconda3/envs/spice/lib/python3.8/site-packages/astropy/units/quantity.py", line 662, in _to_value
    return self.unit.to(unit, self.view(np.ndarray),
  File "/group/askap/athomson/miniconda3/envs/spice/lib/python3.8/site-packages/astropy/units/core.py", line 987, in to
    return self._get_converter(other, equivalencies=equivalencies)(value)
  File "/group/askap/athomson/miniconda3/envs/spice/lib/python3.8/site-packages/astropy/units/core.py", line 918, in _get_converter
    raise exc
  File "/group/askap/athomson/miniconda3/envs/spice/lib/python3.8/site-packages/astropy/units/core.py", line 903, in _get_converter
    return self._apply_equivalencies(
  File "/group/askap/athomson/miniconda3/envs/spice/lib/python3.8/site-packages/astropy/units/core.py", line 886, in _apply_equivalencies
    raise UnitConversionError(
astropy.units.core.UnitConversionError: 'Hz' (frequency) and 'Hz' (frequency) are not convertible
```

EDIT 2:

If I don't delay the function with the coordinate/hourangle issue, the same error occurs if I use LocalCluster. Delaying it allows it to work with LocalCluster. It fails in either case with dask-mpi.

jacobtomlinson commented 3 years ago

The inconsistency is strange. Are you definitely using 2021.05.0 everywhere?

AlecThomson commented 3 years ago

I'm pretty sure that's the case. I'm using a locally installed conda environment. As a test I added:

print("I'm in the {func} function!",'dask.__version__', dask.__version__)
print("I'm in the {func} function!",'dask.__file__', dask.__file__)

And I get:

I'm in the main function! dask.__version__ 2021.05.0
I'm in the main function! dask.__file__ /group/askap/athomson/miniconda3/envs/spice/lib/python3.8/site-packages/dask/__init__.py
I'm in the worker function! dask.__version__ 2021.05.0
I'm in the worker function! dask.__file__ /group/askap/athomson/miniconda3/envs/spice/lib/python3.8/site-packages/dask/__init__.py

jacobtomlinson commented 3 years ago

And distributed too? (They should be pinned but it's worth checking).

AlecThomson commented 3 years ago

Here's a test with distributed

I'm in the main function! distributed.__version__ 2021.05.0
I'm in the main function! distributed.__file__ /group/askap/athomson/miniconda3/envs/spice/lib/python3.8/site-packages/distributed/__init__.py
I'm in the worker function! distributed.__version__ 2021.05.0
I'm in the worker function! distributed.__file__ /group/askap/athomson/miniconda3/envs/spice/lib/python3.8/site-packages/distributed/__init__.py

As a small aside, I noticed I had an inconsistency in my module importing, i.e.

import distributed

vs

from dask import distributed

I corrected to just use the former option, but the issue persists.

jacobtomlinson commented 3 years ago

> I noticed I had an inconsistency in my module importing

The latter is preferred, but either will mostly be fine.

> Again, I should note that this MCVE doesn't reproduce this Error which occurs in my full script.

Without a reproducer, I'm afraid this will be hard for us to track down.