ratt-ru / shadeMS

Rapid Measurement Set plotting with dask-ms and datashader
20 stars 6 forks source link

odd performance degradation when row chunks are set too small #29

Closed o-smirnov closed 4 years ago

o-smirnov commented 4 years ago

@sjperkins here's a puzzle for you. My default row chunk size is set to 100000, and everything was rosy. Here's a little test plot with a test MS.

(sms) oms@tshikovski:~/projects/shadems-testing$ rm *png; shadems 3C273-C-8424.ms/ --xaxis DATA:real --yaxis DATA:imag --corr all --xmin -40 --xmax 40 --ymin -40 --ymax 40 --color amp --cmin 0 --cmax 10 --row-chunk-size 100000 && eog *png
2020-04-18 23:28:24 - INFO     - ------------------------------------------------------
2020-04-18 23:28:24 - INFO     -                  : MS 3C273-C-8424.ms contains 393177 rows
2020-04-18 23:28:24 - INFO     -                  :   (1, 64) spectral windows and channels
2020-04-18 23:28:24 - INFO     -                  :   4 fields: J1331+3030 J1407+2827 J1224+0330 J1229+0203
2020-04-18 23:28:24 - INFO     -                  :   16 scans, first #1, last #16
2020-04-18 23:28:24 - INFO     -                  :   26 antennas: 0:1 1:2 2:3 3:4 4:5 5:6 6:7 7:8 8:9 9:10 10:11 11:12 12:13 13:14 14:15 15:16 16:17 17:18 18:19 19:20 20:21 21:22 22:23 23:24 24:25 26:27
2020-04-18 23:28:24 - INFO     -                  :   correlations RR RL LR LL
2020-04-18 23:28:24 - INFO     - ------------------------------------------------------
2020-04-18 23:28:24 - INFO     - Antenna(s)       : all
2020-04-18 23:28:24 - INFO     - Field(s)         : all
2020-04-18 23:28:24 - INFO     - SPW(s)           : all
2020-04-18 23:28:24 - INFO     - Scan(s)          : all
2020-04-18 23:28:24 - INFO     - ------------------------------------------------------
2020-04-18 23:28:24 - INFO     - defining new plot axis: real of DATA corr None, clip (-40.0, 40.0), discretization None
2020-04-18 23:28:24 - INFO     - defining new plot axis: imag of DATA corr None, clip (-40.0, 40.0), discretization None
2020-04-18 23:28:24 - INFO     - defining new plot axis: amp of DATA corr None, clip (0.0, 10.0), discretization 16
2020-04-18 23:28:24 - INFO     -                  : you have asked for 1 plots employing 3 unique datums
2020-04-18 23:28:24 - INFO     -                  : Indexing MS and building dataframes (chunk size is 100000)
2020-04-18 23:28:25 - INFO     -                  : complete
2020-04-18 23:28:25 - INFO     -                  : rendering 1 dataframes with 1.01e+08 points into 1 plot types
2020-04-18 23:28:25 - INFO     -                  : rendering plot-3C273-C-8424-DATA_RR_RL_LR_LL_imag_real-amp.png
2020-04-18 23:28:28 - INFO     -                  : shading using 16 colors (bin centres are 0.3125 0.9375 1.5625 2.1875 2.8125 3.4375 4.0625 4.6875 5.3125 5.9375 6.5625 7.1875 7.8125 8.4375 9.0625 9.6875)
2020-04-18 23:28:29 - INFO     -                  : wrote plot-3C273-C-8424-DATA_RR_RL_LR_LL_imag_real-amp.png
2020-04-18 23:28:29 - INFO     - Total time       : 4.75 seconds
2020-04-18 23:28:29 - INFO     - Finished
2020-04-18 23:28:29 - INFO     - ------------------------------------------------------

5 seconds, nice and smooth. Then in a fit of mischief, I set it to 1024. OK that's small and suboptimal, but it should still run, right? Except the process proceeded to chew up all my RAM slowly, then went boom:

(sms) oms@tshikovski:~/projects/shadems-testing$ rm *png; shadems 3C273-C-8424.ms/ --xaxis DATA:real --yaxis DATA:imag --corr all --xmin -40 --xmax 40 --ymin -40 --ymax 40 --color amp --cmin 0 --cmax 10 --row-chunk-size 1024 && eog *png
2020-04-18 23:29:42 - INFO     - ------------------------------------------------------
2020-04-18 23:29:42 - INFO     -                  : MS 3C273-C-8424.ms contains 393177 rows
2020-04-18 23:29:42 - INFO     -                  :   (1, 64) spectral windows and channels
2020-04-18 23:29:42 - INFO     -                  :   4 fields: J1331+3030 J1407+2827 J1224+0330 J1229+0203
2020-04-18 23:29:42 - INFO     -                  :   16 scans, first #1, last #16
2020-04-18 23:29:42 - INFO     -                  :   26 antennas: 0:1 1:2 2:3 3:4 4:5 5:6 6:7 7:8 8:9 9:10 10:11 11:12 12:13 13:14 14:15 15:16 16:17 17:18 18:19 19:20 20:21 21:22 22:23 23:24 24:25 26:27
2020-04-18 23:29:42 - INFO     -                  :   correlations RR RL LR LL
2020-04-18 23:29:42 - INFO     - ------------------------------------------------------
2020-04-18 23:29:42 - INFO     - Antenna(s)       : all
2020-04-18 23:29:42 - INFO     - Field(s)         : all
2020-04-18 23:29:42 - INFO     - SPW(s)           : all
2020-04-18 23:29:42 - INFO     - Scan(s)          : all
2020-04-18 23:29:42 - INFO     - ------------------------------------------------------
2020-04-18 23:29:42 - INFO     - defining new plot axis: real of DATA corr None, clip (-40.0, 40.0), discretization None
2020-04-18 23:29:42 - INFO     - defining new plot axis: imag of DATA corr None, clip (-40.0, 40.0), discretization None
2020-04-18 23:29:42 - INFO     - defining new plot axis: amp of DATA corr None, clip (0.0, 10.0), discretization 16
2020-04-18 23:29:42 - INFO     -                  : you have asked for 1 plots employing 3 unique datums
2020-04-18 23:29:42 - INFO     -                  : Indexing MS and building dataframes (chunk size is 1024)
2020-04-18 23:29:43 - INFO     -                  : complete
2020-04-18 23:29:43 - INFO     -                  : rendering 1 dataframes with 1.01e+08 points into 1 plot types
2020-04-18 23:29:43 - INFO     -                  : rendering plot-3C273-C-8424-DATA_RR_RL_LR_LL_imag_real-amp.png
Traceback (most recent call last):
  File "/home/oms/.venv/sms/bin/shadems", line 7, in <module>
    exec(compile(f.read(), __file__, 'exec'))
  File "/scratch/oms/projects/shadeMS/bin/shadems", line 8, in <module>
    main.main([a for a in sys.argv[1:]])
  File "/scratch/oms/projects/shadeMS/ShadeMS/main.py", line 521, in main
    render_single_plot(df, xdatum, ydatum, cdatum, pngname, title, xlabel, ylabel)
  File "/scratch/oms/projects/shadeMS/ShadeMS/main.py", line 484, in render_single_plot
    figx=xcanvas / 60, figy=ycanvas / 60):
  File "/scratch/oms/projects/shadeMS/ShadeMS/shadeMS.py", line 536, in create_plot
    raster = canvas.points(ddf, xaxis, yaxis, agg=count_integers(caxis, cdatum.nlevels))
  File "/scratch/oms/projects/datashader/datashader/core.py", line 224, in points
    return bypixel(source, self, glyph, agg)
  File "/scratch/oms/projects/datashader/datashader/core.py", line 1161, in bypixel
    return bypixel.pipeline(source, schema, canvas, glyph, agg)
  File "/scratch/oms/projects/datashader/datashader/utils.py", line 93, in __call__
    return lk[typ](head, *rest, **kwargs)
  File "/scratch/oms/projects/datashader/datashader/data_libraries/dask.py", line 30, in dask_pipeline
    return scheduler(dsk, name)
  File "/home/oms/.venv/sms/lib/python3.6/site-packages/dask/threaded.py", line 84, in get
    **kwargs
  File "/home/oms/.venv/sms/lib/python3.6/site-packages/dask/local.py", line 486, in get_async
    raise_exception(exc, tb)
  File "/home/oms/.venv/sms/lib/python3.6/site-packages/dask/local.py", line 316, in reraise
    raise exc
  File "/home/oms/.venv/sms/lib/python3.6/site-packages/dask/local.py", line 222, in execute_task
    result = _execute_task(task, data)
  File "/home/oms/.venv/sms/lib/python3.6/site-packages/dask/core.py", line 121, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
  File "/home/oms/.venv/sms/lib/python3.6/site-packages/dask/core.py", line 121, in <genexpr>
    return func(*(_execute_task(a, cache) for a in args))
  File "/home/oms/.venv/sms/lib/python3.6/site-packages/dask/core.py", line 115, in _execute_task
    return [_execute_task(a, cache) for a in arg]
  File "/home/oms/.venv/sms/lib/python3.6/site-packages/dask/core.py", line 115, in <listcomp>
    return [_execute_task(a, cache) for a in arg]
  File "/home/oms/.venv/sms/lib/python3.6/site-packages/dask/core.py", line 121, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
  File "/scratch/oms/projects/datashader/datashader/compiler.py", line 147, in combine
    bases = tuple(np.stack(bs) for bs in zip(*base_tuples))
  File "/scratch/oms/projects/datashader/datashader/compiler.py", line 147, in <genexpr>
    bases = tuple(np.stack(bs) for bs in zip(*base_tuples))
  File "<__array_function__ internals>", line 6, in stack
  File "/home/oms/.venv/sms/lib/python3.6/site-packages/numpy/core/shape_base.py", line 433, in stack
    return _nx.concatenate(expanded_arrays, axis=axis, out=out)
  File "<__array_function__ internals>", line 6, in concatenate
MemoryError: Unable to allocate 106. GiB for an array with shape (1544, 900, 1280, 16) and data type int32
Error shutting down executor: 'NoneType' object is not callable

That allocation of a (1544, 900, 1280, 16) array is especially puzzling. The last three numbers are familiar -- the datashader canvas size is (900, 1280, 16) here. I don't know where the extra 1544 dimension comes from (and it's not a number I recognize!)

And when I go back to larger chunks, the problem goes away (it wouldn't be able to allocate an array that size on my puny desktop, so clearly it's not doing it when the script runs normally!) I really don't understand how chunk size can affect this logic, yet clearly it does...

o-smirnov commented 4 years ago

P.S. The reason I'm messing around with chunk size is because I'm trying to get a handle on RAM footprint with far larger (MeerKAT) datasets. These are at about 5e+9 points per plot (i.e. x50 larger than the test above). In this regime:

So, in summary: not an urgent problem (for me), but it is a nagging puzzle. The defaults are fine for my fat nodes, so I'm not touching them for now....People trying to plot MeerKAT data on smaller boxes will probably twist the chunk size knob and get slapped in the face by this.

sjperkins commented 4 years ago

I'd need to inspect the actual graph, but my working hypothesis is that datashader is creating an image for each input chunk in the MS and then trying to stack them all in one go. Hence when the input MS chunks are small, many images are created, leading to OutOfMemory errors when numpy tries to create the output for the stack operation.

  File "/scratch/oms/projects/datashader/datashader/compiler.py", line 147, in combine
    bases = tuple(np.stack(bs) for bs in zip(*base_tuples))
  File "/scratch/oms/projects/datashader/datashader/compiler.py", line 147, in <genexpr>
    bases = tuple(np.stack(bs) for bs in zip(*base_tuples))
  File "<__array_function__ internals>", line 6, in stack
  File "/home/oms/.venv/sms/lib/python3.6/site-packages/numpy/core/shape_base.py", line 433, in stack
    return _nx.concatenate(expanded_arrays, axis=axis, out=out)
  File "<__array_function__ internals>", line 6, in concatenate
MemoryError: Unable to allocate 106. GiB for an array with shape (1544, 900, 1280, 16) and data type int32
Error shutting down executor: 'NoneType' object is not callable

Here is the datashader code:

https://github.com/holoviz/datashader/blob/f003f58910a5a9e9c1733be82b3568a03e3e4e0b/datashader/data_libraries/dask.py#L63-L84

And here is my commented analysis of it:

   def chunk(df):
       """ Function applied per input chunk """
        aggs = create(shape)               # Create output array
        extend(aggs, df, st, bounds)    # Aggregate data into output array
        return aggs                              # Return output array

    name = tokenize(df.__dask_tokenize__(), canvas, glyph, summary)
    # Input graph keys, referencing input chunks
    keys = df.__dask_keys__()
    # Output graph keys
    keys2 = [(name, i) for i in range(len(keys))]
    # Map chunk function over input chunks (referred to by input keys) onto output keys
    dsk = dict((k2, (chunk, k)) for (k2, k) in zip(keys2, keys))
    # Call finalize(*[(combine, keys2)], **dict(...))
    # This calls combine on all output chunks (referenced by keys2)
    # which internally calls np.stack
    dsk[name] = (apply, finalize, [(combine, keys2)],
                 dict(cuda=cuda, coords=axis, dims=[glyph.y_label, glyph.x_label]))

Put another way, all images produced by each separate chunk node are parents of the combine node.

A tree reduction is probably the standard way to resolve this, but I'm not sure of what the implications of such a reduction would be on datashader's internal API. @jbednar, would it be possible for you to provide comment here and let us know whether this is something we should raise on the datashader repo?

@o-smirnov and @IanHeywood have blazed away to produce some beautiful Radio Astronomy plots that they may want to share. Our input data can be very large so they're interested in seeing how low they can set the memory budgets.

o-smirnov commented 4 years ago

Cool, thanks @sjperkins. That makes a lot of sense, and explains where the mysterious extra outer dimension comes from.

This also suggests there may be more efficient ways of implementing our particular reduction.

I'm going to fork the DS repo anyway, as I see a bug in colorize which I want to fix. We may as well play around with this while we're at it, right?

sjperkins commented 4 years ago

This also suggests there may be more efficient ways of implementing our particular reduction.

If the following is anything like the dask Array reduction function, it should construct a reduction tree from a dataframe.

https://docs.dask.org/en/latest/dataframe-api.html#dask.dataframe.Series.reduction

jbednar commented 4 years ago

I'd need to inspect the actual graph, but my working hypothesis is that datashader is creating an image for each input chunk in the MS and then trying to stack them all in one go.

Yes, that's basically it, as long as you replace "image" with "rasterized array of values" (as it's not turned into an RGB image until later, after all such output arrays are combined).

Datashader's algorithms are designed for a particular situation that may not apply to what you want to do. Specifically, Datashader assumes that your chunk size is >> your output array size, which is typically a good assumption if you are rendering to a computer screen (and therefore have a relatively small total output array size) and if you choose a chunk size about as large as a worker node can handle (to minimize communication overhead). Under those conditions, it should have good performance.

But what if you choose a tiny chunk size, such that the output array >> your chunk size? In that case the output array size will dominate your memory needs both per worker and (as you apparently are seeing) overall (as you're then multiplying that output array size times a very large number of chunks).

I'm not sure what to suggest here, as I haven't studied your situation in detail, other than not to select a chunk size that's smaller than your output array size. Essentially, your output image resolution will determine your minimum viable worker-node memory capacity; you need to be able to hold a copy of the entire output array plus one chunk, on any worker.

Trying to extend Datashader to distribute the output array as well as the input array would only be practical if you have a spatially indexed data structure, because otherwise any given chunk needs to write to anywhere on the output array. Datashader already does support spatial indexing via spatialpandas, but (a) it currently only exploits that on the input side, not on the output, (b) building the spatial index itself then has this same problem and is very time consuming, (c) you can only have a spatial index for a single pair of coordinates, so you would be limited to only one type of plot per data structure, and (d) rendering to partial arrays on the output side would take a good bit of coding. Could be done, but not something I'd attempt myself!

jbednar commented 4 years ago

I'm going to fork the DS repo anyway, as I see a bug in colorize which I want to fix. We may as well play around with this while we're at it, right?

Sure! But please keep any PRs fixing colorize separate from any fixing the other above issues, as colorize fixes are likely to be simple and local, unlike trying to implement a distributed output array! :-)

jbednar commented 4 years ago

Looking back up at the comments above it sounds like you might have identified a bottleneck specifically in how the output arrays are being reduced, not in the fact that there is an output array per chunk? If so, then that might indeed be something feasible to implement, though it's very tricky to get it right across CPU and GPU cases.

sjperkins commented 4 years ago

@jbednar Thanks for your detailed response!

Datashader's algorithms are designed for a particular situation that may not apply to what you want to do.

Yes, we're rasterising Radio Astronomy data, which in its raw form is stored as complex data in the frequency domain. There's a lot of it (terabytes for MeerKAT and petabytes for the future SKA. For interest's sake, producing a RA image involves gridding these complex values and Fourier Transforming the grid to produce an image but this is a separate case from what we're trying to achieve with DataShader.

Here, @o-smirnov is producing plots to inspect the raw complex data (no gridding or FFT involved) which I suspect is highly categorical: a measurement is characterised by the TIME it was taken, the ANTENNA's that observed it, the FEED on each antenna, and the multiple CHANNELS/FREQUENCIES at which it was taken, amongst others. https://github.com/ska-sa/dask-ms#example-usage might give a bit of insight into what the data shapes look like.

But what if you choose a tiny chunk size, such that the output array >> your chunk size? In that case the output array size will dominate your memory needs both per worker and (as you apparently are seeing) overall (as you're then multiplying that output array size times a very large number of chunks).

The raw complex data has a magnitude of 1e12 -- 1e13 data points (for the moment), while the plots are far smaller (512**2 or 1024**2 @o-smirnov?). This is why we're trying to use ever smaller chunks for the raw data and running into the issue at hand.

Looking back up at the comments above it sounds like you might have identified a bottleneck specifically in how the output arrays are being reduced, not in the fact that there is an output array per chunk? If so, then that might indeed be something feasible to implement, though it's very tricky to get it right across CPU and GPU cases.

Yes, exactly:

script ```python import dask.array as da A = da.zeros(20, chunks=1) tree = A.sum(split_every=2) combine = da.blockwise(np.sum, (), A, ("x",), dtype=A.dtype) tree.visualize("tree.png") combine.visualize("combine.png") ```

Visually, I think datashader's current strategy is a combination strategy:

combine

Whereas in terms of optimal memory usage I am proposing a tree reduction strategy:

tree

I've personally had very good experiences with the tree reduction on our data in the dask CPU case (single node). I would expect the memory usage of the reduction to be O(T x I x F) where T is the number of threads, I is the image size and F is some fudge factor that in my experience doesn't exceed 2.0. By contrast the combination strategy would incur a memory usage of O(I x C) where C is the number of input data chunks.

In the distributed case, a tree reduction should also do well by reducing data movement. I can't speak from personal experience about how the dask scheduler handles things in the constrained memory environment of a GPU.

It's currently a bit unclear to me as to how a tree reduction would interact with the various datashader reduction operators and API's. ds.mean() would need to track counts throughout the reduction (I've coded this kind of thing myself), ds.any() or ds.all() would not and ds.mode() is very difficult in any parallel paradigm (but I suspect not important in the rendering case).

My question would be: are the datashader operators and API's specifically coded with the combination strategy in mind, or do you think they'd be able to handle the tree reduction strategy?

o-smirnov commented 4 years ago

Sure! But please keep any PRs fixing colorize separate from any fixing the other above issues, as colorize fixes are likely to be simple and local, unlike trying to implement a distributed output array! :-)

Sure @jbednar! It's basically this issue: https://github.com/holoviz/datashader/issues/899, but I'm now realizing it's a bit more broad than that. I've been trying to use the new by() reduction, which doesn't quite work, but I can also see it will break the colours when it does work... Anyway, I'll post more about that on your repo directly.

Regarding the performance issue, @sjperkins will try to implement a tree reduction on another branch, and we'll see how far that gets us.

Great job, and thanks, on the whole DS framework -- it's really doing great stuff for us:

image

image

o-smirnov commented 4 years ago

As another note to self to be tested later. This (1e+10 points):

shadems ms-4k-cal.ms -x U -y V -c ANTENNA1 --xmin -42000 --xmax 42000 --ymin -42000 --ymax 42000 -z 50000 --bmap rainbow

...blows my memory pas the 512GB on the node.

With a chunk size of 10000, it's chugging along, finishing in 168s. This is a very low-IO case, since it's only reading UWVs here.

sjperkins commented 4 years ago

With a chunk size of 10000, it's chugging along, finishing in 168s. This is a very low-IO case, since it's only reading UWVs here.

Note that if ANTENNA1 is in group_cols or the TAQL to select out rows there'll probably be strided disk access in a TIME ordered MS.

o-smirnov commented 4 years ago

No, it's in neither -- it's colouring by values of ANTENNA1, so the entire column is read eventually.

The problem is the nchunks x ncanvas allocation. Any luck putting a tree reduction in?

o-smirnov commented 4 years ago

I think this is solved by the tree reduction in our datashader fork. Let's try to get the fork merged...