Closed tianyin closed 4 months ago
There is a mocked thread-based implementation for ray. See commit https://github.com/xlab-uiuc/acto/pull/235/commits/4bea2fe2ac6a077f470a3beda336620a0832942b
Updated @tianyin @tylergu
This is a very detailed writeup. Thanks @Spedoske !
@tylergu -- could you take a read of it and merge the writing to some design docs in the repo and then close it?
@Spedoske Thanks for the fantastic writeup and all the efforts! From my understanding this would be quite a big change in the code base. The architecture looks correct overall, but there are some details which I want to discuss. Let's have a sync to discuss some details, such as the abstraction of the input generation and the remote RPC (I think we should have an interface to implement for running the tests, and the default is to run it locally, and Ray can be only one of the implementations).
@tianyin We will iterate on this and close it once we agree on the design and migrate the design doc into the repo
Sounds great! Thanks for doing it @tylergu @Spedoske
Close as it is not actively pursued for now.
Code Design Description
Runner
Class Initialization
The `Runner` class is initialized with the following parameters:
- `engine_class`: A class representing the Kubernetes engine to be used. It should be a subclass of `KubernetesEngine`.
- `engine_version`: A string representing the version of the Kubernetes engine.
- `num_nodes`: An integer indicating the number of nodes to configure in the Kubernetes cluster.
- `preload_images`: A list of strings representing Docker image names to be preloaded into the cluster (optional).
- `preload_images_store`: A callable function that takes an image hash as input and returns a string representing the file path where the preloaded images will be stored (optional).

During initialization, the class sets up a few variables, including `preload_images` and `preload_images_store`, and starts a new thread to asynchronously set up the cluster and indicate its availability.

Trial Execution
The `run` method is responsible for executing a trial and collecting snapshots. It takes the following parameters:
- `trial`: An instance of the `Trial` class representing the trial to be executed.
- `snapshot_collector`: A callable function that takes three parameters: the current `Runner` instance, the `trial`, and a dictionary representing the system input. It returns a snapshot object.

Within the `run` method, the class waits until the cluster is available (`cluster_ok_event`) before executing the trial. For each system input in the trial, it attempts to collect a snapshot using the `snapshot_collector` function. If any exception occurs during snapshot collection, the exception is caught, and the error is stored along with the snapshot (if available). The `trial.send_snapshot` method is called to send the snapshot and error to the trial.

After processing all system inputs, the cluster availability event is cleared, and a new thread is started to asynchronously reset the cluster and set it as available again.
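The initialization and `run` flow described above can be sketched roughly as follows. This is a minimal illustration and not the code from the PR: `RunnerSketch`, `FakeTrial`, and the `setup_cluster` hook are hypothetical names standing in for the real `Runner`, `Trial`, and engine-specific cluster creation. It also simplifies error handling by passing `None` as the snapshot whenever the collector raises.

```python
import threading
from typing import Any, Callable, Iterable, List, Optional, Tuple


class FakeTrial:
    """Hypothetical stand-in for Trial: yields inputs and records what it is sent."""

    def __init__(self, inputs: Iterable[dict]):
        self._inputs = list(inputs)
        self.received: List[Tuple[Optional[Any], Optional[Exception]]] = []

    def __iter__(self):
        return iter(self._inputs)

    def send_snapshot(self, snapshot, runtime_error):
        self.received.append((snapshot, runtime_error))


class RunnerSketch:
    def __init__(self, setup_cluster: Callable[[], None]):
        # setup_cluster is an assumed hook replacing the real cluster creation.
        self._setup_cluster = setup_cluster
        self.cluster_ok_event = threading.Event()
        self._start_setup()

    def _start_setup(self):
        # Prepare the cluster in the background so the caller is not blocked.
        self._setup_thread = threading.Thread(target=self._setup_and_set_available)
        self._setup_thread.start()

    def _setup_and_set_available(self):
        self._setup_cluster()
        self.cluster_ok_event.set()

    def run(self, trial, snapshot_collector):
        # Block until the background setup has finished.
        self.cluster_ok_event.wait()
        for system_input in trial:
            snapshot, error = None, None
            try:
                snapshot = snapshot_collector(self, trial, system_input)
            except Exception as exc:  # the trial decides how to react
                error = exc
            trial.send_snapshot(snapshot, error)
        # Mark the cluster as unavailable and reset it asynchronously.
        self.cluster_ok_event.clear()
        self._start_setup()
        return trial
```

Because the reset happens on a separate thread, `run` returns as soon as the last snapshot has been delivered, and the next call to `run` simply waits on `cluster_ok_event` until the fresh cluster is ready.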
Cluster Setup and Teardown
The `Runner` class provides several private methods for setting up and tearing down the Kubernetes cluster.
- `__setup_cluster_and_set_available`: This method is called during initialization to set up the cluster and indicate its availability. It calls the private `__setup_cluster` method and sets the `cluster_ok_event` to indicate that the cluster is ready for trial execution.
- `__setup_cluster`: This method performs the actual setup of the Kubernetes cluster. It creates a unique cluster name, prepares the Kubernetes configuration file, prefetches Docker images (if specified), and configures the cluster using the `kubernetes_engine_class` provided during initialization. It also creates an instance of `KubectlClient` for interacting with the cluster.
- `__prefetch_image`: This method is responsible for prefetching Docker images into the cluster. It first acquires a file lock to ensure thread safety. If `preload_images` is provided, it checks if the preloaded images file already exists. If not, it pulls the specified Docker images locally using `docker pull` and saves them as a tar archive using `docker image save`.
- `__teardown_cluster`: This method is responsible for tearing down the Kubernetes cluster. It deletes the cluster using the `kubernetes_engine_class`, removes the Kubernetes configuration file, and performs any necessary cleanup.

TrialInputIterator
The `TrialInputIterator` class is responsible for generating and iterating over system inputs (test cases) for a trial. It takes the following parameters during initialization:
- `next_testcase`: An iterator that provides the next test case as a tuple of a list of strings representing the field path and a `TestCase` object.
- `root_schema`: An `ObjectSchema` object representing the root schema for the system input.
- `seed_input`: A dictionary representing the seed input for the trial.

The class maintains a history of applied system inputs (`self.history`) and a queue of pending tests (`self.queuing_tests`). It provides an `__iter__` method that generates a tuple of `(system_input, signature)` where `system_input` is a dictionary representing the next system input to be applied, and `signature` is a dictionary representing the signature of the associated test case.

The `__iter__` method continues generating system inputs until there are no more tests in the queue (`self.queuing_tests`) and no more test cases in the `next_testcase` iterator. For each system input, it applies the test case, appends the result to the history, and yields the system input. It handles mutation and setup of the input fields based on the test case's preconditions and mutators.

The class also provides the `flush` method to flush the queuing tests and the `revert` method to revert the last applied test case by preventing the tests in the queuing tests from being applied and re-applying the last valid test.

Trial
The `Trial` class is responsible for managing the execution of a trial and checking the snapshots produced during the trial.

Class Initialization
The `Trial` class is initialized with the following parameters:
- `next_input`: An instance of `TrialInputIterator`, representing an iterator that provides the next system input for the trial.
- `checker_set`: An instance of `CheckerSet`, representing a set of checkers used to verify the correctness of snapshots.
- `num_mutation`: An optional integer indicating the maximum number of mutations to perform during the trial (default is 10).

During initialization, the class sets up various variables, including `next_input`, `checker_set`, `snapshots`, `run_results`, `generation`, `num_mutation`, `error`, `state`, and `waiting_for_snapshot`.

Iterator Functionality
The `Trial` class is iterable: it defines the `__iter__` method, so instances of the class can be iterated over directly. The `__iter__` method returns a generator that yields system inputs from the `next_input` iterator until the maximum number of mutations (`num_mutation`) is reached or an error occurs.

Within the generator, the class checks the current state of the trial and retrieves the next system input from `next_input`. If the trial is in a terminated or runtime exception state, the generator terminates. If no more system inputs are available, the generator terminates as well.

Before yielding the system input, the class sets the `waiting_for_snapshot` flag to indicate that a snapshot is expected. The yielded system input will be used to collect a snapshot in the trial execution.

Snapshot Collection
The `send_snapshot` method is responsible for receiving a snapshot and a runtime error (if any) produced during the execution of a system input. It takes the following parameters:
- `snapshot`: An optional instance of `Snapshot` representing the collected snapshot.
- `runtime_error`: An optional `Exception` object representing a runtime error that occurred during the execution of the system input.

The method first asserts that the trial is waiting for a snapshot and then sets the `waiting_for_snapshot` flag to `False`. If a runtime error is provided, the trial state is set to `'runtime_exception'`, and the error is stored. An error message is also logged using the `logging.error` method.

If a snapshot is provided, the method appends the snapshot to the `snapshots` list, increments the generation count, and sets the `waiting_for_snapshot` flag to `False`. The `check_snapshot` method is then called to perform the snapshot checking.

Snapshot Checking
The `check_snapshot` method is responsible for performing the checking of snapshots using the `checker_set`. It verifies the correctness of the current snapshot based on the previous snapshot.

The method first asserts that the trial state is not `'runtime_exception'` or `'terminated'`. It then retrieves the previous snapshot and the current snapshot from the `snapshots` list. Using the `checker_set`, the method checks the snapshots and stores the result in the `run_results` list. The trial state is then updated according to the result:
- `OracleControlFlow.ok`: the snapshots are correct, and the trial state is set to `'normal'`.
- `OracleControlFlow.terminate`: the trial should be terminated, and the trial state is set to `'terminated'`.
- `OracleControlFlow.revert`: the input is invalid, and the trial enters the `'recovering'` state. It sets the previous snapshot to the snapshot that will be referenced as the latest valid snapshot, reverts the next input, and sets the trial state accordingly.
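To make the `Trial` state machine easier to follow, here is a rough sketch of the iterator, `send_snapshot`, and `check_snapshot` working together. It is an illustration under several assumptions, not the actual implementation: `TrialSketch` and `ListInputs` are hypothetical names, the checker set is reduced to a single `check(prev, curr)` callable, a step that reports a runtime error is not checked, and the "latest valid snapshot" is tracked in a `_last_valid` field (one possible reading of the revert behavior).

```python
import enum
import logging


class OracleControlFlow(enum.Enum):
    # Member names follow the text above; the values are arbitrary.
    ok = "ok"
    terminate = "terminate"
    revert = "revert"


class ListInputs:
    """Hypothetical stand-in for TrialInputIterator."""

    def __init__(self, inputs):
        self._inputs = list(inputs)
        self.reverted = 0

    def __iter__(self):
        return iter(self._inputs)

    def revert(self):
        self.reverted += 1


class TrialSketch:
    def __init__(self, next_input, check, num_mutation=10):
        self.next_input = next_input
        self.check = check  # assumed signature: (prev, curr) -> OracleControlFlow
        self.num_mutation = num_mutation
        self.snapshots, self.run_results = [], []
        self.generation = 0
        self.error = None
        self.state = "normal"
        self.waiting_for_snapshot = False
        self._last_valid = None  # assumption: latest snapshot that passed

    def __iter__(self):
        inputs = iter(self.next_input)
        while self.generation < self.num_mutation:
            if self.state in ("terminated", "runtime_exception"):
                return
            try:
                system_input = next(inputs)
            except StopIteration:
                return
            self.waiting_for_snapshot = True
            yield system_input

    def send_snapshot(self, snapshot, runtime_error):
        assert self.waiting_for_snapshot
        self.waiting_for_snapshot = False
        if runtime_error is not None:
            self.state = "runtime_exception"
            self.error = runtime_error
            logging.error("runtime error during trial: %s", runtime_error)
            return  # assumption: a failed step is not checked
        if snapshot is not None:
            self.snapshots.append(snapshot)
            self.generation += 1
            self.check_snapshot()

    def check_snapshot(self):
        assert self.state not in ("runtime_exception", "terminated")
        result = self.check(self._last_valid, self.snapshots[-1])
        self.run_results.append(result)
        if result is OracleControlFlow.ok:
            self.state = "normal"
            self._last_valid = self.snapshots[-1]
        elif result is OracleControlFlow.terminate:
            self.state = "terminated"
        elif result is OracleControlFlow.revert:
            self.state = "recovering"
            self.next_input.revert()  # _last_valid is left unchanged
```

A caller alternates between pulling an input from the generator and calling `send_snapshot`. Since the generator re-checks `state` each time it resumes, a `terminate` result or a runtime error stops the iteration on the next step.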
acto.ray
`acto.ray` provides a mock implementation of the Ray library when Ray is not enabled in the Acto configuration.

The module has two parts: one for mocking the `ray.remote` and `ray.get` functionalities, and another for mocking the `ActorPool` class.

Mocking `ray.remote` and `ray.get`
The first part of the code checks if Ray is enabled in the Acto configuration (`actoConfig.ray.enabled`). If Ray is enabled, it imports the actual Ray module and assigns the `ray.remote` and `ray.get` functions to the `remote` and `get` variables, respectively.

If Ray is not enabled, it defines a custom `remote` function and a `get` function as replacements for `ray.remote` and `ray.get`. These functions provide a mock implementation that mimics the behavior of the actual Ray library.

The `remote` function takes a `runner` as an argument and modifies the `runner` object's `__init__` method. It adds an attribute `remote` to the `runner` object and sets it to the `runner` itself. It also sets the `remote` attribute of the `run` method to the `run` method itself. This allows the `runner` object to be used as a remote function call.

The `get` function simply returns the input as-is.

Mocking `ActorPool`
The second part of the code mocks the `ActorPool` class, which is used for managing a pool of actors.

If Ray is enabled, it imports the actual `ActorPool` class from the Ray library.

If Ray is not enabled, it defines a mock `ActorPool` class. This class provides a similar interface to the actual `ActorPool` class but uses a `ThreadPoolExecutor` instead of Ray's actor-based execution.

The `ActorPool` class has the following attributes and methods:
- `__init__(self, actors: list)`: Initializes the mock actor pool with a list of actors. It sets up internal variables, such as `_idle_actors`, `_pending_submits`, `_result`, `_pool`, and `_result_count`.
- `has_free(self) -> bool`: Returns `True` if there are idle actors available in the pool, indicating that a task can be submitted.
- `submit(self, fn, value)`: Submits a task to the actor pool. If there are idle actors available, it selects an idle actor, submits the task to the `ThreadPoolExecutor`, and assigns a callback function (`__make_callback_fn`) to be called when the task is completed. If there are no idle actors, the task is added to the pending submits list.
- `__make_callback_fn(self, runner)`: Generates a callback function that is called when a task submitted to the `ThreadPoolExecutor` is completed. It stores the result, releases a semaphore (`_result_count`), adds the idle actor back to the pool, and checks for pending submits to submit the next task if available.
- `get_next_unordered(self)`: Retrieves the next completed task result from the `_result` list. It blocks if no results are available until a result becomes available (using the `_result_count` semaphore).
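A thread-based pool along these lines might look like the sketch below. It is a minimal approximation rather than the code in the PR: the class name `MockActorPool`, the `_lock` field, and the single-underscore helper name are assumptions added for illustration. It also assumes submitted tasks do not raise. If one did, the callback would fail before releasing the semaphore and `get_next_unordered` would block.

```python
import threading
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Callable, List, Tuple


class MockActorPool:
    def __init__(self, actors: list):
        self._idle_actors = list(actors)
        self._pending_submits: List[Tuple[Callable, Any]] = []
        self._result: list = []
        self._pool = ThreadPoolExecutor(max_workers=max(1, len(actors)))
        # Counts finished results so get_next_unordered can block on it.
        self._result_count = threading.Semaphore(0)
        # Assumption: a lock guarding the three lists above.
        self._lock = threading.Lock()

    def has_free(self) -> bool:
        with self._lock:
            return len(self._idle_actors) > 0

    def submit(self, fn: Callable, value: Any) -> None:
        with self._lock:
            if not self._idle_actors:
                # No actor is free: queue the task for a later callback.
                self._pending_submits.append((fn, value))
                return
            actor = self._idle_actors.pop()
        # fn receives the chosen actor and the value, as with Ray's pool.
        future = self._pool.submit(fn, actor, value)
        future.add_done_callback(self._make_callback_fn(actor))

    def _make_callback_fn(self, actor) -> Callable[[Future], None]:
        def callback(future: Future) -> None:
            with self._lock:
                self._result.append(future.result())
                self._idle_actors.append(actor)
                pending = (
                    self._pending_submits.pop(0) if self._pending_submits else None
                )
            self._result_count.release()
            if pending is not None:
                # Resubmit outside the lock to avoid re-entering it.
                self.submit(*pending)

        return callback

    def get_next_unordered(self):
        # Blocks until at least one result has been stored.
        self._result_count.acquire()
        with self._lock:
            return self._result.pop(0)
```

With a pool of two actors exposing a `run` method, a caller would submit work as `pool.submit(lambda actor, value: actor.run(value), value)` and then call `pool.get_next_unordered()` once per submitted task. Results arrive in completion order rather than submission order.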