charmplusplus / charm4py

Parallel Programming with Python and Charm++
https://charm4py.readthedocs.io
Apache License 2.0
290 stars 21 forks

Passing futures to Arrays on different PEs doesn't work #194

Closed martinkoenig closed 3 years ago

martinkoenig commented 3 years ago

I'm facing an issue with passing future objects to Arrays. Let's consider the following example. Two USB cameras, one Stream wrapper, one data processor

  from charm4py import charm, Chare, Array, Reducer, Future, coro

  # Camera Class
  class Camera(Chare):
      def read(self, f):
          data = "image"
          self.reduce(f, data, Reducer.gather)

  # Wrapper for multiple cameras
  class Stream(Chare):
      def __init__(self):
          self.worker = Array(Camera, 2)

      @coro
      def get_images(self, f):
          self.worker.read(f)

  # Do some processing on images
  class Processor(Chare):
      @coro
      def process(self, future):
          images = future.get()
          result = "processing_result"
          return result

  def main(args):
      stream = Chare(Stream, onPE=-1)
      processor = Chare(Processor, onPE=-1)

      while True:
          f1 = Future()
          stream.get_images(f1)
          result = processor.process(f1, awaitable=True).get()

  charm.start(main)

If stream and processor are initialized as plain Chares, everything runs fine. But if one of them is initialized as an Array, like so: processor = Array(Processor, 1, args=[stream]), it stops working. I get the following error message:

------------- Processor 0 Exiting: Called CmiAbort ------------ Reason: AttributeError: 'Future' object has no attribute 'gotvalues'

Is this behavior normal/expected? And for some reason, this code is utilizing all 8 cores at 100%... Am I doing something wrong?

Environment: Ubuntu 16.04 virtual machine, Python 3.6.12, Charm4py 1.0

martinkoenig commented 3 years ago

Oh wait... it seems that it's not possible to pass futures between processes running on different PEs. This breaks too, but why?

  from charm4py import charm, Chare, Array, Reducer, Future, coro

  # Camera Class
  class Camera(Chare):
      def read(self, f):
          data = "image"
          self.reduce(f, data, Reducer.gather)

  # Wrapper for multiple cameras
  class Stream(Chare):
      @coro
      def get_images(self, f):
          self.worker.read(f)

  # Do some processing on images
  class Processor(Chare):
      def __init__(self, stream):
          self.stream = stream

      @coro
      def process(self, future):
          images = future.get()
          result = "processing_result"
          return result

  def main(args):
      stream = Chare(Stream, onPE=0)
      processor = Chare(Processor, onPE=1, args=[stream])

      while True:
          f1 = Future()
          stream.get_images(f1)
          result = processor.process(f1, awaitable=True).get()

  charm.start(main)
ZwFink commented 3 years ago

This looks like an undocumented nuance with the way futures are currently implemented. Currently, futures are "owned" by the PE on which they are created, and there is no way to transfer ownership. A Future's result has to be retrieved on the same PE on which it was created. This will be added to the documentation. One way to fix this is to provide a different callback to stream.get_images.

from charm4py import charm, Chare, Array, Reducer, Future, coro

# Camera Class
class Camera(Chare):
    def read(self, f):
        data = "image"
        self.reduce(f, data, Reducer.gather)

# Wrapper for multiple cameras
class Stream(Chare):
    def __init__(self):
        self.worker = Array(Camera, 2)

    @coro
    def get_images(self, cb):
        self.worker.read(cb)

# Do some processing on images
class Processor(Chare):
    def __init__(self, result_future):
        self.result_future = result_future

    @coro
    def process(self, data):
        print(data)
        result = "processing_result"
        self.result_future(result)

def main(args):
    f1 = Future()

    stream = Chare(Stream)
    processor = Chare(Processor, args=[f1])

    stream.get_images(processor.process)
    print(f1.get())
    charm.exit()

charm.start(main)

An additional complexity arises from a Charm++ restriction that Arrays can only be created from PE 0. This can be solved simply by stream = Chare(Stream, onPE=0) to ensure that the stream chare is placed on PE 0. If this is not desirable, an alternative can be used:

from charm4py import charm, Chare, Array, Reducer, Future, coro

# Camera Class
class Camera(Chare):
    def read(self, f):
        data = "image"
        self.reduce(f, data, Reducer.gather)

# Wrapper for multiple cameras
class Stream(Chare):
    @coro
    def __init__(self):
        # self.worker = Array(Camera, 2)
        self.worker = None
        self.creation_future = charm.thisProxy[0].createArray(Camera, 2, ret=True)

    @coro
    def get_images(self, cb):
        self.worker = self.creation_future.get()
        self.worker.read(cb)

# Do some processing on images
class Processor(Chare):
    def __init__(self, result_future):
        self.result_future = result_future

    @coro
    def process(self, data):
        print(data)
        result = "processing_result"
        self.result_future(result)

def main(args):
    f1 = Future()

    stream = Chare(Stream, onPE=1)
    processor = Chare(Processor, args=[f1])

    stream.get_images(processor.process)
    print(f1.get())
    charm.exit()

charm.start(main)

Is this compatible with your use case? I will note that the second example currently does not work due to a bug in Charm4py. I am currently testing the fix and will merge it later today. Once it has been merged, I will notify you.

martinkoenig commented 3 years ago

This looks like an undocumented nuance with the way futures are currently implemented. Currently, futures are "owned" by the PE on which they are created, and there is no way to transfer ownership. A Future's result has to be retrieved on the same PE on which it was created.

Ahh, ok. Thank you very much! So this is kinda expected, no wonder it didn't work ^^

Is this compatible with your use case? I will note that the second example currently does not work due to a bug in Charm4py. I am currently testing the fix and will merge it later today. Once it has been merged, I will notify you.

Thanks, I will wait for the new merge! It's not fully compatible with my use case; the code I provided is the most basic, stripped-down version to reproduce this issue. My use case is quite a bit more complex: I need to run this indefinitely and with multiple instances. But nonetheless, your modified example helps a lot, thank you!

My use case: 5 Stream instances with 3 cameras each. So I have 15 cameras, grouped into 5 Stream wrappers. Each Stream wrapper instance needs to take synchronized pictures from all of its 3 cameras at the same time and then pass its images through several different processing steps. This means I have 5 pipelines that need to run in parallel but asynchronously/independently from one another.

I will try to find a way to accomplish this. I think your modified example will help me to figure this out. Or maybe Charm4Py isn't the right tool for this kind of task?

ZwFink commented 3 years ago

This task looks well-suited to Charm4py. How synchronized should these pictures be? Are there phases in the computation where picture-taking and processing happen independently, or are they to be interleaved? Some issues may arise depending on the characteristics of the code.

martinkoenig commented 3 years ago

It isn't necessary that picture-taking and processing are running independently, they can run one after another.

The pictures need to be synchronized to within a few milliseconds, so they should be taken/received at roughly the same time. I'm doing motion analysis for multiple clients with multiple cameras each, so the images need to match up for accurate processing and calculation.

This code works so far and shows roughly what I'm trying to achieve. Now I need to run all of this n times, independently and in parallel. Basically, everything in main() needs to be instantiated for every client. Every client/instance is independent from the others and does its own thing, so client 1 cannot wait for client 3 to finish, but they all need to run in parallel.

Do you have some tiny little advice on how to achieve this? Or is this bad practice/non-sense what I'm doing here?

from charm4py import charm, Chare, Array, Reducer, Future, coro

# Camera Class
class Camera(Chare):
    def read(self, f, main_callback):
        data = "image"
        self.reduce(f, [data, main_callback], Reducer.gather)
        # self.reduce(f, data, Reducer.gather)

# Wrapper for multiple cameras
class Stream(Chare):
    def __init__(self):
        self.worker = Array(Camera, 3)

    @coro
    def get_images(self, cb, main_callback):
        self.worker.read(cb, main_callback)

# Do some processing on images
class Processor(Chare):
    def __init__(self, tracker, diff):
        self.tracker = tracker
        self.diff = diff

    @coro
    def process(self, data):
        callback = data[0][1]

        # Do image processing
        f1 = Future()
        self.tracker.track(data, f1)

        # Further image processing
        f2 = Future()
        self.diff.run(data, f2)

        result1 = f1.get()
        result2 = f2.get()

        # Main Callback
        callback([result1, result2])

# Class with image processing functions
class Tracker(Chare):
    def __init__(self):
        self.tracking = "tracking"

    def track(self, data, f):
        image = data[self.thisIndex[0]][0]
        result = [1, 2, 3]
        self.reduce(f, result, Reducer.gather)

# Class with other image processing functions
class Difference(Chare):
    def __init__(self):
        self.diff = "image_difference"

    def run(self, data, f):
        image = data[self.thisIndex[0]][0]
        result = [4, 5, 6]
        self.reduce(f, result, Reducer.gather)

def main(args):
    num_cameras_per_client = 3

    stream = Chare(Stream)
    tracker = Array(Tracker, num_cameras_per_client)
    diff = Array(Difference, num_cameras_per_client)
    processor = Chare(Processor, args=[tracker, diff])

    while True:
        main_callback = Future()
        stream.get_images(processor.process, main_callback)
        print(main_callback.get())
    charm.exit()

charm.start(main)
ZwFink commented 3 years ago

One possibility is to create a Controller chare that can handle the operations for each client. Then you can do something like:

class ClientMap(ArrayMap):
    def procNum(self, index):
        return 0
...
myMap = Group(ClientMap)
controller = Array(Controller, num_clients, args=[num_clients, num_cameras_per_client], map=myMap)
controller.run_forever()

I have used the ArrayMap to ensure the chares in the array are placed on PE 0 so they may create other chare arrays without issue. Alternatively, you can check out and build from the bug-future_different_coro branch, which fixes the bug this issue uncovered and is under review, and use the createArray method in the way outlined above. Here I have assumed the Controller constructor will create the stream, tracker, diff, and processor chares for each client. The function run_forever would contain the while loop.
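A minimal sketch of what such a Controller chare might look like (the class body and the run_forever loop are assumptions based on the description above; Stream, Tracker, Difference, and Processor refer to the classes from the earlier example and are assumed to be defined in the same module):

```python
from charm4py import Chare, Array, Future, coro

# Hypothetical Controller sketch: one array element per client, each owning
# its own pipeline chares. Assumes Stream, Tracker, Difference and Processor
# from the earlier example are defined in this module.
class Controller(Chare):
    def __init__(self, num_clients, num_cameras_per_client):
        # self.thisIndex[0] identifies which client this element serves
        self.stream = Chare(Stream)
        self.tracker = Array(Tracker, num_cameras_per_client)
        self.diff = Array(Difference, num_cameras_per_client)
        self.processor = Chare(Processor, args=[self.tracker, self.diff])

    @coro
    def run_forever(self):
        # Each client loops independently; blocking on its own future
        # does not block the other Controller elements
        while True:
            result_future = Future()
            self.stream.get_images(self.processor.process, result_future)
            print(self.thisIndex[0], result_future.get())
```

This sketch requires the charm runtime to execute, so it is meant as a structural outline rather than a drop-in implementation.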

One possibility is that the processing chares from client n end up on the same PE as the camera chares from client n+1, which may create delays between pictures if the processing of one client happens between the picture-taking of another. One solution is to use an ArrayMap and the onPE argument to ensure that all of a client's chares are on the same processor, avoiding the interference described above. You could do this at a finer grain by assigning the camera chares of one or more clients to a PE and allowing the processing of the images to happen on the other processors. You can decide stylistically or empirically which will be better.
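As a sketch of that placement idea, an ArrayMap can cycle clients over the available PEs (the class name PerClientMap and the modulo mapping are assumptions; procNum and charm.numPes() are the charm4py hooks used in the snippet above):

```python
from charm4py import charm, ArrayMap

# Hypothetical map: pin each client's Controller (and, by extension, the
# chares it creates with onPE) to its own PE, wrapping around if there
# are more clients than processors.
class PerClientMap(ArrayMap):
    def procNum(self, index):
        # index[0] is the client number in a 1-D Controller array
        return index[0] % charm.numPes()
```

It would then be passed as map=Group(PerClientMap) when creating the Controller array, mirroring the myMap snippet above. Like the rest of these examples, it needs the charm runtime to run.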

martinkoenig commented 3 years ago

I will try out your suggestions tomorrow, I think this will solve my problems. Thank you very much! You are awesome! :1st_place_medal:

martinkoenig commented 3 years ago

It works! Now I'm able to run as many clients with as many cameras as I want. It's exactly how I wanted it to be. You made my day! :)