ledatelescope / bifrost

A stream processing framework for high-throughput applications.
BSD 3-Clause "New" or "Revised" License

Best way to implement a 'sub-sequence' #83

Status: Closed (telegraphic closed this issue 7 years ago)

telegraphic commented 7 years ago

Say I have an array with dimensions:

[time, channel, fine_time, pol]

and I want to copy it over to the GPU one channel at a time, acting on each piece as a 'sub-sequence'. Is there a way to do this? Assuming I can do the book-keeping myself, is there a way to merge the frame axis (time) with the first axis within the frame (channel)? views.merge_axes doesn't seem to like this.

telegraphic commented 7 years ago

That is: can I merge [time, channel] into [timechan]?
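
For the book-keeping side of this question, here is a minimal sketch in plain Python (not Bifrost API). It assumes time and channel are the two outermost axes of a row-major array, so a merged timechan index is t * nchan + c and divmod recovers the pair. The function names and the channel count are hypothetical.

```python
def to_timechan(t, c, nchan):
    """Map a (time, channel) index pair to a single merged index."""
    if not 0 <= c < nchan:
        raise IndexError("channel index out of range")
    return t * nchan + c


def from_timechan(tc, nchan):
    """Recover the (time, channel) pair from a merged index."""
    return divmod(tc, nchan)


nchan = 64  # hypothetical number of channels per time frame
tc = to_timechan(3, 10, nchan)
print(tc)                        # 202
print(from_timechan(tc, nchan))  # (3, 10)
```

Because the merged axis simply enumerates channels within each time step, every merged frame corresponds to one channel of one original frame, which is the unit the question wants to copy.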

benbarsdell commented 7 years ago

I think merge_axes should support this. What error are you seeing?

telegraphic commented 7 years ago

Huh, interestingly the error arises when adding in the print_header block:

    b_guppi   = blocks.read_guppi_raw(filelist, core=1, buffer_nframe=4)
    b_gup2   = views.merge_axes(b_guppi, 'time', 'freq', label='timechan')
    blocks.print_header(b_gup2)
    blocks.print_header(b_gup2)
    pipeline.run()

And the error is:

dancpr@bldcpr:/bldata/pulsar_dev$ python bf_mergetest.py
Exception in thread Pipeline_0/PrintHeaderBlock_0:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 801, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.7/threading.py", line 754, in run
    self.__target(*self.__args, **self.__kwargs)
  File "build/bdist.linux-x86_64/egg/bifrost/pipeline.py", line 293, in run
    self.main(active_orings)
  File "build/bdist.linux-x86_64/egg/bifrost/pipeline.py", line 426, in main
    self.sequence_proclogs[i].update(iseq.header)
  File "build/bdist.linux-x86_64/egg/bifrost/ring2.py", line 288, in header
    hdr = self.header_transform(deepcopy(hdr))
  File "build/bdist.linux-x86_64/egg/bifrost/views/basic_views.py", line 150, in header_transform
    tensor['gulp_nframe'] *= n
KeyError: 'gulp_nframe'

telegraphic commented 7 years ago

This also fails, presumably due to the same underlying reason:

import glob

import bifrost.pipeline as bfp
from bifrost import blocks, views

with bfp.Pipeline() as pipeline:

    n_chunks = 2

    filelist = sorted(glob.glob('*.raw'))[:2]

    # Read from guppi raw file
    b_guppi   = blocks.read_guppi_raw(filelist, core=1, buffer_nframe=4)
    b_gup2    = views.rename_axis(b_guppi, 'freq', 'channel')

    # Buffer up two blocks & reshape to allow longer FFT
    b_gup2    = views.split_axis(b_gup2, axis='time', n=n_chunks, label='time_chunk')
    b_gup2    = blocks.transpose(b_gup2, axes=['time', 'channel', 'time_chunk', 'fine_time', 'pol'], buffer_nframe=1)
    b_gup2    = views.merge_axes(b_gup2, 'time_chunk', 'fine_time', label='fine_time')

    # Merge time-channel axis for smaller memcopies
    b_gup2    = views.merge_axes(b_gup2, 'time', 'channel', label='time')

    b_copy    = blocks.copy(b_gup2,  space='cuda', buffer_nframe=1, core=1)
    pipeline.run()

Output:

Exception in thread Pipeline_0/CopyBlock_0:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 801, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.7/threading.py", line 754, in run
    self.__target(*self.__args, **self.__kwargs)
  File "build/bdist.linux-x86_64/egg/bifrost/pipeline.py", line 293, in run
    self.main(active_orings)
  File "build/bdist.linux-x86_64/egg/bifrost/pipeline.py", line 426, in main
    self.sequence_proclogs[i].update(iseq.header)
  File "build/bdist.linux-x86_64/egg/bifrost/ring2.py", line 288, in header
    hdr = self.header_transform(deepcopy(hdr))
  File "build/bdist.linux-x86_64/egg/bifrost/ring2.py", line 60, in <lambda>
    return lambda x: f(g(x))
  File "build/bdist.linux-x86_64/egg/bifrost/views/basic_views.py", line 150, in header_transform
    tensor['gulp_nframe'] *= n
KeyError: 'gulp_nframe'

telegraphic commented 7 years ago

OK, I modified the GUPPI reader to include gulp_nframe in its tensor, and then added an ignore_units option to merge_axes, i.e. def merge_axes(block, axis1, axis2, label=None, ignore_units=False), because otherwise it complains about the units. So I can get past this, but then I'm back to issue #82.
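
To illustrate why adding gulp_nframe to the tensor gets past the KeyError, here is a simplified stand-in for the failing step in the traceback (tensor['gulp_nframe'] *= n). It is not the Bifrost source: the function name is hypothetical, and the header layout (a '_tensor' dict with 'labels') is assumed for the example.

```python
from copy import deepcopy


def scale_gulp_nframe(hdr, n):
    """Return a copy of hdr with the tensor's gulp_nframe multiplied by n."""
    hdr = deepcopy(hdr)
    tensor = hdr["_tensor"]
    tensor["gulp_nframe"] *= n  # raises KeyError if the reader never set it
    return hdr


labels = ["time", "freq", "fine_time", "pol"]
hdr_missing = {"_tensor": {"labels": labels}}                 # as the reader emitted it
hdr_fixed = {"_tensor": {"labels": labels, "gulp_nframe": 1}}  # after patching the reader

try:
    scale_gulp_nframe(hdr_missing, 64)
except KeyError as err:
    print("KeyError:", err)  # KeyError: 'gulp_nframe'

print(scale_gulp_nframe(hdr_fixed, 64)["_tensor"]["gulp_nframe"])  # 64
```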

benbarsdell commented 7 years ago

Ignoring units may be dangerous; what problem do you get?

telegraphic commented 7 years ago

It's trying to combine 's' with 'MHz' and complaining (raising a ValueError; the scale factors complain too). It is, after all, nonsensical...
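
A minimal sketch of the kind of check that produces this ValueError, assuming the merged axis must carry a single unit. The function is hypothetical and much simpler than whatever merge_axes actually does (it does not consider scale factors or convertible units).

```python
def merged_units(units1, units2):
    """Return the unit for a merged axis, refusing to mix different units."""
    if units1 != units2:
        raise ValueError(
            "cannot merge axes with units %r and %r" % (units1, units2))
    return units1


print(merged_units("s", "s"))  # s

try:
    merged_units("s", "MHz")   # time axis merged with frequency axis
except ValueError as err:
    print("ValueError:", err)
```

Skipping a check like this (as an ignore_units flag would) lets the merge go through, but the resulting axis no longer has a meaningful unit or scale.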

benbarsdell commented 7 years ago

Oh I didn't notice the comment explaining what you were trying to achieve. Yeah you shouldn't really do that :)

You can reduce memory usage by putting some blocks inside a with bf.block_scope(fuse=True): block.
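
As a rough sketch of what that could look like for the pipeline in this thread: the reader and copy calls mirror the snippets above, while the choice of transpose and fft as the fused GPU blocks, their arguments, and the function name are illustrative assumptions that should be checked against the installed Bifrost version.

```python
def run_fused(filelist):
    """Read GUPPI raw files and run the GPU-side blocks in one fused scope."""
    # Imported lazily so that defining this function does not require Bifrost.
    import bifrost as bf
    import bifrost.pipeline as bfp
    from bifrost import blocks

    with bfp.Pipeline() as pipeline:
        b_read = blocks.read_guppi_raw(filelist, core=1, buffer_nframe=4)
        b_gpu = blocks.copy(b_read, space='cuda', core=1)

        # Blocks created inside this scope share the fuse=True setting.
        with bf.block_scope(fuse=True):
            # Axis labels assumed to match the reader output shown earlier.
            b_gpu = blocks.transpose(
                b_gpu, axes=['time', 'freq', 'pol', 'fine_time'])
            b_gpu = blocks.fft(
                b_gpu, axes='fine_time', axis_labels='fine_freq')

        pipeline.run()
```

It would be called as run_fused(sorted(glob.glob('*.raw'))[:2]), matching the file list used above.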

telegraphic commented 7 years ago

Thanks, that gets me closer! I do remember you talking about block fusion but didn't realize it was implemented.

benbarsdell commented 7 years ago

Contrary to appearances, it's not doing proper fusion. It just sets the buffer_factor to 1 (which you can also do manually) and enables workspace sharing for any blocks that use workspace. The result is that the "fused" blocks run sequentially, but memory use is significantly reduced.
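
A back-of-the-envelope sketch of why a buffer_factor of 1 reduces memory, assuming each block's output ring holds roughly buffer_factor gulps of data. The sizing formula is a simplification, and the frame size and the non-fused factor of 4 are hypothetical values chosen for illustration.

```python
def ring_buffer_nbyte(frame_nbyte, gulp_nframe, buffer_factor):
    """Approximate ring size, assuming it holds buffer_factor gulps."""
    return frame_nbyte * gulp_nframe * buffer_factor


# Hypothetical frame: 64 channels x 1024 fine-time samples x 2 pols x 2 bytes
frame_nbyte = 64 * 1024 * 2 * 2

unfused = ring_buffer_nbyte(frame_nbyte, gulp_nframe=1, buffer_factor=4)  # assumed factor
fused = ring_buffer_nbyte(frame_nbyte, gulp_nframe=1, buffer_factor=1)

print(unfused, fused)    # 1048576 262144
print(unfused // fused)  # 4
```

With only one gulp of buffering, a downstream block cannot get ahead of its upstream neighbour, which is consistent with the fused blocks running sequentially.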

Putting all of the GPU compute blocks inside a fused scope is often a good approach, because having them run in parallel with each other doesn't usually gain you much anyway.