Kitware / DIVA

DIVA project repository

Building a "KwiverStream" to send/receive data through sprokit. #40

Open johnhenning opened 5 years ago

johnhenning commented 5 years ago

I'm trying to build a set of KwiverProcess subclasses that would be used across multiple components in the system. The idea is to have these processes serve as the means of communication between components that are defined outside of any KwiverProcess.

For example, I want to do something like the following:

# SomeKwiverInputPort, SomeKwiverOutputPort, serialize_data_to_kwiver,
# deserialize_to_stream_type, mask_rcnn, Frame and ObjectDetection are
# placeholders for the real port, serialization and model code.

class ObjectDetector:
    def __init__(self, input_stream, output_stream):
        self.input_stream = input_stream
        self.output_stream = output_stream
        self.model = mask_rcnn()

    def run(self):
        # The component owns its loop: pull frames, push detections.
        for frame in self.input_stream:
            detections = self.model(frame)
            for detection in detections:
                self.output_stream.send(detection)

class KwiverInputStream:
    def __init__(self, config, stream_type):
        self.config = config
        self.stream_type = stream_type
        self.kwiver_input_port = SomeKwiverInputPort(config)

    def __iter__(self):
        return self

    def __next__(self):
        # Pull the next raw item from the port and convert it to stream_type.
        data = next(self.kwiver_input_port)
        return deserialize_to_stream_type(data, self.stream_type)

class KwiverOutputStream:
    def __init__(self, config, stream_type):
        self.config = config
        self.stream_type = stream_type
        self.kwiver_output_port = SomeKwiverOutputPort(config)

    def send(self, data):
        # Convert to the wire format, then hand off to the port.
        serialized_data = serialize_data_to_kwiver(data, self.stream_type)
        self.kwiver_output_port.send(serialized_data)

def main():
    config = ...  # placeholder for the port configuration
    input_stream = KwiverInputStream(config, Frame)
    output_stream = KwiverOutputStream(config, ObjectDetection)
    detector = ObjectDetector(input_stream, output_stream)

    detector.run()

if __name__ == '__main__':
    main()

The idea is that communication between processes is decoupled from the processes themselves, so an input stream would be registered as the receiving end of another process's output stream.

However, I'm unsure how to do this, because sprokit, not the process itself, controls when a process sends or receives new data through the _step() method. Is there some way to invert this so that the process has more control? The only way I can see to make this work is to set up a queue from the kwiver process to the input_stream, but I'd rather get the data directly from the process.
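The queue workaround described in this paragraph can be sketched in plain Python, assuming the framework calls a step method once per frame. QueueInputStream and FakeStepProcess are hypothetical names, not KWIVER or sprokit classes; in a real process, _step() would read its input port and push the value into the queue instead of receiving it as an argument.

```python
import queue
import threading

# Sentinel marking the end of the stream.
_END = object()


class QueueInputStream:
    """Pull-style stream fed by a push-style producer (hypothetical helper)."""

    def __init__(self, maxsize=0):
        self._queue = queue.Queue(maxsize=maxsize)
        self._finished = False

    def push(self, item):
        # Called from the framework side, e.g. inside a process's step method.
        self._queue.put(item)

    def close(self):
        # Signal that no more items will arrive.
        self._queue.put(_END)

    def __iter__(self):
        return self

    def __next__(self):
        if self._finished:
            raise StopIteration
        item = self._queue.get()  # blocks until the producer pushes something
        if item is _END:
            self._finished = True
            raise StopIteration
        return item


class FakeStepProcess:
    """Stand-in for a framework process whose step() is driven by a scheduler."""

    def __init__(self, stream):
        self.stream = stream

    def step(self, frame):
        self.stream.push(frame)


def run_component(stream, results):
    # The component keeps its own loop and simply iterates the stream.
    for frame in stream:
        results.append(frame * 2)


stream = QueueInputStream()
results = []
worker = threading.Thread(target=run_component, args=(stream, results))
worker.start()

process = FakeStepProcess(stream)
for frame in range(5):  # the "scheduler" decides when step() runs
    process.step(frame)
stream.close()
worker.join()
print(results)  # [0, 2, 4, 6, 8]
```

The component runs in its own thread, so it keeps its for loop while the scheduler still decides when step() fires. The cost is an extra queue hop per item, which is the indirection the post would rather avoid.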

as6520 commented 5 years ago

To clarify, you are trying to separate the implementation of an algorithm from the kwiver process in the pipeline, and want to use another process as glue to pass data between the two?

johnhenning commented 5 years ago

Yes. I still want to be able to build a pipeline with the pipeline runner, but also to separate each component from its communication method. This would make experimentation and testing easier, since the components could be used in isolation.
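Because the component in the original example only depends on something iterable and something with a send() method, it can be exercised with no framework at all. This sketch assumes the model is passed in (a small change from the original, where mask_rcnn() is built inside __init__) so a stub can replace it; ListOutputStream and stub_model are hypothetical test helpers.

```python
class ObjectDetector:
    """Component that only knows about an iterable input and a sink with send()."""

    def __init__(self, input_stream, output_stream, model):
        self.input_stream = input_stream
        self.output_stream = output_stream
        self.model = model  # injected so tests can pass a stub

    def run(self):
        for frame in self.input_stream:
            for detection in self.model(frame):
                self.output_stream.send(detection)


class ListOutputStream:
    """In-memory sink exposing the same send() method as KwiverOutputStream."""

    def __init__(self):
        self.items = []

    def send(self, data):
        self.items.append(data)


def stub_model(frame):
    # Hypothetical detector stand-in: one "detection" per non-zero pixel.
    return [(frame["id"], i) for i, value in enumerate(frame["pixels"]) if value]


frames = [{"id": 0, "pixels": [0, 1, 1]}, {"id": 1, "pixels": [1, 0, 0]}]
sink = ListOutputStream()
ObjectDetector(frames, sink, stub_model).run()
print(sink.items)  # [(0, 1), (0, 2), (1, 0)]
```

Swapping the list and ListOutputStream for KwiverInputStream and KwiverOutputStream would leave ObjectDetector unchanged.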

as6520 commented 5 years ago

I believe abstract algorithms in vital, together with a kwiver process, would let you do this. The idea is that you create processes that contain abstract algorithms. These algorithms define interfaces that you override with your implementation. An implementation is called an arrow, and it can be written in either Python or C++. You then specify the implementation either in the pipeline or when you run the pipeline; based on that specification, an instance of the implementation is created and passed the data. Does this resolve your issue?
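The general pattern, an interface with interchangeable implementations chosen by configuration, can be illustrated in plain Python. This is only an analogue: DetectObjects, ThresholdDetector, TopOneDetector and create_algorithm are hypothetical names, the dictionary stands in for KWIVER's plugin loading, and none of this is the vital API.

```python
from abc import ABC, abstractmethod


class DetectObjects(ABC):
    """Interface, playing the role of an abstract algorithm."""

    @abstractmethod
    def detect(self, frame):
        """Return a list of detections for one frame."""


class ThresholdDetector(DetectObjects):
    """One implementation, playing the role of an arrow."""

    def __init__(self, threshold=0.5):
        self.threshold = threshold

    def detect(self, frame):
        return [i for i, score in enumerate(frame) if score >= self.threshold]


class TopOneDetector(DetectObjects):
    """A second implementation of the same interface."""

    def detect(self, frame):
        return [max(range(len(frame)), key=frame.__getitem__)] if frame else []


# Name -> implementation class, standing in for a plugin registry.
IMPLEMENTATIONS = {
    "threshold": ThresholdDetector,
    "top_one": TopOneDetector,
}


def create_algorithm(config):
    # Select the implementation by name, as a pipeline config would name an arrow.
    cls = IMPLEMENTATIONS[config["type"]]
    return cls(**config.get("params", {}))


algo = create_algorithm({"type": "threshold", "params": {"threshold": 0.6}})
print(algo.detect([0.2, 0.7, 0.9]))  # [1, 2]
print(create_algorithm({"type": "top_one"}).detect([0.2, 0.7, 0.9]))  # [2]
```

Because create_algorithm returns an ordinary object, it can also be called directly in a unit test with no pipeline involved.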

johnhenning commented 5 years ago

How would this allow isolation of the process though?

as6520 commented 5 years ago

Do you mean isolation of your implementation or of the kwiver process?

johnhenning commented 5 years ago

If I wanted to do something like the code I wrote above, how would I get these isolated input and output streams?

as6520 commented 5 years ago

You can explicitly create an instance of the algorithm, specify the implementation, and pass it data to test it without using sprokit or the pipeline runner, as shown here.

Additionally, this repo shows how to create an arrow in C++ or Python and configure it in a pipeline.

johnhenning commented 5 years ago

Is there a way to do this without using the abstract algorithms? It isn't clear how to extend them or create your own interfaces. I also want to be able to run these components independently of the framework, which this solution doesn't currently allow. Representing the communication as streams would give more flexibility.

as6520 commented 5 years ago

You can create an abstract algorithm that accepts a datum and returns a datum; that would work with your own data types, because a datum is what sprokit provides you. This can be extended to a map of data on either end, so that you can take the data from all input ports and send data to all output ports.
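A minimal sketch of the "map of data on either end" idea, assuming port names map to plain Python values. PortMapAlgorithm, FrameCounter and step are hypothetical, and read_port/write_port are stand-ins for whatever the process uses to grab from and push to its ports; they are not sprokit functions.

```python
from abc import ABC, abstractmethod
from typing import Any, Dict

# Port name -> value, standing in for the datum on each port.
PortMap = Dict[str, Any]


class PortMapAlgorithm(ABC):
    """Generic interface: all input ports in, all output ports out."""

    @abstractmethod
    def process(self, inputs: PortMap) -> PortMap:
        ...


class FrameCounter(PortMapAlgorithm):
    """Example implementation: pass the image through and number the frames."""

    def __init__(self):
        self.count = 0

    def process(self, inputs: PortMap) -> PortMap:
        self.count += 1
        return {"image": inputs["image"], "frame_number": self.count}


def step(algorithm: PortMapAlgorithm, read_port, write_port, input_ports):
    # What a generic process might do once per step: gather, call, scatter.
    inputs = {name: read_port(name) for name in input_ports}
    for name, value in algorithm.process(inputs).items():
        write_port(name, value)


written = []
algo = FrameCounter()
for image in ["img-a", "img-b"]:
    fake_inputs = {"image": image}
    step(algo, fake_inputs.__getitem__,
         lambda name, value: written.append((name, value)), ["image"])
print(written)
# [('image', 'img-a'), ('frame_number', 1), ('image', 'img-b'), ('frame_number', 2)]
```

With this shape a single generic process can host any implementation, since the process only needs to know the port names, not the algorithm.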

The problem with a solution that doesn't use abstract algorithms is that you would have to ensure the implementations are available in the process and provide a mechanism to select one of them. Abstract algorithms are the recommended mechanism for this in kwiver. But since algorithms and processes don't depend on each other, you can define your own approach to selecting the implementation.
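One possible "own approach" to selection, sketched with the standard library's importlib: name the implementing class in configuration and import it at runtime. The "module:ClassName" string format and load_implementation are assumptions for illustration, not a kwiver convention.

```python
import importlib


def load_implementation(dotted_path, **params):
    """Instantiate a class named as "package.module:ClassName" (hypothetical format)."""
    module_name, _, class_name = dotted_path.partition(":")
    if not class_name:
        raise ValueError(f"expected 'module:ClassName', got {dotted_path!r}")
    module = importlib.import_module(module_name)
    cls = getattr(module, class_name)
    return cls(**params)


# A standard-library class is used here so the example runs anywhere.
counter = load_implementation("collections:Counter", a=2, b=1)
print(counter.most_common(1))  # [('a', 2)]
```

In a process, the string would come from a config value read at configure time, and the module would still have to be importable from the process's Python path, which is the availability concern raised in this reply.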

I think the communication model you are describing is essentially what sprokit already provides. The streams in your example correspond to edges in sprokit, which wait for data and pass it along once a process is done with it. Execution is driven by a scheduler that sprokit provides.
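To make the edge/scheduler correspondence concrete, here is a toy, single-threaded model. It is not how sprokit is implemented: Edge, Process and run are hypothetical, and a real scheduler must handle concerns this toy ignores, such as threading and end-of-stream signalling. It does show the inversion the original question asks about: each process only defines one step, and the scheduler decides when that step runs.

```python
from collections import deque


class Edge:
    """FIFO buffer connecting one process's output to another's input."""

    def __init__(self):
        self.items = deque()


class Process:
    """Toy process: consumes one item from each input edge per step."""

    def __init__(self, fn, inputs=(), outputs=()):
        self.fn = fn
        self.inputs = list(inputs)
        self.outputs = list(outputs)

    def ready(self):
        # Step only when every input edge has data waiting.
        # Processes without inputs never run; sources are modelled as pre-filled edges.
        return bool(self.inputs) and all(edge.items for edge in self.inputs)

    def step(self):
        args = [edge.items.popleft() for edge in self.inputs]
        result = self.fn(*args)
        for edge in self.outputs:
            edge.items.append(result)


def run(processes):
    """Scheduler: keep stepping ready processes until none can make progress."""
    progress = True
    while progress:
        progress = False
        for proc in processes:
            if proc.ready():
                proc.step()
                progress = True


frames = Edge()
frames.items.extend([1, 2, 3])
doubled = Edge()
collected = []

double = Process(lambda x: x * 2, inputs=[frames], outputs=[doubled])
sink = Process(collected.append, inputs=[doubled])
run([double, sink])
print(collected)  # [2, 4, 6]
```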