in03 / proxima

Transcode source media directly from DaVinci Resolve using multiple machines for encoding. Great for creating proxies quickly.
MIT License
50 stars 3 forks source link

Fix run_in_thread exception handler #226

Closed github-actions[bot] closed 1 year ago

github-actions[bot] commented 2 years ago

Fix run_in_thread exception handler

exception_handler=__pubsub_exception_handler,

https://github.com/in03/proxima/blob/80229d5c8e60091a61f027cc702a6965dac0da97/proxima/app/broker.py#L257


            self.status = switch[data["status"]]

    def update_task_info(self):

        self.parse_task_info_data()

        # Update last status in live view
        self.last_status.update(
            task_id=self.last_status_id,
            last_status=self.status,
        )

        # Update task info in live view
        self.progress.update(
            task_id=self.progress_id,
            active_workers=len(self.active_workers),
            completed_tasks=self.completed_tasks,
        )

        # Dealt with data, reset flag
        self.new_task_data_exists = False

    def update_progress_info(self):

        # Get up-to-date average
        try:

            self.total_average_progress = sum(self.prog_percentages.values()) / len(
                self.callable_tasks
            )

        except DivisionByZero:
            self.logger.debug(
                "[yellow]Encountered division by zero error! Setting progress to zero."
            )
            self.total_average_progress = 0.0

        # Update average progress
        self.logger.debug(
            f"[magenta]Total tasks average percentage: {self.total_average_progress}"
        )
        self.progress.update(
            task_id=self.progress_id, completed=self.total_average_progress
        )

        # Dealt with data, reset flag
        self.new_progress_data_exists = False

    def report_progress(self, results):

        self.group_id = results.id

        self.__define_progress_bars()
        self.__init_progress_bars()

        def __pubsub_exception_handler(ex, pubsub, thread):
            print(ex)
            thread.stop()
            thread.join(timeout=1.0)
            pubsub.close()

        # Subscribe to channels
        self.pubsub.psubscribe(**{"celery-task-meta*": self.task_event_handler})
        self.pubsub.psubscribe(**{"task-progress*": self.progress_event_handler})

        with Live(self.progress_group):

            # Run pubsub consumer in separate thread
            sub_thread = self.pubsub.run_in_thread(
                sleep_time=0.001,
                # TODO: Fix run_in_thread exception handler
                # labels: bug
                # exception_handler=__pubsub_exception_handler,
            )

            try:

                while not results.ready():

                    if self.new_task_data_exists:
                        self.update_task_info()

                    if self.new_progress_data_exists:
                        self.update_progress_info()

                    time.sleep(0.001)

            except:

                # re raise
                raise

            finally:

                # Close pubsub connection
                sub_thread.stop()
                sub_thread.join(timeout=1.0)
                self.pubsub.close()

            # Hide the progress bars after finish
            self.last_status.update(task_id=self.last_status_id, visible=False)
            self.progress.update(task_id=self.progress_id, visible=False)

        return results

86e741f078929dc3daefb59b8cce1efe72d81a7a

github-actions[bot] commented 1 year ago

Closed in 44b75c4384df04df0d6c5fbb78b35fe7091e6ac2