ledatelescope / bifrost

A stream processing framework for high-throughput applications.
BSD 3-Clause "New" or "Revised" License

Clarification of MultiTransformBlock usage #124

Open telegraphic opened 5 years ago

telegraphic commented 5 years ago

Hi all, I'm experimenting with a pipeline that joins two input streams (blocks) together, and was hoping for some clarification on how to implement it. Firstly, what should happen in on_sequence if there are N input streams and M output streams? Is it as simple as parsing the N input headers and returning a list of M output headers? Similarly, should the on_data method return a list with M entries, each giving the number of frames written to that output stream?

Here is my attempt, using numpy to concatenate two inputs. Is there a suggested way to implement this on the GPU, perhaps using bf.map?

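from copy import deepcopy

import numpy as np
import bifrost as bf

class StitchBlock(bf.pipeline.MultiTransformBlock):
    """Concatenate two rings along an axis."""
    def __init__(self, irings, axis=-1, *args, **kwargs):
        super(StitchBlock, self).__init__(irings, *args, **kwargs)
        self.axis = axis

    def on_sequence(self, iseqs):
        # Start from the first input's header; one output header is returned.
        ohdr = deepcopy(iseqs[0].header)
        # Allow the axis to be given as a label, e.g. "channel".
        if isinstance(self.axis, str):
            self.axis = ohdr["_tensor"]["labels"].index(self.axis)
        # Assumes both inputs have the same length along the join axis.
        ohdr["_tensor"]["shape"][self.axis] *= 2
        return [ohdr]

    def on_data(self, ispans, ospans):
        out_nframe = ispans[0].nframe
        d0 = ispans[0].data
        d1 = ispans[1].data
        d = np.concatenate((d0, d1), axis=self.axis)

        # Copy the joined array into the output span.
        odata = ospans[0].data
        odata[...] = d
        # One entry per output: the number of frames written to it.
        return [out_nframe]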

Cheers, Danny
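
A note on the header arithmetic in on_sequence above: doubling the axis length only holds if both inputs have the same size along the join axis. Below is a minimal sketch of a more general version that sums the lengths from every input header. The helper name `stitch_headers` and the example header values are hypothetical, made up for illustration; the only thing taken from the code above is the `_tensor` / `labels` / `shape` layout. It uses plain dicts so it can be run without Bifrost.

```python
from copy import deepcopy

def stitch_headers(hdrs, axis):
    """Build one output header from several input headers (hypothetical helper)."""
    # Copy the first header so the inputs are left untouched.
    ohdr = deepcopy(hdrs[0])
    tensor = ohdr["_tensor"]
    # Accept either an axis label or an integer index.
    if isinstance(axis, str):
        axis = tensor["labels"].index(axis)
    # Sum the lengths along the join axis rather than assuming equal inputs.
    tensor["shape"][axis] = sum(h["_tensor"]["shape"][axis] for h in hdrs)
    return ohdr, axis

# Made-up headers: two inputs with different channel counts.
hdr_a = {"_tensor": {"labels": ["time", "ant", "channel", "pol"],
                     "shape": [-1, 4, 64, 2]}}
hdr_b = {"_tensor": {"labels": ["time", "ant", "channel", "pol"],
                     "shape": [-1, 4, 32, 2]}}

ohdr, axis = stitch_headers([hdr_a, hdr_b], "channel")
print(ohdr["_tensor"]["shape"], axis)
```

Inside a block, the same logic would presumably be fed `[seq.header for seq in iseqs]`, but that wiring is an assumption and is not tested here.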

jaycedowell commented 5 years ago

Out of curiosity, what is the use case you are thinking about for this block?

telegraphic commented 5 years ago

The plan is to recombine F-engine data that arrives on two different UDP ports. The input tensors are ['time', 'ant', 'channel', 'pol'], and I'm concatenating along the channel axis.

Our capture code is actually PSRDADA-based; I'm using the bifrost bindings to attach to existing ring buffer capture code. It's working! (I'll add some details on how to do this to the bifrost documentation at some stage)
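
To make the channel-axis join concrete, here is a small NumPy-only sketch. The array sizes and dtype are made up for illustration, and `port0` / `port1` simply stand in for the data from the two UDP ports. It resolves the axis from its label and writes the result into a preallocated array via the `out` argument of `np.concatenate`, which avoids building a temporary and then copying it.

```python
import numpy as np

labels = ["time", "ant", "channel", "pol"]
axis = labels.index("channel")  # resolve the label to an integer axis

# Made-up sizes: 8 time frames, 4 antennas, 16 channels per port, 2 pols.
port0 = np.zeros((8, 4, 16, 2), dtype=np.complex64)
port1 = np.ones((8, 4, 16, 2), dtype=np.complex64)

# Preallocated destination with twice the channels, standing in for an output buffer.
stitched = np.empty((8, 4, 32, 2), dtype=np.complex64)

# Write the joined data straight into the destination.
np.concatenate((port0, port1), axis=axis, out=stitched)

print(stitched.shape)
```

Whether `out=` can be pointed directly at a Bifrost output span's data is an assumption that has not been checked here; the sketch only shows the NumPy side.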

jaycedowell commented 5 years ago

That's cool. Were you also able to get this running on the GPU with bf.map() or are you still on the CPU with numpy.concatenate()?

telegraphic commented 5 years ago

It's working with numpy.concatenate; I haven't made a GPU version (yet).