skypilot-org / skypilot

SkyPilot: Run AI and batch jobs on any infra (Kubernetes or 12+ clouds). Get unified execution, cost savings, and high GPU availability via a simple interface.
https://skypilot.readthedocs.io
Apache License 2.0

[Serve] Potential Race Conditions and KeyError in `replica_managers.py` #4077

Open andylizf opened 2 weeks ago

andylizf commented 2 weeks ago

Description

There are multiple concurrency issues in sky/serve/replica_managers.py that can lead to inconsistent replica states and unexpected exceptions. The specific problems are outlined below, with the following code snippets as evidence.

https://github.com/skypilot-org/skypilot/blob/fdd68b209ee74f9282fac5c6834907d5fe72d255/sky/serve/replica_managers.py#L688-L695

https://github.com/skypilot-org/skypilot/blob/fdd68b209ee74f9282fac5c6834907d5fe72d255/sky/serve/replica_managers.py#L832-L873

1. Interrupted Status Overwritten

The _terminate_replica method sets the sky_launch_status to INTERRUPTED. However, if _refresh_process_pool runs concurrently, it may overwrite this status before the process is properly terminated, leading to incorrect status reporting.

2. KeyError When Deleting Process Handle Twice

After _terminate_replica deletes the replica ID from _launch_process_pool, _refresh_process_pool may attempt to delete the same replica ID again, resulting in a KeyError since the key no longer exists.

3. Exception When Terminating an Already Completed Process

The _terminate_replica method checks if launch_process.is_alive() before attempting to terminate it. However, between the is_alive() check and the terminate() call, the process might complete, causing terminate() to fail and potentially raising an exception.
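
To make problems 1 and 2 concrete, here is a toy, single-threaded replay of one unlucky interleaving. It is only a sketch: the plain dicts and the status strings are hypothetical stand-ins for the real state in replica_managers.py, and the "refresh path" / "terminate path" comments mark which method each step is meant to imitate.

```python
# Hypothetical stand-ins for the shared state; not the real SkyPilot objects.
launch_process_pool = {1: "launch-process-handle"}
launch_status = {1: "RUNNING"}

# Refresh path: takes a snapshot of the replicas it is about to process.
finished = list(launch_process_pool)

# Terminate path runs in between: marks the replica and drops its handle.
launch_status[1] = "INTERRUPTED"
del launch_process_pool[1]

# Refresh path resumes with its stale snapshot.
second_delete_error = None
for replica_id in finished:
    # Problem 1: the INTERRUPTED status is overwritten.
    launch_status[replica_id] = "SUCCEEDED"
    try:
        # Problem 2: the key was already deleted by the terminate path.
        del launch_process_pool[replica_id]
    except KeyError as exc:
        second_delete_error = exc

print(launch_status[1])
print(repr(second_delete_error))
```

In the real code the two paths would run concurrently rather than in a fixed order, so the failure would be intermittent instead of deterministic.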

Proposed Solution

  1. Implement Locking Mechanism:

    • Use thread locks (e.g., a @with_lock decorator) to synchronize methods that touch shared state, such as _terminate_replica and _refresh_process_pool.
  2. Safe Deletion:

    • Use methods like pop with default values when deleting from dictionaries to avoid KeyError.
    • Example:
      self._launch_process_pool.pop(replica_id, None)
  3. Add Exception Handling:

    • Wrap the terminate and join calls in try-except blocks to handle cases where the process may have already completed.
    • Example:
      try:
          launch_process.terminate()
          launch_process.join()
      except Exception as e:
          logger.error(f'Error terminating launch process for replica {replica_id}: {e}')
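
Putting items 1 and 2 together, a minimal sketch of how the lock and the safe pop could interact is shown below. Everything here is an assumption for illustration: ReplicaManagerSketch, FakeLaunchProcess, and the status strings are hypothetical, and a plain `with self._lock` block stands in for the @with_lock decorator. It is not the actual replica_managers.py implementation.

```python
import threading


class FakeLaunchProcess:
    """Hypothetical stand-in for a launch process handle."""

    def __init__(self):
        self.terminated = False

    def terminate(self):
        self.terminated = True

    def join(self):
        pass


class ReplicaManagerSketch:
    """Hypothetical, simplified manager; not the real SkyPilot class."""

    def __init__(self):
        self._lock = threading.Lock()
        self._launch_process_pool = {}
        self.launch_status = {}

    def terminate_replica(self, replica_id):
        # Status update and handle removal happen atomically under the lock.
        with self._lock:
            self.launch_status[replica_id] = "INTERRUPTED"
            process = self._launch_process_pool.pop(replica_id, None)
        # Only the caller that actually claimed the handle terminates it.
        if process is not None:
            process.terminate()
            process.join()

    def refresh_process_pool(self, finished_ids):
        with self._lock:
            for replica_id in finished_ids:
                process = self._launch_process_pool.pop(replica_id, None)
                if process is None:
                    # Already claimed by terminate_replica; keep its status.
                    continue
                self.launch_status[replica_id] = "SUCCEEDED"


manager = ReplicaManagerSketch()
proc = FakeLaunchProcess()
manager._launch_process_pool[1] = proc
manager.terminate_replica(1)
manager.refresh_process_pool([1])  # no KeyError, status is not overwritten
manager.terminate_replica(1)       # a repeated call is also harmless
print(manager.launch_status[1])
```

The design choice in this sketch is that whichever path pops the handle first owns it, so the other path sees None and backs off instead of deleting the key or rewriting the status a second time.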
andylizf commented 1 week ago

@cblmemo Could you check this issue when you get a chance? Thanks!

cblmemo commented 1 week ago

Thanks @andylizf! Those are valuable observations. Could you help submit a PR for this?

For solution 3, we might not want to use a broad exception if possible. Can we catch only the error raised by the .terminate() call?