Thank you for the great write up! This is going to be really cool, and will push Zeus to another level of open source software. I'm super excited about it!!
Some random comments and questions:
- Running recurrences may overlap (a new recurrence can begin before previous recurrences finish). Make sure your code/architecture doesn't assume that it doesn't.
- PyTorchJobs inside the cluster and users outside the cluster should both be able to reach the Zeus server endpoint. The former can just be done with the default ClusterIP service type. Regarding the latter, since we're using a custom deployment of K8s (not a managed deployment like EKS), we can just start with a NodePort service.
- GET /jobs/{job_id} and GET /trials are inconsistent in terms of query vs path parameter.
- rec_i and trial_i in the API are not readable/comprehensible.
- /trials/report_profiling sounds awkward since "profiling" is a present participle, not a noun. Either /trials/report or /trials/report_profiling_result look better to me.
- ZeusDataLoader needs a way to know that it's inside a KubeFlow cluster and that it should make a POST request to the ZeusServer at the end of training.

Thanks for the great comments! Based on the comments and our discussion today, I listed all the changes I plan to make. Could you please take another look? Thx!!
- We will use Tortoise ORM + PostgreSQL.
- rec_i => recurrence_number
- trial_i => trial_number
ZeusServer:
- We will change our design from executing each Job sequentially to executing the Recurrences within one Job concurrently. This allows the Recurrences within one Job to overlap with each other. It also means the period of time between two recurrences will only depend on the trigger condition (when either the elapsed time is satisfied or a drift is detected). Each Recurrence is still executed sequentially.
- ZeusServer creates an asyncio.Task for each Recurrence without waiting for its completion. After all Recurrences are created, the job will wait for all of them to finish, and then exit (sketched below).
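A minimal sketch of that per-Job launch loop; run_recurrence and next_trigger are hypothetical helpers, not existing Zeus APIs:

```python
import asyncio

# Sketch only: `run_recurrence` and `next_trigger` are hypothetical helpers.
async def run_job(job_id: str, num_recurrences: int) -> None:
    tasks: list[asyncio.Task] = []
    for i in range(num_recurrences):
        if i > 0:
            # Block until the trigger condition: either the recurrence
            # period has elapsed or a drift is detected.
            await next_trigger(job_id)
        # Launch the recurrence without waiting for its completion, so
        # recurrences of the same job may overlap.
        tasks.append(asyncio.create_task(run_recurrence(job_id, recurrence_number=i)))
    # After all recurrences are created, wait for all of them to finish.
    await asyncio.gather(*tasks)
```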
- POST /trials/report_profiling_result: endpoint for PytorchJobs to report profiling results.
- GET /jobs: endpoint for users to query Jobs.
GET /jobs will list all the jobs submitted by users. We will further enable users to filter the jobs based on query parameters; for example, GET /jobs?phase=completed will list all the completed jobs.
job_id will also be defined as a query parameter instead of a path parameter. For example, we will use GET /jobs/?job_id=000001 instead of GET /jobs/000001 to query the information of the job with id 000001.
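A sketch of this query-parameter style, assuming a FastAPI-like endpoint and a hypothetical query_jobs DB helper:

```python
from typing import Optional

from fastapi import FastAPI

app = FastAPI()

# Sketch only: FastAPI and `query_jobs` are assumptions for illustration.
@app.get("/jobs")
async def get_jobs(job_id: Optional[str] = None, phase: Optional[str] = None):
    # GET /jobs                 -> list all jobs
    # GET /jobs?phase=completed -> list all completed jobs
    # GET /jobs/?job_id=000001  -> the job with id 000001
    filters = {k: v for k, v in {"job_id": job_id, "phase": phase}.items() if v is not None}
    return await query_jobs(**filters)  # hypothetical DB helper
```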
In K8S, a Service defines a set of pods (a group of one or more containers with shared resources and specifications) and how to access them. In our design, ZeusServer and PytorchJobs are pods on the same cluster. The communication between them can be achieved by querying the ClusterIP. However, for users outside the cluster to send GET or POST requests, we will need to define a NodePort service.
We will write a client library that sends requests to the ZeusServer API. We will then create a CLI wrapper around the client library. These add-ons will ease future usage of this integration.
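A rough sketch of what the client library could look like; the endpoint paths and default port are illustrative, not the final API:

```python
from typing import Optional

import requests

class ZeusClient:
    """Thin wrapper around the ZeusServer HTTP API (sketch; names are illustrative)."""

    def __init__(self, host: str, port: int = 30080):
        # From outside the cluster, host:port is the NodePort address;
        # from inside, it can be the ClusterIP service's DNS name.
        self.base_url = f"http://{host}:{port}"

    def list_jobs(self, phase: Optional[str] = None) -> list:
        params = {"phase": phase} if phase else {}
        resp = requests.get(f"{self.base_url}/jobs", params=params)
        resp.raise_for_status()
        return resp.json()
```

A CLI wrapper can then map subcommands (e.g. listing jobs filtered by phase) onto these methods.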
Currently, the training script (train.py) contains both data preprocessing and training. However, we might want to decouple these two phases. For example, for experimentation purposes, we might want to run training multiple times on the same data to try out different batch sizes. In this case, preprocessing the data multiple times would be an unnecessary overhead.
In our design, the ideal case is that we run data preprocessing once before each Recurrence, and do NOT repeat it in each Trial.
Down this line, we plan to support user-defined frequencies for each stage in the ML pipeline. A snapshot of our plan:
- Use Argo Workflows to support modeling workflows as DAGs with back edges.

Thanks! Looks great.
A comment I want to add about being recurrence-overlap aware:
I am thinking about two solutions to the problems you mentioned.
1. We constrain the number of running recurrences for each job. The condition to launch a new recurrence will be: 1) the elapsed time is satisfied or a drift is detected, and 2) there is a free slot for this new recurrence. This constrains the memory required while providing some sense of concurrency.
2. Setting a lower bound for the recurrence period is hard. One potential solution is to set it to the duration of one recurrence. Then it is equivalent to no overlapping, i.e. only one recurrence runs at a time and the whole job executes sequentially. This also makes sense to me, since overlapping does waste some BSO feedback. In this case, the time between two recurrences will be
$$\max\big(T_{\text{one recurrence}},\ \min(T_{\text{recurrence period}},\ T_{\text{drift detected}})\big)$$
I like 1 better. This can also act like a resource usage cap for users to ensure fairness when multiple users use the same cluster.
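A minimal sketch of option 1, using an asyncio.Semaphore as the slot pool; the cap value and the run_recurrence/next_trigger helpers are hypothetical:

```python
import asyncio

MAX_RUNNING_RECURRENCES = 2  # illustrative per-job cap

# Sketch only: `run_recurrence` and `next_trigger` are hypothetical helpers.
async def run_job_with_cap(job_id: str, num_recurrences: int) -> None:
    slots = asyncio.Semaphore(MAX_RUNNING_RECURRENCES)
    tasks = []

    async def run_one(i: int) -> None:
        try:
            await run_recurrence(job_id, recurrence_number=i)
        finally:
            slots.release()  # free the slot when this recurrence finishes

    for i in range(num_recurrences):
        if i > 0:
            await next_trigger(job_id)  # condition 1: period elapsed or drift detected
        await slots.acquire()           # condition 2: a free slot exists
        tasks.append(asyncio.create_task(run_one(i)))
    await asyncio.gather(*tasks)
```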
Just dumping a quick thought about MetricManager.
The current implementation of ZeusDataLoader keeps profiled metrics as class variables so that they can be shared between the train and eval dataloaders, but arguably this design is not the easiest to understand. Instead, maybe the FileBackedMetricManager class can keep JSON file(s) on the local filesystem. All reads and writes of throughput, power, etc. go to the file.
It'll be possible to create a write-back cache inside the metric manager and share the metric manager between the train and eval dataloaders by storing it as a class variable of ZeusDataLoader. Or, you may just not do this and have all reads/writes access the JSON file.
I will create a MetricManager that stores and manages power and train metrics. The MetricManager will be fully file-backed, reading from and writing to the local filesystem. If we are in the Kubeflow mode, besides writing to the local filesystem, MetricManager will POST the metrics to ZeusServer, which is responsible for storing them in the DB.
This design will abstract away the metric storage and reporting logic from the current ZeusDataLoader.
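A minimal sketch of the file-backed part, assuming one JSON file per job; the file layout and method names are illustrative:

```python
import json
from pathlib import Path

# Sketch only: the file layout and method names are illustrative.
class FileBackedMetricManager:
    def __init__(self, path: str):
        self.path = Path(path)
        if not self.path.exists():
            self.path.write_text("{}")

    def write(self, key: str, value) -> None:
        # All reads and writes go through the JSON file, so the train and
        # eval dataloaders can share state without class variables.
        metrics = json.loads(self.path.read_text())
        metrics[key] = value
        self.path.write_text(json.dumps(metrics))

    def read(self, key: str):
        return json.loads(self.path.read_text()).get(key)
```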
What's the status of this? @Rosie-m @jaywonchung
@Rosie-m will clean and push existing code in a branch and document progress.
I have updated the issue content based on the design changes we discussed and added a summary of the current progress. The existing code can be found in the kubeflow branch. @jaywonchung @mosharaf
Everything looks good! Thank you for your work @Rosie-m 👍
Motivation
Kubeflow is an open-source Kubernetes-native platform for developing, orchestrating, deploying, and running scalable and portable ML workloads. This integration will enable developers in the industry to directly deploy Zeus onto their Kubeflow cluster, or serve as an example of integrating Zeus into their internal MLOps platforms. By facilitating the adoption of Zeus into the industry, we hope to encourage tech companies to try out Zeus and make their ML systems energy-efficient.
Brief Background
In Zeus, a job will sequentially recur a given number of times. In each recurrence, it will launch the training script until it converges (i.e., reaches a user-defined target metric) or reaches the upper limit of retries. Before each trial, we use the batch size optimizer (BSO), which runs Multi-Armed Bandits, to predict the next batch size to use. After each trial, we feed the result of this trial, including time, energy, cost, and whether the trial converged, back to the BSO to help future predictions. The one-time launch of the training script and the pre- and post-interaction with the BSO is referred to as a trial.

Main Challenges

Zeus relies on the local filesystem to store the training and profiling results. But since we are using Kubernetes to automatically launch the training script, and the Kubernetes control plane automatically handles scheduling the pods across the nodes in the cluster, we can no longer assume everything happens on the same node. Instead, we need to take another approach to keep the training and profiling results across runs.

Proposed Design
We envision building an end-to-end system that allows users to use Zeus transparently. Zeus has two key components: a just-in-time (JIT) online profiler and a batch size optimizer based on a multi-armed bandit. We will have a server that asynchronously serves the BSO for multiple jobs, and an extended JIT online profiler running on the client side that reports profiling and training results back to the server.
Overview
Zeus + Kubeflow will contain the following components:

Server-side:
- ZeusServer: A server that accepts user-submitted jobs, serves BSO, and handles user queries.
- Database (DB): Stores all the log data, including submitted jobs, launched trials, and profiling data (power, time, and so on). This is the single source of truth in our design.

Client-side:
- ZeusDataLoader: we will extend Zeus online profiling by incorporating a MetricManager to report results to the server.

An End-to-End View
The above figure shows an end-to-end view. We now explain how the components work together.
Job Creation and Trial Launch

For the first Trial of a Job (a newly submitted Job):
1. ZeusDataLoader.__init__ creates a new UUID as the job_id for this Job.
2. ZeusDataLoader.__init__ sends a POST request to ZeusServer with its job_id, asking for the batch size to use for this Trial.
3. ZeusServer validates the POST request to create a new job and inserts the new Job into DB.Jobs.
4. ZeusServer inserts the Trial into DB.Trials.
5. ZeusServer creates an asyncio.Task to serve BSO for the Job.
6. ZeusServer replies with the batch size to the user.
7. ZeusDataLoader.__init__ will then use the batch size to initialize the DataLoader and start training.

For subsequent Trials of the same Job:
1. ZeusDataLoader.__init__ sends a POST request to ZeusServer with the existing job_id, asking for the batch size to use for this Trial.
2. ZeusServer validates the POST request for a new Trial with an existing Job.
3. ZeusServer inserts the Trial into DB.Trials.
4. ZeusServer routes the request to the existing asyncio.Task of this Job.
5. ZeusServer replies with the batch size to the user.
6. ZeusDataLoader.__init__ will then use the batch size to initialize the DataLoader and start training.

NOTE: Each Job has one asyncio.Task that serves its BSO instance.
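A sketch of the client-side half of this flow; the endpoint path and payload shape are assumptions, not the final API:

```python
import uuid
from typing import Optional, Tuple

import requests

# Sketch only: the endpoint path and payload fields are illustrative.
def register_and_get_batch_size(server_url: str, job_id: Optional[str] = None) -> Tuple[str, int]:
    if job_id is None:
        job_id = str(uuid.uuid4())  # first Trial of a new Job
    resp = requests.post(f"{server_url}/jobs", json={"job_id": job_id})
    resp.raise_for_status()
    # ZeusServer inserts the Job/Trial into the DB, routes the request to
    # the Job's asyncio.Task, and replies with the batch size from BSO.
    return job_id, resp.json()["batch_size"]
```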
Training and Profiling

Client-side:
- A Trial runs ./train.py once.
- At the end of profiling, the Trial needs to report the ProfilingResult to ZeusServer. ZeusServer will then insert it into DB.Profiling.
- At exit, the Trial needs to report (energy, time, cost, num_epochs, reached) to ZeusServer. ZeusServer will then update the corresponding record of this Trial in DB.Trials.

How does ZeusDataLoader report profiling/training results to the server? ZeusDataLoader does a POST request to ZeusServer with the TrialResult (or ProfilingResult).

Server-side:
1. The API endpoint handler passes the TrialResult (or ProfilingResult) to the global ZeusServer class instance.
2. The ZeusServer instance routes the TrialResult (or ProfilingResult) to the asyncio.Task for that job through an asyncio.Queue channel.
3. The Task was await _job_trial_result_channel.get()-ing, and gets the TrialResult (or was await _job_profiling_result_channel.get()-ing, and gets the ProfilingResult).
4. The Task updates the TrialResult in DB.Trials (or inserts the ProfilingResult into DB.Profiling).
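A simplified sketch of the per-Job task and its channels; update_db_trials stands in for a hypothetical DBAPI function:

```python
import asyncio

# Sketch only: a simplified per-Job task; `update_db_trials` is a
# hypothetical DBAPI function.
class JobTask:
    def __init__(self) -> None:
        self._job_trial_result_channel: asyncio.Queue = asyncio.Queue()
        self._job_profiling_result_channel: asyncio.Queue = asyncio.Queue()

    async def serve_bso(self) -> None:
        while True:
            # The per-Job asyncio.Task blocks here until ZeusServer routes
            # a result into the channel.
            trial_result = await self._job_trial_result_channel.get()
            await update_db_trials(trial_result)
            # ... feed the result back to this Job's BSO instance ...
```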
Detailed Look into Each Component

Here, we will explain the details of each component and what each provides.
ZeusServer

ZeusServer contains the following sub-components:
- DBAPI: an INSERT/UPDATE interface that interacts with the DB for storing and querying information across jobs and trials.
- BatchSizeOptimizer (BSO): a component from Zeus that predicts the optimal batch size. BSO learns from the feedback (the results of each trial) and adjusts its internal states. BSO achieves this by implementing Multi-Armed Bandits (MAB) with Thompson sampling.
Database

The Database stores states across jobs and trials. It contains three tables: Jobs, Trials, and Profiling.

Jobs
- A Job is inserted by ZeusServer after a user submits it to ZeusServer.
- The phase of a job is changed from Running to Completed after num_recurrences recurrences have been done.

Trials
- Records all trials created.
- A Trial's record is updated by ZeusServer after its completion.
- This table contains the same information as train_json in the original Zeus.

Profiling
- For each job, records the following mappings:
  - For each profiled power limit (power_limit): power_limit -> train_avg_power and power_limit -> train_tput.
  - For the optimal power limit (opt_power_limit): opt_power_limit -> eval_avg_power and opt_power_limit -> eval_tput.
- Append-only: one record per Trial, inserted at the first end of epoch when profiling is done.
- This table contains the same information as power_json in the original Zeus.

(Example job: CIFAR100 with ShuffleNet)
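Since we will use Tortoise ORM + PostgreSQL, the tables could look roughly like the following; the field names are assumptions based on the descriptions above, not the final schema:

```python
from tortoise import fields
from tortoise.models import Model

# Sketch only: field names are assumptions, not the final schema.
class Job(Model):
    job_id = fields.UUIDField(pk=True)
    phase = fields.CharField(max_length=16, default="Running")  # -> "Completed"
    num_recurrences = fields.IntField()

class Trial(Model):
    id = fields.IntField(pk=True)
    job = fields.ForeignKeyField("models.Job", related_name="trials")
    trial_number = fields.IntField()
    batch_size = fields.IntField()
    time = fields.FloatField(null=True)
    energy = fields.FloatField(null=True)
    cost = fields.FloatField(null=True)
    num_epochs = fields.IntField(null=True)
    reached = fields.BooleanField(null=True)

class Profiling(Model):  # append-only
    id = fields.IntField(pk=True)
    job = fields.ForeignKeyField("models.Job", related_name="profiling")
    power_limit = fields.IntField()
    train_avg_power = fields.FloatField()
    train_tput = fields.FloatField()
```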
Extended ZeusDataLoader

We will extend ZeusDataLoader so that it can:
- Register the job and query the batch size at initialization.
  - Each job has a job_id. This is also the unique identifier in the DB.Jobs table on the server side.
  - The job is registered with ZeusServer. The BSO served on the server side will then generate the next batch size to use and reply to the client.
- Report results.
  - TrialResult: at exit, ZeusDataLoader will report the TrialResult, including time, energy, cost, the number of epochs completed, and whether the target metric was reached, to ZeusServer.
  - ProfilingResult: after profiling is done, ZeusDataLoader will report the ProfilingResult to ZeusServer.
  - We will add a MetricManager to ZeusDataLoader for the above two reporting purposes. MetricManager will decide whether to store the results in the local filesystem (the practice of the original Zeus) or send POST requests to ZeusServer (required in this integration); see the sketch below.
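A sketch of that decision, assuming a ZEUS_KUBEFLOW environment variable as the mode switch; the variable name and endpoint path are illustrative:

```python
import json
import os

import requests

# Sketch only: the ZEUS_KUBEFLOW variable and endpoint path are assumptions.
class MetricManager:
    def __init__(self, job_id: str, server_url: str, metrics_path: str):
        self.job_id = job_id
        self.server_url = server_url
        self.metrics_path = metrics_path
        self.kubeflow_mode = os.environ.get("ZEUS_KUBEFLOW", "0") == "1"

    def report(self, kind: str, payload: dict) -> None:
        # Always keep the original Zeus behavior: write to the local filesystem.
        with open(self.metrics_path, "w") as f:
            json.dump(payload, f)
        if self.kubeflow_mode:
            # In Kubeflow mode, additionally POST the result to ZeusServer.
            resp = requests.post(
                f"{self.server_url}/trials/report_{kind}_result",
                json={"job_id": self.job_id, **payload},
            )
            resp.raise_for_status()
```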
Communication

In Kubernetes, a Service defines a set of pods (a group of one or more containers with shared resources and specifications) and how to access them. In our design, ZeusServer and PytorchJobs are pods on the same cluster. The communication between them can be achieved by querying the ClusterIP. However, for users outside the cluster to send GET or POST requests, we will need to define a NodePort service.
Failure Handling

- If a Trial ends with reached == False (the target metric was not reached), the Job launches another Trial, up to the upper limit of retries.
Testing with Kubeflow/PytorchJob

For Kubeflow users, it is very convenient to create PytorchJobs to launch their training scripts (i.e., Trials). We need to do the same while testing our design. The following are the things we need to take care of when creating PytorchJobs:
- We create PytorchJobs with the Kubernetes API, which allows us to configure the command to run as well as environment variables. Same as Zeus, the job-specific parameters are specified in the command and the Zeus-specific parameters are specified as environment variables.
- The container needs the SYS_ADMIN capability (Zeus needs it to change GPU power limits).

Here is an example in the .yaml format.
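As a rough Python-API equivalent of such a manifest, a hedged sketch using the Kubernetes Python client; the image name, command, and env values are illustrative assumptions:

```python
from kubernetes import client, config

# Sketch only: image name, command, and env values are illustrative.
config.load_kube_config()

pytorchjob = {
    "apiVersion": "kubeflow.org/v1",
    "kind": "PyTorchJob",
    "metadata": {"name": "zeus-trial"},
    "spec": {
        "pytorchReplicaSpecs": {
            "Master": {
                "replicas": 1,
                "template": {
                    "spec": {
                        "containers": [{
                            "name": "pytorch",
                            "image": "zeus:latest",  # illustrative image name
                            # Job-specific parameters go in the command ...
                            "command": ["python", "train.py", "--batch_size", "128"],
                            # ... and Zeus-specific parameters in env variables.
                            "env": [{"name": "ZEUS_TARGET_METRIC", "value": "0.50"}],
                            # SYS_ADMIN is required to change GPU power limits.
                            "securityContext": {"capabilities": {"add": ["SYS_ADMIN"]}},
                        }]
                    }
                },
            }
        }
    },
}

client.CustomObjectsApi().create_namespaced_custom_object(
    group="kubeflow.org", version="v1", namespace="default",
    plural="pytorchjobs", body=pytorchjob,
)
```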
Future Directions

Selective repetitiveness with different frequencies

Currently, the training script (train.py) contains both data preprocessing and training. However, we might want to decouple these two phases. For example, for experimentation purposes, we might want to run training multiple times on the same data to try out different batch sizes. In this case, preprocessing the data multiple times would be an unnecessary overhead.
In our design, the ideal case is that we run data preprocessing once before each Recurrence, and do NOT repeat it in each Trial.
Down this line, we plan to support user-defined frequencies for each stage in the ML pipeline. A snapshot of our plan:
- Use Argo Workflows to support modeling workflows as DAGs with back edges.

Progress
Following is the list of components and the latest progress:

ZeusServer:
- Endpoints for communication between ZeusServer and clients: kube/server/main.py.
- Request/response and data models: kube/server/models.py.
- The ZeusServer class that manages the training jobs: kube/server/server.py. This singleton class contains all the functions that do the "real" work on the server side.
- DB schema: kube/db/schema.md.
- DB APIs that ZeusServer will use to store and query states: kube/server/dbapis.py. Basically, they send a DB query and return the result to ZeusServer.

ZeusDataLoader:
- Accept job_id from the user as a CLI argument. Note this is a job-specific parameter.
- In ZeusDataLoader.__init__, register the job (i.e., POST to ZeusServer) and receive the batch size before initializing the DataLoader.
- Report the ProfilingResult when profiling is done.
- Report the TrialResult when training is done.

Environment Setup
- SymbioticLab/Zeus.
- Docker as the container runtime.

Learning Materials