nv-morpheus / Morpheus

Morpheus SDK
Apache License 2.0

[BUG]: test_dfencoder_distributed_e2e fails with newer versions of pytorch #1770

Open cwharris opened 6 days ago

cwharris commented 6 days ago

Version

24.06

Which installation method(s) does this occur on?

No response

Describe the bug.

morpheus/tests/dfencoder/test_dfencoder_distributed_e2e.py fails with newer versions of PyTorch for multiple reasons:

First, PyTorch has changed its multiprocess spawning API, and our wrapper in morpheus/models/dfencoder/multiprocessing.py needs to be updated to match.

Second, once the wrapper has been updated, the test still fails, this time on a loss assertion:

>       raise ProcessRaisedException(msg, error_index, failed_process.pid)
E       torch.multiprocessing.spawn.ProcessRaisedException: 
E       
E       -- Process 0 terminated with the following error:
E       Traceback (most recent call last):
E         File "/usr/local/lib/python3.10/dist-packages/morpheus/models/dfencoder/multiprocessing.py", line 34, in _wrap
E           fn(i, *args)
E         File "/root/morpheus/tests/dfencoder/test_dfencoder_distributed_e2e.py", line 187, in _run_test
E           assert min(losses) < LOSS_TARGETS[loss_type][feature] * LOSS_TOLERANCE_RATIO
E       AssertionError

/usr/local/lib/python3.10/dist-packages/torch/multiprocessing/spawn.py:188: ProcessRaisedException
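
The bare AssertionError above does not say which loss_type/feature pair missed its target, or by how much. Below is a minimal sketch of a more informative version of the same comparison, which might make this easier to debug. The target values, the tolerance ratio, and the helper name check_losses are hypothetical stand-ins; the real LOSS_TARGETS and LOSS_TOLERANCE_RATIO live in the test module and are not shown in this report.

```python
# Hypothetical stand-ins for the constants used by the test; the real
# values are defined in test_dfencoder_distributed_e2e.py.
LOSS_TARGETS = {"train": {"num": 0.10, "cat": 0.50}}
LOSS_TOLERANCE_RATIO = 1.25


def check_losses(losses_by_key, targets=LOSS_TARGETS, ratio=LOSS_TOLERANCE_RATIO):
    """Apply the same rule as the failing assert, but collect every miss.

    losses_by_key maps (loss_type, feature) to the list of recorded losses.
    Returns a list of readable failure messages (empty when all pass).
    """
    failures = []
    for (loss_type, feature), losses in losses_by_key.items():
        limit = targets[loss_type][feature] * ratio
        best = min(losses)
        # Same condition as the test: the best loss must be strictly below the limit.
        if not best < limit:
            failures.append(
                f"{loss_type}/{feature}: min loss {best:.4f} >= limit {limit:.4f}"
            )
    return failures


if __name__ == "__main__":
    recorded = {
        ("train", "num"): [0.30, 0.12, 0.09],
        ("train", "cat"): [0.90, 0.70],
    }
    for line in check_losses(recorded):
        print(line)
```

In the test, the returned list could be asserted empty with the messages as the assertion text, so the child process traceback would show the offending feature and values rather than a bare AssertionError.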

Minimum reproducible example

No response

Relevant log output


================================================= FAILURES =================================================
______________________________________ test_dfencoder_distributed_e2e ______________________________________

    @pytest.mark.slow
    def test_dfencoder_distributed_e2e():
        world_size = 1

>       start_processes(_run_test, args=(world_size, ), nprocs=world_size, join=True)

tests/dfencoder/test_dfencoder_distributed_e2e.py:113: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
/usr/local/lib/python3.10/dist-packages/morpheus/models/dfencoder/multiprocessing.py:80: in start_processes
    while not context.join():
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = , timeout = 30

    def join(self, timeout=None):
        r"""Join one or more processes within spawn context.

        Attempt to join one or more processes in this spawn context.
        If one of them exited with a non-zero exit status, this function
        kills the remaining processes and raises an exception with the cause
        of the first process exiting.

        Returns ``True`` if all processes have been joined successfully,
        ``False`` if there are more processes that need to be joined.

        Args:
            timeout (float): Wait this long before giving up on waiting.
        """
        # Ensure this function can be called even when we're done.
        if len(self.sentinels) == 0:
            return True

        # Wait for any process to fail or all of them to succeed.
        ready = multiprocessing.connection.wait(
            self.sentinels.keys(),
            timeout=timeout,
        )

        error_index = None
        for sentinel in ready:
            index = self.sentinels.pop(sentinel)
            process = self.processes[index]
            process.join()
            if process.exitcode != 0:
                error_index = index
                break

        # Return if there was no error.
        if error_index is None:
            # Return whether or not all processes have been joined.
            return len(self.sentinels) == 0

        # Assume failure. Terminate processes that are still alive.
        # Try SIGTERM then SIGKILL if the process isn't going down.
        # The reason is related to python signal handling is limited
        # to main thread and if that is in c/c++ land and stuck it won't
        # to handle it. We have seen processes getting stuck not handling
        # SIGTERM for the above reason.
        timeout: int = 30
        for process in self.processes:
            if process.is_alive():
                log.warning("Terminating process %s via signal SIGTERM", process.pid)
                process.terminate()
        end = time.monotonic() + timeout
        for process in self.processes:
            time_to_wait = max(0, end - time.monotonic())
            process.join(time_to_wait)
        for process in self.processes:
            if process.is_alive():
                log.warning(
                    "Unable to shutdown process %s via SIGTERM , forcefully exiting via SIGKILL",
                    process.pid,
                )
                process.kill()
            process.join()

        # The file will only be created if the process crashed.
        failed_process = self.processes[error_index]
        if not os.access(self.error_files[error_index], os.R_OK):
            exitcode = self.processes[error_index].exitcode
            if exitcode < 0:
                try:
                    name = signal.Signals(-exitcode).name
                except ValueError:
                    name = f"<Unknown signal {-exitcode}>"
                raise ProcessExitedException(
                    "process %d terminated with signal %s" % (error_index, name),
                    error_index=error_index,
                    error_pid=failed_process.pid,
                    exit_code=exitcode,
                    signal_name=name,
                )
            else:
                raise ProcessExitedException(
                    "process %d terminated with exit code %d" % (error_index, exitcode),
                    error_index=error_index,
                    error_pid=failed_process.pid,
                    exit_code=exitcode,
                )

        with open(self.error_files[error_index], "rb") as fh:
            original_trace = pickle.load(fh)
        msg = "\n\n-- Process %d terminated with the following error:\n" % error_index
        msg += original_trace
>       raise ProcessRaisedException(msg, error_index, failed_process.pid)
E       torch.multiprocessing.spawn.ProcessRaisedException: 
E       
E       -- Process 0 terminated with the following error:
E       Traceback (most recent call last):
E         File "/usr/local/lib/python3.10/dist-packages/morpheus/models/dfencoder/multiprocessing.py", line 34, in _wrap
E           fn(i, *args)
E         File "/root/morpheus/tests/dfencoder/test_dfencoder_distributed_e2e.py", line 187, in _run_test
E           assert min(losses) < LOSS_TARGETS[loss_type][feature] * LOSS_TOLERANCE_RATIO
E       AssertionError

/usr/local/lib/python3.10/dist-packages/torch/multiprocessing/spawn.py:188: ProcessRaisedException
========================================= short test summary info ==========================================
FAILED tests/dfencoder/test_dfencoder_distributed_e2e.py::test_dfencoder_distributed_e2e - torch.multiprocessing.spawn.ProcessRaisedException: 
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! stopping after 1 failures !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
============================================ 1 failed in 19.51s ============================================

Full env printout

No response

Other/Misc.

No response
