raspberrypi / picamera2

New libcamera based python library
BSD 2-Clause "Simplified" License

Added shared dma memory example #1046

Open kodonnell opened 4 months ago

kodonnell commented 4 months ago

As per #927 and @davidplowman's request, this adds an example of how to use the picamera2 DMA heap between processes. I've done it as a benchmarking tool in the scenario of making your own framebuffer (as that's my use case - what's the fastest way to shuffle frames around?).

davidplowman commented 3 months ago

Thanks very much for this. After studying it for a bit, I actually found myself wanting to make a more Picamera2-specific example, passing image buffers using Python multiprocessing (which also makes for convenient signalling between processes). You'd certainly taken care of all the tricky bits that I wouldn't have known about! Here's what I came up with (sorry it's a bit long, though the last bit is just an example of how you'd use it):

from collections import deque
from ctypes import CDLL, c_int, c_long, c_uint, get_errno
import numpy as np
from threading import Thread
import mmap
from multiprocessing import Process, Queue
import os

class Picamera2Proxy(Process):
    """A multi-processing Process that receives camera frames from Picamera2."""

    def __init__(self, picam2, name='main', *args, **kwargs):
        """Create a Picamera2 proxy process. Call after Picamera2 has been configured."""
        super().__init__(*args, **kwargs)
        self.config = picam2.camera_configuration()[name]
        self._stream = picam2.stream_map[name]
        self._picam2_pid = os.getpid()
        self._pid_fd = None
        self._send_queue = Queue()
        self._done_queue = Queue()
        self._requests_sent = deque()
        self._arrays = {}
        self._running = True
        self._first = True
        self._syscall = CDLL(None, use_errno=True).syscall
        self._syscall.argtypes = [c_long]
        self._thread = Thread(target=self._receive_done, args=())
        self._thread.start()
        self.start()

    def _receive_done(self):
        # Runs in a thread in the Picamera2 process to return requests to libcamera.
        while self._running or self._requests_sent:
            self._done_queue.get()  # requests are finished with in the order we sent them
            request = self._requests_sent.popleft()
            request.release()

    def send(self, request):
        """Call from the Picamera2 process to send an image from this request to the remote process."""
        plane = request.request.buffers[self._stream].planes[0]
        fd = plane.fd
        length = plane.length
        self._requests_sent.append(request)
        self._send_queue.put((fd, length))

    def _format_array(self, mem):
        # Format the memory buffer into a numpy image array.
        array = np.array(mem, copy=False, dtype=np.uint8)
        width, height = self.config['size']
        stride = self.config['stride']
        format = self.config['format']
        if format == 'YUV420':
            return array.reshape((height + height//2, stride))
        array = array.reshape((height, stride))
        if format in ('RGB888', 'BGR888'):
            return array[:, :width * 3].reshape((height, width, 3))
        elif format in ("XBGR8888", "XRGB8888"):
            return array[:, :width * 4].reshape((height, width, 4))
        return array

    def capture_array(self):
        """Call from the remote process to wait for an image array from the Picamera2 process."""
        # First tell the Picamera2 process that we're done with the previous image.
        if not self._first:
            self._done_queue.put("DONE")
        self._first = False
        # Wait for the next image. A "CLOSE" message means they're shutting us down.
        msg = self._send_queue.get()
        if msg == "CLOSE":
            return None
        # We have a new buffer. The message contains Picamera2's fd and the buffer length.
        target_fd, length = msg
        # Check if we've seen this buffer before.
        if target_fd in self._arrays:
            return self._arrays[target_fd]
        # Otherwise create a local fd, and mmap it to create a numpy image array.
        if self._pid_fd is None:
            self._pid_fd = os.pidfd_open(self._picam2_pid)
        # 438 is the magic number for calling pidfd_getfd.
        fd = self._syscall(438, c_int(self._pid_fd), c_int(target_fd), c_int(0))
        if fd == -1:
            errno = get_errno()
            raise OSError(errno, os.strerror(errno))
        # Map the local duplicate (fd), not the Picamera2 process's fd number (target_fd).
        mem = mmap.mmap(fd, length, mmap.MAP_SHARED, mmap.PROT_READ)
        array = self._format_array(mem)
        self._arrays[target_fd] = array
        return array

    def run(self):
        """Derived classes should override this to define what the remote process does."""
        pass

    def close(self):
        """Call from the Picamera2 process to close the remote process proxy."""
        self._running = False
        self._thread.join()
        self._send_queue.put("CLOSE")

if __name__ == "__main__":
    # Simple example showing how to use the Picamera2Proxy.
    from picamera2 import Picamera2
    import cv2

    class Proxy(Picamera2Proxy):
        def run(self):
            cv2.startWindowThread()
            while (array := self.capture_array()) is not None:
                cv2.imshow("Proxy", array)
                cv2.waitKey(1)

    picam2 = Picamera2()
    config = picam2.create_preview_configuration({'format': 'RGB888'})
    picam2.start(config)
    proxy = Proxy(picam2, 'main')  # send images from the "main" stream to the remote process

    for i in range(200):
        request = picam2.capture_request()
        proxy.send(request)

    proxy.close()
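
The stride handling in `_format_array` above can be checked in isolation on a synthetic buffer. This is only a minimal sketch: the helper name `format_rgb`, the tiny 2x2 image and the 0xFF padding bytes are made up for illustration, and a plain `bytearray` stands in for the mmap'd camera buffer.

```python
import numpy as np


def format_rgb(buf, width, height, stride):
    """Reshape a padded 3-bytes-per-pixel buffer to (height, width, 3), dropping row padding."""
    array = np.frombuffer(buf, dtype=np.uint8)
    rows = array.reshape((height, stride))  # one row per image line, padding included
    return rows[:, :width * 3].reshape((height, width, 3))  # keep only the pixel bytes


# Synthetic 2x2 image with a stride of 8: each row is 6 pixel bytes plus 2 padding bytes.
width, height, stride = 2, 2, 8
raw = bytearray()
for row in range(height):
    raw += bytes(range(row * 6, row * 6 + 6)) + b"\xff\xff"

image = format_rgb(raw, width, height, stride)
print(image.shape)
```

The same slicing applies to the 4-bytes-per-pixel formats with `width * 4` in place of `width * 3`.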

I'm starting to wonder a bit whether I should perhaps pass the entire request (all image buffers plus metadata) across, though perhaps that's more complicated than I really want.
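
One possible shape for such a message, purely as a sketch: a plain dict holding an `(fd, length)` pair per stream plus the metadata, which `multiprocessing.Queue` can pickle as-is. The function name `build_request_message`, the fd numbers and the metadata values below are invented for illustration and are not part of Picamera2.

```python
import pickle


def build_request_message(stream_planes, metadata):
    """Describe one request as a plain dict so that a multiprocessing.Queue can pickle it."""
    return {
        # stream name -> (fd as seen by the camera process, buffer length in bytes)
        "buffers": {name: (fd, length) for name, (fd, length) in stream_planes.items()},
        "metadata": dict(metadata),
    }


# Hypothetical values: two streams and a couple of metadata entries.
message = build_request_message(
    {"main": (31, 640 * 480 * 3), "lores": (32, 320 * 240 * 3 // 2)},
    {"ExposureTime": 10000, "AnalogueGain": 1.5},
)

# Round-trip through pickle, which is what Queue.put() and Queue.get() do internally.
restored = pickle.loads(pickle.dumps(message))
```

The receiving process would still have to duplicate each fd with `pidfd_getfd` and mmap it, as `capture_array` does for the single-stream case.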

kodonnell commented 3 months ago

Cool = ) Looks like you're copying directly from the request buffer to the proxy which is neat.

I guess the question becomes what to do with this. Why do we want remote calls? Well, it's generally nice and you can e.g. have multiple readers. But do we want a user-configurable larger buffer for just frame data (which is nice to handle delays etc. but not drop frames from the main camera loop)? Is this about performance or usability?
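
As a rough illustration of the "larger buffer for just frame data" idea, here is a minimal sketch of a bounded frame buffer that drops the oldest frame when the reader falls behind. The class name `FrameBuffer` and its drop-oldest policy are assumptions made for this example, not anything Picamera2 provides.

```python
from collections import deque
from threading import Lock


class FrameBuffer:
    """Hypothetical bounded FIFO of frames; when full, the oldest frame is dropped."""

    def __init__(self, capacity):
        self._frames = deque()
        self._capacity = capacity
        self._lock = Lock()
        self.dropped = 0  # number of frames discarded because the reader lagged

    def push(self, frame):
        """Called by the producer (camera loop); never blocks."""
        with self._lock:
            if len(self._frames) == self._capacity:
                self._frames.popleft()
                self.dropped += 1
            self._frames.append(frame)

    def pop(self):
        """Called by the consumer; returns the oldest frame, or None if empty."""
        with self._lock:
            return self._frames.popleft() if self._frames else None


buffer = FrameBuffer(capacity=3)
for frame_number in range(5):  # the producer runs ahead of the consumer
    buffer.push(frame_number)
oldest = buffer.pop()
```

With a capacity chosen by the user, a slow reader costs frames in this buffer rather than stalling the camera loop.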

To me, two things make sense:

FWIW for this PR I'd be tempted to keep the example as-is, as part of what I had to learn was how to use the picamera2 dma heap stuff for writing, so that might be useful to others. Likewise the benchmarking. I like your example though (as it shows how to read the buffers etc.) - up to you if it's a separate PR or not.

davidplowman commented 3 months ago

Hi again, in principle I'd be happy to merge this PR, I was just wondering if you'd be OK to take a look at the flake8 complaints from the CI tests. It's all syntax/formatting kind of stuff.

(flake8 seems to me to complain about a lot of annoying stuff, but we seem to be using it...)

kodonnell commented 2 months ago

Hi, sorry for the delay - I've been working on production picamera2 deployments, and dealing with performance issues and whatnot. Just a quick note - under load, I think the encoding is causing requests to be dropped. So I was thinking that we could just copy the relevant bit of the CMA memory that the encoder needs (which is only a small part of the whole request) and then release the request - this should be nice and fast, so we won't block the camera loop (and other consumers) even if the encoding starts to lag. We then feed the new (smaller) CMA copy to the encoder and those can be queued separately as needed. A nice side-effect is that we can lower memory consumption a fair bit too, e.g. instead of having 6 (very large) request buffers full, they'll be largely free, and we'll just have smaller encoder buffers. Does this seem reasonable/useful/worthwhile?

Edit: not as part of this PR = ) Just a suggestion. I'll look at tidying up this PR at some point. Edit 2: oh, your example above basically shows how to do this already = )
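
The copy-then-release idea can be sketched without any camera hardware. Everything here is a stand-in: `FakeRequest` imitates a request object with a `release()` method, a `bytearray` plays the role of the smaller encoder buffer, and `copy_and_release` is a hypothetical helper. In real use the source would be the mmap'd request buffer and the destination could be another DMA heap allocation.

```python
class FakeRequest:
    """Stand-in for a camera request; a real one would come from capture_request()."""

    def __init__(self, data):
        self.buffer = memoryview(data)
        self.released = False

    def release(self):
        self.released = True


def copy_and_release(request, offset, length, pool):
    """Copy only the region the encoder needs, then hand the request straight back."""
    # Reuse a pooled buffer if one is available (assumed to be `length` bytes long).
    dest = pool.pop() if pool else bytearray(length)
    dest[:length] = request.buffer[offset:offset + length]
    request.release()  # the camera loop gets its buffer back before any encoding starts
    return dest


request = FakeRequest(bytes(range(16)))
encoder_input = copy_and_release(request, offset=4, length=8, pool=[])
```

The copies can then be queued for the encoder separately, so a lagging encoder holds only the small buffers and not the full requests.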