DVC-Viking-Robotics / webapp

Flask webapp for interacting with and remotely controlling the MASCCOT robot via WiFi.

Need to upgrade all uses of the threading module to the multiprocessing module #37

Open · 2bndy5 opened this issue 4 years ago

2bndy5 commented 4 years ago

Just found out that the way I'm "joining" existing threads isn't a proper way to "kill" them. The multiprocessing module seems to offer an abstraction that would aid this implementation. Need to do more research and testing though. I will start this escapade on the Drivetrain repo. See the sketch below for the core difference.
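A minimal sketch of the difference (names like worker_loop are illustrative): a threading.Thread has no way to be forcibly stopped, while a multiprocessing.Process exposes terminate().

import time
import multiprocessing

def worker_loop():
    # A long-running task that never returns on its own
    while True:
        time.sleep(0.5)

if __name__ == '__main__':
    proc = multiprocessing.Process(target=worker_loop, daemon=True)
    proc.start()
    time.sleep(2)
    proc.terminate()  # forcibly stop the worker; threading.Thread has no equivalent
    proc.join()       # reap the terminated process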

See this example of how to handle the RasPi's camera using multiprocessing.
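The linked example isn't reproduced here, but the core idea can be sketched. Below is a hedged, minimal sketch (assuming the picamera package; camera_worker and the queue size are illustrative, not from the linked code): the camera lives in a dedicated child process that streams JPEG frames over a Queue, so stopping the camera is just terminating that process.

import io
import multiprocessing

def camera_worker(frame_queue):
    # Imported here so only the child process touches the camera hardware
    import picamera

    with picamera.PiCamera() as camera:
        stream = io.BytesIO()
        # capture_continuous yields forever; each iteration leaves one JPEG in the stream
        for _ in camera.capture_continuous(stream, format='jpeg'):
            frame_queue.put(stream.getvalue())
            stream.seek(0)
            stream.truncate()

if __name__ == '__main__':
    frames = multiprocessing.Queue(maxsize=4)
    cam_proc = multiprocessing.Process(target=camera_worker, args=(frames,), daemon=True)
    cam_proc.start()

    jpeg_bytes = frames.get()  # the webapp side consumes frames here
    cam_proc.terminate()       # "killing" the camera is now just killing the process
    cam_proc.join()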

tejashah88 commented 4 years ago

Here's some example code for using the multiprocessing module. It's for downloading a bunch of files in parallel.

import errno
import os
import urllib.request
import multiprocessing
import signal
import time

import numpy as np

def ensure_dir_exists(filename):
    dirname = os.path.dirname(filename)
    if dirname and not os.path.exists(dirname):
        try:
            os.makedirs(dirname)
        except OSError as exc:  # Guard against a race with another process creating it
            if exc.errno != errno.EEXIST:
                raise

def download_file(url, filename):
    # Create the containing directory if it doesn't exist
    ensure_dir_exists(filename)

    # Don't download the file if it already exists!
    if not os.path.exists(filename):
        urllib.request.urlretrieve(url, filename)

# Retrieves the contents of a file either locally or remotely
def get_contents_from_source(url, filename):
    download_file(url, filename)

    # Read the file and clean each of the lines for whitespacing
    with open(filename, 'r') as file:
        raw_contents = np.char.strip(file.readlines())

    return raw_contents

# Workers should ignore the Ctrl+C signal so that no exceptions are raised
# inside the worker processes; only the parent handles KeyboardInterrupt.
# A module-level function (rather than a lambda) keeps the initializer
# picklable under the 'spawn' start method.
def init_worker():
    signal.signal(signal.SIGINT, signal.SIG_IGN)

class ParallelFileDownloader:
    def __init__(self, num_processes=multiprocessing.cpu_count()):
        self.num_processes = num_processes
        self.pool = multiprocessing.Pool(
            processes=self.num_processes,
            initializer=init_worker
        )

    def download_files(self, files, downloader_fn):
        # Generate a list of 2-tuple params for the process workers to parse
        task_params = list(enumerate(files))
        num_tasks = len(task_params)

        print('\nRetrieving {0} files with {1} processors...'.format(num_tasks, self.num_processes))

        tasks_remaining = len(task_params)
        tasks_in_progress = -1

        try:
            result = self.pool.map_async(downloader_fn, task_params, chunksize=1)

            # This loop just keeps track of the number of tasks in progress and remaining.
            # Note: AsyncResult._number_left is a private, undocumented attribute.
            while not result.ready():
                tasks_remaining = result._number_left
                tasks_in_progress = min(self.num_processes, tasks_remaining)
                time.sleep(0.1)

            tasks_remaining = result._number_left
            tasks_in_progress = min(self.num_processes, tasks_remaining)
        except KeyboardInterrupt:
            # If the user wants to cancel, gracefully shut down
            self.pool.terminate()

            # Join all the processes to properly clean up
            self.pool.join()

            # Re-raise to acknowledge the user wanting to cancel the operation
            raise
        except ValueError:
            print('Pool is already closed! Cancelling current job...')

        # Properly print the right number of newlines after the progress bars are done printing
        print('\n' * (num_tasks - tasks_remaining + tasks_in_progress))

    # Close the pool to prevent more tasks from being submitted and join the existing processes
    def cleanup(self):
        self.pool.close()
        self.pool.join()
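For completeness, here's a hedged usage sketch (not part of the original comment; downloader_task and the example URLs are hypothetical). Note that downloader_fn must be a module-level function so the pool can pickle it, and it receives the (index, item) tuples that enumerate produces inside download_files:

# Hypothetical usage: each item in `files` is assumed to be a (url, filename)
# pair, so each task arrives as (index, (url, filename)).
def downloader_task(task):
    index, (url, filename) = task
    get_contents_from_source(url, filename)

if __name__ == '__main__':
    files = [
        ('https://example.com/a.txt', 'downloads/a.txt'),
        ('https://example.com/b.txt', 'downloads/b.txt'),
    ]
    downloader = ParallelFileDownloader()
    try:
        downloader.download_files(files, downloader_task)
    finally:
        downloader.cleanup()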