raspberrypi / picamera2

New libcamera based python library
BSD 2-Clause "Simplified" License

[HOW-TO] Integrate capture callbacks with asyncio #714

Open simon-staal opened 1 year ago

simon-staal commented 1 year ago

I have an image detection pipeline running on a Pi which communicates with a server via an asyncio TCP socket. To ensure that messages from the server are processed promptly, I'm using the non-blocking form of picam2.capture_array() by specifying a signal_function containing a callback. Once the image is received, I perform the actual processing in a separate process so that this doesn't block image capture either.

Ideally, I'd like to do the following (with the global executor being replaced by a functools.partial application):

async def on_complete(job):
    global executor
    img = job.get_result()
    loop = asyncio.get_event_loop()
    result = await loop.run_in_executor(executor, img_processing, img) # performs image processing in separate process
    await on_result(result) # sends data to server

async def image_detection_task():
    global executor
    picam2 = setup_camera() # configures picam2 object
    executor = concurrent.futures.ProcessPoolExecutor(max_workers=4)
    while True:
        job = picam2.capture_array(signal_function=on_complete)
        await job
    executor.shutdown()

Currently, I've considered the following work-around:

def on_complete(job):
    global is_job_complete, img
    img = job.get_result()
    is_job_complete = True

async def image_detection_task():
    global img, is_job_complete
    picam2 = setup_camera() # configures picam2 object
    with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
        while True:
            is_job_complete = False
            job = picam2.capture_array(signal_function=on_complete)
            while not is_job_complete:
                await asyncio.sleep(0.100)
            loop = asyncio.get_event_loop()
            data = await loop.run_in_executor(executor, img_processing, img) # performs image processing in separate process
            await send_data(data) # sends data to server

This seems to work, but isn't ideal. Additionally, the on_complete function can interrupt tasks even when they're not being awaited, which is also not optimal. Is there any way I can achieve what I want in a way similar to the desired implementation?
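
One way to get an awaitable out of a callback that fires on another thread is to pair an asyncio future with loop.call_soon_threadsafe(), so the callback only hands the result over and the coroutine resumes on the event loop thread. The following is a minimal sketch, assuming the callback is invoked off the loop thread with a job object that exposes get_result(), as in the snippets above. FakeJob and fake_capture_async are hypothetical stand-ins, not picamera2 APIs, and this has not been tried against a real camera.

```python
import asyncio
import threading
import time


class FakeJob:
    """Hypothetical stand-in for the job object passed to signal_function."""

    def __init__(self, result):
        self._result = result

    def get_result(self):
        return self._result


def fake_capture_async(signal_function):
    """Hypothetical stand-in for picam2.capture_array(signal_function=...).

    Invokes the callback from a background thread, as a camera thread would.
    """
    def worker():
        time.sleep(0.05)                    # pretend the capture takes 50 ms
        signal_function(FakeJob("frame"))   # called off the event loop thread

    threading.Thread(target=worker, daemon=True).start()


async def capture_awaitable():
    loop = asyncio.get_running_loop()
    future = loop.create_future()

    def on_complete(job):
        # Runs in the worker thread: do no real work here, just pass the
        # result to the event loop thread in a thread-safe way.
        loop.call_soon_threadsafe(future.set_result, job.get_result())

    fake_capture_async(on_complete)
    return await future                     # other tasks run while we wait


async def main():
    ticks = 0

    async def heartbeat():
        # Stands in for any other task that must keep running.
        nonlocal ticks
        while True:
            await asyncio.sleep(0.01)
            ticks += 1

    hb = asyncio.create_task(heartbeat())
    frame = await capture_awaitable()
    hb.cancel()
    return frame, ticks
```

Because the future is only completed via the event loop, the waiting coroutine resumes at an await point rather than in the middle of another task, and no polling with asyncio.sleep() is needed.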

davidplowman commented 1 year ago

Hi, thanks for the question. I think I might need a little more explanation to understand what's going on here...

Referring to your original example, you want to capture an image, and then pass this to on_complete(). This function schedules the event loop to do the image processing (loop.run_in_executor()) and then also waits for that to complete (the await on that same line). Finally the results are sent to the server (on_result()) and you wait for that to complete too (the await on that line again). Have I got that right?

I'm wondering whether we're doing a lot of waiting directly in the camera thread there (which is where the signal_function runs), which will block camera activity completely. Might this even prevent parallelism altogether?

In the second example, I don't really see any benefit in calling picam2.capture_array() asynchronously. I think img = picam2.capture_array() might be more straightforward, and you don't need the while not is_job_complete: loop (again, if I've interpreted that correctly).

Thereafter, is it again waiting for the image processing to complete and for the data to be sent, before going round the loop again and capturing the next image? Again, my apologies if I haven't understood this correctly.

simon-staal commented 1 year ago

Hi David, happy to try and provide more context. You are correct that in both cases we want to wait for image processing to be complete and for data to be sent before proceeding into the next iteration of the loop. The reason why I'm looking to call picam2 asynchronously is that I'd like the image_detection_task to block execution as little as possible, as it will be running in parallel with the client managing the connection with the server (also using asyncio). I'm concerned that the blocking picam2.capture_array() (which takes ~100ms to execute as I'm taking high resolution images) may degrade the performance of the client which is handling the connection, as it can't be scheduled in to handle incoming messages from the server. Since tasks in asyncio can only be scheduled in when the currently run function awaits, any responses to the server would be stuck until the camera finishes taking a picture (from my understanding), which seems less than ideal.

I was not aware that signal_function runs in the camera thread, separate from the main thread; you're totally right that it doesn't make sense to run tasks in there. I guess a correction to my request above would be an asynchronous version of picam2.capture_array() which can be awaited until the img is ready, like so:

async def image_detection_task():
    picam2 = setup_camera() # configures picam2 object
    with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
        while True:
            img = await picam2.capture_array() # Releases asyncio to process other tasks while camera is taking image
            loop = asyncio.get_event_loop()
            data = await loop.run_in_executor(executor, img_processing, img) # performs image processing in separate process
            await send_data(data) # sends data to server

Hopefully this makes a bit more sense?
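
A plain-asyncio way to approximate the awaitable capture above is to run the blocking call in a worker thread with loop.run_in_executor(), which lets the event loop schedule other tasks while the call waits. This is a minimal sketch under stated assumptions: blocking_capture is a hypothetical stand-in for the blocking picam2.capture_array(), server_communication_task is reduced to a short loop, and none of it has been verified against a real camera.

```python
import asyncio
import time


def blocking_capture():
    """Hypothetical stand-in for the blocking picam2.capture_array()."""
    time.sleep(0.1)              # roughly the ~100 ms quoted in the thread
    return b"image-bytes"


async def capture_in_thread():
    loop = asyncio.get_running_loop()
    # Passing None selects the loop's default ThreadPoolExecutor.
    return await loop.run_in_executor(None, blocking_capture)


async def image_detection_task(log):
    img = await capture_in_thread()          # the event loop stays free here
    log.append(f"image ready ({len(img)} bytes)")


async def server_communication_task(log):
    # Stand-in for handling server messages while the capture is in progress.
    for i in range(3):
        await asyncio.sleep(0.01)
        log.append(f"server message {i}")


async def demo():
    log = []
    await asyncio.gather(image_detection_task(log), server_communication_task(log))
    return log
```

In this sketch the three server messages are logged before the image is ready, which is the behaviour the blocking call on its own would prevent.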

davidplowman commented 1 year ago

Hi, thanks for the update. That certainly makes more sense, though there are probably still a few things to think about.

Firstly, this line:

            img = await picam2.capture_array() # Releases asyncio to process other tasks while camera is taking image

I don't really understand how this is different from

            img = picam2.capture_array()

In both cases, the copying of the camera memory into your numpy array happens in the camera thread, and then the calling thread is signalled. So I think these are basically doing the same thing, though the first version is probably poking more threads and doing some extra signalling? (There are ways to do the copy not-in-the-camera-thread, but maybe let's make something simple work first!)

Next we have

            data = await loop.run_in_executor(executor, img_processing, img) # performs image processing in separate process

Am I right in thinking that this is waiting for the image processing to finish? If so, it seems to me like that might be breaking the parallelism, as we wait for every task to finish as soon as we create it? Or is there some kind of parallelism going on inside the img_processing function, though it's not clear to me how that would help.

My assumption is that you want to run several img_processing operations in parallel on consecutive frames, and as each one finishes (in the right order), then you send the results to the server? Is that an accurate description? Thanks!
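
If that description is right, one possible shape for it is to submit each frame to the executor without awaiting it immediately, keep the resulting futures in a queue in capture order, and let a second coroutine await them one by one. The sketch below makes several assumptions: img_processing is a stand-in that just sleeps, an integer frame_id stands in for a captured image, results are collected in a list instead of being sent to a server, and a ThreadPoolExecutor replaces the ProcessPoolExecutor used in the thread so the example stays self-contained (run_in_executor() is called the same way for both).

```python
import asyncio
import concurrent.futures
import time


def img_processing(frame_id):
    """Stand-in for img_processing; later frames deliberately finish sooner."""
    time.sleep(0.05 - 0.01 * frame_id)
    return f"result-{frame_id}"


async def pipeline(num_frames=4, max_in_flight=4):
    loop = asyncio.get_running_loop()
    pending = asyncio.Queue(maxsize=max_in_flight)   # futures, in capture order
    sent = []

    async def producer(executor):
        for frame_id in range(num_frames):           # stands in for captured images
            fut = loop.run_in_executor(executor, img_processing, frame_id)
            await pending.put(fut)                    # waits if the backlog is full
        await pending.put(None)                       # sentinel: no more frames

    async def sender():
        while (fut := await pending.get()) is not None:
            sent.append(await fut)                    # stand-in for send_data()

    with concurrent.futures.ThreadPoolExecutor(max_workers=max_in_flight) as executor:
        await asyncio.gather(producer(executor), sender())
    return sent
```

Awaiting the futures in queue order keeps results in frame order even when a later frame finishes processing first, while the bounded queue limits how far capture can run ahead of processing.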

simon-staal commented 1 year ago

Essentially, the (heavily simplified) code running on the Pi will look something like this:

class SensorClient:
    def connect(self):
        # Connect to server
        # Do some other admin work
        # Run server communication task in parallel with image detection
        self.tasks = asyncio.gather(*[image_detection_task(), server_communication_task()])

So the use of await in the image_detection_task is not to make that function itself work in parallel, but to allow it to work in parallel with other tasks running on the Pi (in the same process). My concern is that without the use of await for picam2.capture_array(), although the actual processing is happening in a separate thread, the asyncio event loop is not signalled that it may schedule in another task as it is waiting to complete, keeping execution stuck in the image_detection_task(), which causes real-time tasks (server communication) to stall. This is why I was using the workaround of the non-blocking version of capture_array(), which then brings us to our asyncio.sleep loop where other async tasks can be run while we wait for the camera to finish taking a picture. However, given that the signal function appears to be executed as soon as the image is ready, and it's unclear to me how the camera thread operates with the GIL, this function may also interrupt execution of code in a different task at an unexpected time (i.e. not during an await statement), which might lead to subtle bugs. So essentially, my 2 main questions are:

davidplowman commented 1 year ago

Hi again, I'm afraid these are all things I've never tried for myself so it's difficult to give you definitive answers. Maybe the best thing is to instrument the code to check that you're getting the behaviour that you want.
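
One simple form of instrumentation is a coroutine that repeatedly sleeps for a short interval and records how late it wakes up; a large delay means something held the event loop for that long. This is a minimal sketch, assuming that event loop responsiveness is the behaviour of interest. measure_loop_lag and blocking_task are hypothetical names, and time.sleep() stands in for a blocking capture call.

```python
import asyncio
import time


async def measure_loop_lag(interval=0.01, samples=10):
    """Return the worst observed wake-up delay (seconds) beyond `interval`."""
    worst = 0.0
    for _ in range(samples):
        start = time.monotonic()
        await asyncio.sleep(interval)
        worst = max(worst, time.monotonic() - start - interval)
    return worst


async def blocking_task():
    await asyncio.sleep(0.02)
    time.sleep(0.1)      # deliberately blocks the event loop for 100 ms


async def compare():
    quiet = await measure_loop_lag()                      # nothing else running
    noisy, _ = await asyncio.gather(measure_loop_lag(), blocking_task())
    return quiet, noisy
```

Running the monitor alongside the real image_detection_task() and comparing the two figures should indicate whether a given capture strategy stalls the server communication task.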

Nonetheless, I can try and explain the behaviour of the camera system. I think it's pretty straightforward so it ought to be possible to find out how it interacts with other things.

When you're running in a script (rather than a Qt application), there's a background camera thread. When you do capture_array(), it blocks on a socket which becomes readable when a camera frame is available. The camera thread then copies the frame for you and signals your script thread. I assume that Python, while one thread is waiting on a socket, is quite happy to go and run other threads.

If you prefer not to let the camera thread do the image copy, you can use this idiom.

request = picam2.capture_request()
image = request.make_array('main')
request.release()

You could even do capture_request() in your main thread, and then send the request off to other threads for copying (and don't forget to release the request after), image processing and so on.
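
As a rough illustration of that hand-off, the sketch below copies each request in a worker thread and releases it in a finally block so the release is not forgotten if the copy fails. FakeRequest is a hypothetical stand-in that mimics only the make_array() and release() calls shown above; in real code the requests would come from picam2.capture_request(), which is not exercised here.

```python
import concurrent.futures


class FakeRequest:
    """Hypothetical stand-in for the object returned by picam2.capture_request()."""

    def __init__(self, frame_id):
        self.frame_id = frame_id
        self.released = False

    def make_array(self, name):
        return [self.frame_id] * 4      # pretend this is the copied image

    def release(self):
        self.released = True


def copy_and_release(request):
    # Runs in a worker thread: copy the frame, then always release the request.
    try:
        return request.make_array("main")
    finally:
        request.release()


def run(num_frames=3):
    # Stands in for calling capture_request() repeatedly in the main thread.
    requests = [FakeRequest(i) for i in range(num_frames)]
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as pool:
        images = list(pool.map(copy_and_release, requests))   # keeps frame order
    return images, requests
```

pool.map() returns results in submission order, so the copied images stay in the order in which the requests were captured even though the copies run in parallel.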

There's nothing to stop you even calling capture_array() or capture_request() from different threads, though of course at that point you lose track of the order in which frames arrived. It was never quite clear to me whether the data you send to your server needs to happen in the order in which the frames arrived or not.

Sorry not to be more help with this, but things are getting a little abstract for me to be certain of what I'm saying. If you had some simple self-contained test cases that show particular behaviour then I'd be happy to try and run them.