ratt-ru / shadeMS

Rapid Measurement Set plotting with dask-ms and datashader

Invoke dask-ms properly #3

Closed: IanHeywood closed this issue 4 years ago

IanHeywood commented 4 years ago

This message appears when it's killed:

  File "/usr/lib/python3.6/threading.py", line 1056, in join
    self._wait_for_tstate_lock()
  File "/usr/lib/python3.6/threading.py", line 1072, in _wait_for_tstate_lock
IanHeywood commented 4 years ago

Tagging @sjperkins at @o-smirnov's request.

sjperkins commented 4 years ago

Thanks for the report @IanHeywood. Could you add more context around the issue? From your message, it seems that it does get killed? Do additional Ctrl+C presses help?

Could you expand your description of the problem?

IanHeywood commented 4 years ago

Sorry that was a bit useless.

Here's the full output:

(shade) Singularity kern5-2019-07-10.simg:/ceph/pipelines/ianh/TKAT/flux_scale_tests/test> shadems --field=0 --xaxis=r --yaxis=i 1558743128_sdp_l0.full_1284.full_pol_GX_339-4_8SPW.ms
('r', 'Real', '')
('i', 'Imaginary', '')
 [09:27:59]  Plotting Imaginary vs Real
 [09:27:59]  Correlation index 0
 [09:27:59]
 [09:27:59]  FIELD_ID   NAME
 [09:27:59]  0          J1939-6342
 [09:27:59]
 [09:27:59]  SPW_ID     NCHAN
 [09:27:59]  0          128
 [09:27:59]  1          128
 [09:27:59]  2          128
 [09:27:59]  3          128
 [09:27:59]  4          128
 [09:27:59]  5          128
 [09:27:59]  6          128
 [09:27:59]  7          128
 [09:27:59]
 [09:27:59]  Reading 1558743128_sdp_l0.full_1284.full_pol_GX_339-4_8SPW.ms
 [09:27:59]  DATA column
 [09:28:00]  Rearranging the deck chairs

After sitting here for five minutes I hit Ctrl+C. I think this is justified, as the pyrap version of this script takes under a minute for the same thing, and the older version of shadeMS (the Python 2.7 / xarray-ms version, not packaged) took a similar amount of time.

^CTraceback (most recent call last):
  File "/users/ianh/venv/shade/bin/shadems", line 8, in <module>
    main.main([a for a in sys.argv[1:]])
  File "/users/ianh/venv/shade/lib/python3.6/site-packages/ShadeMS/main.py", line 193, in main
    nchan = group.VISDATA.values.shape[1]
  File "/users/ianh/venv/shade/lib/python3.6/site-packages/xarray/core/dataarray.py", line 568, in values
    return self.variable.values
  File "/users/ianh/venv/shade/lib/python3.6/site-packages/xarray/core/variable.py", line 437, in values
    return _as_array_or_item(self._data)
  File "/users/ianh/venv/shade/lib/python3.6/site-packages/xarray/core/variable.py", line 250, in _as_array_or_item
    data = np.asarray(data)
  File "/users/ianh/venv/shade/lib/python3.6/site-packages/numpy/core/_asarray.py", line 85, in asarray
    return array(a, dtype, copy=False, order=order)
  File "/users/ianh/venv/shade/lib/python3.6/site-packages/dask/array/core.py", line 1309, in __array__
    x = self.compute()
  File "/users/ianh/venv/shade/lib/python3.6/site-packages/dask/base.py", line 165, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/users/ianh/venv/shade/lib/python3.6/site-packages/dask/base.py", line 436, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/users/ianh/venv/shade/lib/python3.6/site-packages/dask/threaded.py", line 80, in get
    **kwargs
  File "/users/ianh/venv/shade/lib/python3.6/site-packages/dask/local.py", line 475, in get_async
    key, res_info, failed = queue_get(queue)
  File "/users/ianh/venv/shade/lib/python3.6/site-packages/dask/local.py", line 133, in queue_get
    return q.get()
  File "/usr/lib/python3.6/queue.py", line 164, in get
    self.not_empty.wait()
  File "/usr/lib/python3.6/threading.py", line 295, in wait
    waiter.acquire()

Another Ctrl+C at this point gives me this:

KeyboardInterrupt
^CError in atexit._run_exitfuncs:
Traceback (most recent call last):
  File "/usr/lib/python3.6/concurrent/futures/thread.py", line 40, in _python_exit
    t.join()
  File "/usr/lib/python3.6/threading.py", line 1056, in join
    self._wait_for_tstate_lock()
  File "/usr/lib/python3.6/threading.py", line 1072, in _wait_for_tstate_lock
    elif lock.acquire(block, timeout):
KeyboardInterrupt
^CError in atexit._run_exitfuncs:
Traceback (most recent call last):
  File "/usr/lib/python3.6/multiprocessing/util.py", line 262, in _run_finalizers
    finalizer()
  File "/usr/lib/python3.6/multiprocessing/util.py", line 186, in __call__
    res = self._callback(*self._args, **self._kwargs)
  File "/usr/lib/python3.6/multiprocessing/pool.py", line 582, in _terminate_pool
    worker_handler.join()
  File "/usr/lib/python3.6/threading.py", line 1056, in join
    self._wait_for_tstate_lock()
  File "/usr/lib/python3.6/threading.py", line 1072, in _wait_for_tstate_lock
    elif lock.acquire(block, timeout):
KeyboardInterrupt

It then returns me to the terminal prompt.

sjperkins commented 4 years ago

How big is the MS?

Also, browsing through the shadems code base, I see lots of this:

https://github.com/IanHeywood/shadeMS/blob/dfb8b8330b8e91002c87cc742fc42c7ec3e3f397/ShadeMS/main.py#L211-L212

Specifically, accessing group.VISDATA.values causes the underlying dask array to be evaluated into a numpy array. Even more specifically, every use of the above loads all the visibility data, simply to get the number of rows and channels in the array.

The evaluation of the array can be avoided by using:

nrows = group.VISDATA.data.shape[0] 
nchan = group.VISDATA.data.shape[1] 

or even

nrows = group.VISDATA.shape[0] 
nchan = group.VISDATA.shape[1] 

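This works because the data attribute returns the underlying dask or numpy array, while the values attribute always returns a numpy array. The shape is known from the array's metadata, so reading it does not trigger any computation.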
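To see the difference concretely, here is a minimal, self-contained sketch that needs no MS. A hypothetical load_visdata function stands in for the disk read and records each time it runs; the array sizes are illustrative and not taken from the MS in this issue. It assumes only dask, numpy and xarray are installed.

```python
import dask
import dask.array as da
import numpy as np
import xarray as xr

# Records each time the (hypothetical) loader actually runs.
LOAD_CALLS = []


def load_visdata():
    """Hypothetical stand-in for reading visibilities from an MS."""
    LOAD_CALLS.append(1)
    return np.ones((1000, 128), dtype=np.complex64)


# Wrap the loader lazily: building the array reads nothing.
lazy = da.from_delayed(dask.delayed(load_visdata)(),
                       shape=(1000, 128), dtype=np.complex64)
visdata = xr.DataArray(lazy, dims=("row", "chan"))

# Shape comes from the graph metadata, so no data is loaded.
nrows, nchan = visdata.shape
nrows_d, nchan_d = visdata.data.shape  # .data is still the dask array
print(len(LOAD_CALLS))  # 0

# Each .values access evaluates the whole graph again.
_ = visdata.values.shape[1]
_ = visdata.values.shape[0]
print(len(LOAD_CALLS))  # 2
```

Reading the shape costs nothing, whereas each .values access repeats the full load, which is why the pattern in the linked lines gets more expensive the more often it is used.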

This is not meant as criticism -- xarray and dask tend to make everything look easier and more magical than it actually is, so it's easy to be tripped up by these cases.

In an ideal dask workflow, one lazily constructs a dask compute graph from a series of expressions to produce (let's say) a couple of dask arrays that represent the result. The graph is not evaluated until dask.compute is called on these result arrays. When that happens, data is loaded in chunks from the MS and transformed by the algorithms to produce the final image.
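The lazy pattern described above can be sketched with plain dask arrays. This is a minimal illustration under stated assumptions: load_chunk is a hypothetical reader standing in for dask-ms, and the row and channel sizes are made up. Both results are expressed lazily and then evaluated by a single dask.compute call, so each chunk is read exactly once even though both results depend on it.

```python
import dask
import dask.array as da
import numpy as np

# Records which (hypothetical) chunks have been read.
LOAD_CALLS = []


def load_chunk(seed):
    """Hypothetical stand-in for reading one block of rows from an MS."""
    LOAD_CALLS.append(seed)
    rng = np.random.default_rng(seed)
    shape = (100, 128)
    return (rng.standard_normal(shape)
            + 1j * rng.standard_normal(shape)).astype(np.complex64)


# Four lazy row chunks, joined along the row axis into one dask array.
blocks = [da.from_delayed(dask.delayed(load_chunk)(seed),
                          shape=(100, 128), dtype=np.complex64)
          for seed in range(4)]
vis = da.concatenate(blocks, axis=0)

# Express the results lazily; this only builds the graph.
real_range = vis.real.max() - vis.real.min()
imag_range = vis.imag.max() - vis.imag.min()
assert LOAD_CALLS == []

# A single compute evaluates both results, reading each chunk once.
real_range, imag_range = dask.compute(real_range, imag_range)
print(sorted(LOAD_CALLS))  # [0, 1, 2, 3]
```

Because dask merges the graphs passed to one compute call, shared inputs are evaluated once, and independent chunks can be processed on several cores.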

Unfortunately, shadems' use of group.VISDATA.values means that this does not happen, so much of the MS data is converted to final numpy arrays far earlier and more often than is ideal. This also has the downside of using more memory and fewer cores. But it works! As a workaround for the moment, why not convert everything to numpy arrays up front with the following strategy:

from daskms import xds_from_ms

datasets = [ds.compute() for ds in xds_from_ms("WSRT.MS", ...)]
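A sketch of what this workaround buys, using a hypothetical in-memory dataset in place of real xds_from_ms output (the DATA variable name and the (row, chan, corr) layout are assumptions made for illustration, as is the load_data helper). Dataset.compute() evaluates once and returns a new dataset backed by numpy arrays, so later .values accesses no longer go back to disk.

```python
import dask
import dask.array as da
import numpy as np
import xarray as xr

# Records each time the (hypothetical) loader actually runs.
LOAD_CALLS = []


def load_data():
    """Hypothetical stand-in for reading the DATA column from disk."""
    LOAD_CALLS.append(1)
    return np.full((500, 128, 4), 1 + 1j, dtype=np.complex64)


# A lazy dataset shaped like one dask-ms group (assumed layout).
lazy = da.from_delayed(dask.delayed(load_data)(),
                       shape=(500, 128, 4), dtype=np.complex64)
lazy_ds = xr.Dataset({"DATA": (("row", "chan", "corr"), lazy)})

# Workaround: evaluate once, up front. The result holds numpy arrays.
ds = lazy_ds.compute()

# Subsequent .values accesses reuse the in-memory array.
nrows, nchan = ds.DATA.values.shape[:2]
corr0 = ds.DATA.values[:, :, 0]
print(len(LOAD_CALLS))  # 1
```

The trade-off is memory: everything selected must fit in RAM at once, which is workable when only a small fraction of the MS is selected for a plot.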

I suspect that some sort of training/course is necessary here. A couple of RARG members have become proficient with dask and xarray, but it's easy to pick each other's brains when we're in the same office.

o-smirnov commented 4 years ago

> I suspect that some sort of training/course is necessary here. A couple of RARG members have become proficient with dask and xarray, but it's easy to pick each other's brains when we're in the same office.

Excellent idea. Let's set up a half-day for the week before the bursary conference.

sjperkins commented 4 years ago

> Excellent idea. Let's set up a half-day for the week before the bursary conference.

Yeah, let's do that if it suits the attendees. I've been making noises about it for a while. I feel it might need a half-week, though.

IanHeywood commented 4 years ago

> How big is the MS?

Only 44 GB all-in, and I'm only selecting maybe 5% of it for this plot.

> I suspect that some sort of training/course is necessary here.

Sounds good, thanks. I think you can go beyond suspicion on that one!

I basically just bungled through my implementation of this, so I appreciate the wisdom. I think it's probably what I was getting at in my email from the end of July:

> I manipulate the xarray data with numpy and then put it into a Pandas dataframe for plotting by datashader. I'm not sure this is strictly necessary as datashader can also read xarrays, but I wasn't clued-up enough to get that to work, particularly for plotting channels instead of time where a corner-turn type operation is required.

I'm sure I had a better implementation to begin with but couldn't get it to work sensibly for plotting visibilities as a function of frequency. If it's sub-optimal but actually works then I'm likely to just run with it.

sjperkins commented 4 years ago

> I basically just bungled through my implementation of this, so I appreciate the wisdom. I think it's probably what I was getting at in my email from the end of July:

Well, I think I've been bungling with dask for two years, so yeah, it takes a while, and I'm keen to spread the knowledge because the possibilities seem pretty powerful.

> I manipulate the xarray data with numpy and then put it into a Pandas dataframe for plotting by datashader. I'm not sure this is strictly necessary as datashader can also read xarrays, but I wasn't clued-up enough to get that to work, particularly for plotting channels instead of time where a corner-turn type operation is required.

Yeah, corner-turn operations are tricky (but doable) and often depend on assumptions about the layout of the underlying data in order to be done efficiently. It might be worth focusing on these kinds of things in the proposed workshop -- the difficult cases are often the ones where we learn the most.
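For the channel-axis case, one simple approach (not necessarily what shadeMS does) is to flatten the (row, chan) visibility array into one point per visibility, repeating the channel frequencies for every row. That yields a long-format table of the kind datashader can plot with frequency on the x-axis. visibilities_vs_channel is a hypothetical helper; it works on in-memory numpy arrays and does not attempt the efficient, layout-aware corner turn discussed above.

```python
import numpy as np
import pandas as pd


def visibilities_vs_channel(vis, chan_freq):
    """Flatten a (row, chan) visibility array into one point per sample.

    vis: complex array of shape (nrow, nchan), e.g. one correlation of DATA.
    chan_freq: 1-D array of channel frequencies, length nchan.
    Returns a long-format DataFrame with columns freq, real, imag.
    """
    nrow, nchan = vis.shape
    if chan_freq.shape != (nchan,):
        raise ValueError("chan_freq must have one entry per channel")
    # Repeat the frequency axis for every row as a (nrow, nchan) view.
    freq = np.broadcast_to(chan_freq, (nrow, nchan))
    # ravel() walks both arrays in the same row-major order, so each
    # visibility stays paired with its own channel frequency.
    return pd.DataFrame({
        "freq": freq.ravel(),
        "real": vis.real.ravel(),
        "imag": vis.imag.ravel(),
    })


vis = np.arange(6).reshape(2, 3) + 1j
df = visibilities_vs_channel(vis, np.array([1.0e9, 1.1e9, 1.2e9]))
print(df.shape)  # (6, 3)
```

Each row of vis contributes nchan points, so the table has nrow * nchan entries; for large selections this is where memory use grows quickly.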

> I'm sure I had a better implementation to begin with but couldn't get it to work sensibly for plotting visibilities as a function of frequency. If it's sub-optimal but actually works then I'm likely to just run with it.

Yeah, go with what works for now. I think the proposed workaround should cut down the time taken, because it avoids repeatedly loading the visibilities.

IanHeywood commented 4 years ago

The main body of the code needs to be rewritten to invoke dask-ms properly. Use @sjperkins' examples from the Cape Town session.