kuizhiqing opened this issue 1 year ago
@kuizhiqing Thanks for creating a great issue! Can you create a proposal like https://github.com/kubeflow/katib/tree/master/docs/proposals?
I think this great proposal is worth leaving as documentation.
cc: @kubeflow/wg-training-leads
cc @zw0610
@tenzen-y Thank you for your kind feedback. I appreciate your suggestion to provide a documentation version, which could be a PR. However, I think it would be better to do so after further discussion, once we have reached some agreement on this issue. As you noted, I have proposed some questions to prompt discussion, but I am still refining these ideas to develop a more robust proposal, especially for the elastic training part.
I see. SGTM. I will leave my feedback later on this issue.
Thanks for taking the time to write this detailed proposal. I will review it carefully soon. I have a few early questions:
1. What are your thoughts on implementing `nproc-per-node` in K8s? In the operator, nodes are currently mapped to a replica pod. The same applies to elastic training as well. (Ref)
2. What are your expectations in mapping the distributed args in the operator context?
3. Will your new definition of WORLD_SIZE and RANK (with 3D parallelism) hold true in the default data parallelism case as well? How does the operator differentiate between both schemes?
4. Are we expecting users to always use `torchrun` in the container command?
@johnugeorge Thank you for your comment. Let me try to answer your questions:
In the GPU case, `nproc_per_node` should equal the number of GPUs on each node, which equals the number of GPU resources declared. This can be implemented using an environment variable. In the CPU case, the operator may not assign a value to it, allowing the user to do so using arguments or a user-defined environment variable.
For those DISTRIBUTED_ARGS:

- `--nproc_per_node` was mentioned above.
- `--nnodes` should be the number of replicas, which is the number of pods.
- `--node_rank` is not obligatory for `torchrun`. That is the first question I proposed in the list.
- `--master_addr` will be assigned the address of one pod, which could be rank 0 or another pod.
- `--master_port` can be any available port, which can be achieved by random selection within a range.

Again, arguments can be set equally using environment variables.
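As an illustration only, this mapping could end up on a worker container roughly like the sketch below; the image, script name, resource key, and concrete values are placeholders, and the same settings could equally be injected as environment variables:

```yaml
# Sketch of a worker container wiring up the args above (all values illustrative).
containers:
  - name: pytorch
    image: example/train:latest              # placeholder image
    command:
      - torchrun
      - --nnodes=4                           # number of replicas (pods)
      - --nproc_per_node=8                   # GPUs requested on each pod
      - --master_addr=pytorchjob-worker-0    # address of one pod (rank 0 or another)
      - --master_port=29500                  # any free port, e.g. picked from a range
      - train.py
    resources:
      limits:
        nvidia.com/gpu: 8
```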
This will always follow the same scheme for the operator. I explained 3D parallelism for two main reasons:
That is the second question in my list. In my personal opinion, it is OK to just support `torchrun`, since it is a generic feature of PyTorch.
Thank you for driving this @kuizhiqing
> In my personal opinion, it is OK to just support torchrun since it is a generic feature of PyTorch.
Does it mean that users can't use PyTorchJob without `torchrun`? How can we deal with users that still use Kubeflow PyTorchJob with a PyTorch version < 1.9.0?
> Is only supporting torchrun and elastic mode acceptable, to simplify the operator? I think Yes, focusing the operator on elastic distributed training with torchrun encourages a simple, robust design aligned with PyTorch's capabilities. Warnings could notify users if torchrun is not used.

I think that this is a breaking change, so I would propose that we create a new v2beta1 PyTorchJob.
> Rank assignment remains open for discussion:
>
> 1. Allow PyTorch to assign node ranks:
>    - In dynamic mode, ranks are assigned by alphabetically sorted IP, partially optimizing locality
>    - In elastic etcd mode, ranks are assigned randomly by join order
> 2. Have the operator assign pod ranks by numerically sorted IP, assuming IP proximity correlates with locality
At first, we can implement a simple PyTorchJob v2 assuming `torchrun`. Then, we can evolve the API and controller to support elastic and dynamic semantics.
@kuizhiqing WDYT?
@tenzen-y I am not sure that dropping support for older PyTorch versions in PyTorchJobs makes sense for our users.
Is it possible to support 2 distributed techniques? Using `torchrun` and using the old method with `RANK` and `WORLD_SIZE`.
@andreyvelich @tenzen-y I think we have three ways to define the entrypoint for training:

1. `python train.py` - this is our current approach.
2. `torch.distributed.launch train.py` - this approach requires specifying WORLD_SIZE and LOCAL_RANK environment variables.
3. `torch.distributed.run train.py` - this approach does not require specifying WORLD_SIZE and LOCAL_RANK.

Overall, I want to unify all training to use torchrun, including regular training and elastic training, though unifying the approaches has been more difficult than anticipated.
For now, if we are talking about a short-term and practical solution: since approach 3 is compatible with approach 2, I prefer to make minor changes to support approaches 2 and 3 without a breaking change. Approach 1 is unnecessary to support, since its functionality can be achieved with approach 2 by setting nproc=1.
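To make the three entrypoint styles concrete, they could appear as container commands roughly like this (script name and numbers are placeholders):

```yaml
# 1. Current approach: plain python; the operator sets WORLD_SIZE and RANK today.
command: ["python", "train.py"]
---
# 2. torch.distributed.launch: requires WORLD_SIZE / LOCAL_RANK-style environment variables.
command: ["python", "-m", "torch.distributed.launch", "--nproc_per_node=8", "train.py"]
---
# 3. torchrun (alias for python -m torch.distributed.run): ranks are assigned automatically.
command: ["torchrun", "--nnodes=4", "--nproc_per_node=8", "train.py"]
```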
> Is it possible to support 2 distributed techniques? Using torchrun and using the old method with RANK and WORLD_SIZE.
I agree with you. I think we can take the above way when it isn't possible.
> For now, if we are talking about a short-term and practical solution: since approach 3 is compatible with approach 2, I prefer to make minor changes to support approaches 2 and 3 without a breaking change. Approach 1 is unnecessary to support, since its functionality can be achieved with approach 2 by setting nproc=1.
Overall it sounds good to me. But I have one more concern and question.
How will users know that they can not use approach 1 and that the training operator has stopped supporting it?
Document? Release Note? I think adding validation to check whether podSpec.Containers.command has `torchrun` or `torch.distributed.run` might be good. But if users define the command as a Dockerfile ENTRYPOINT, this validation doesn't work well.
@kuizhiqing @andreyvelich @johnugeorge Do you have any ideas?
@tenzen-y @kuizhiqing Can we just verify the container start command and assign the appropriate env variables?

- if the start command is `python train.py`, the controller sets `WORLD_SIZE` and `RANK`.
- if the start command is `torchrun` or `torch.distributed.launch train.py`, the controller sets `WORLD_SIZE` and `LOCAL_SIZE`.
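A minimal sketch of what the injected environment could look like in each case (the concrete numbers are placeholders, and the exact variable set is what this thread is debating):

```yaml
# Case 1: start command is "python train.py" -> controller sets WORLD_SIZE and RANK.
env:
  - name: WORLD_SIZE
    value: "4"     # e.g. the number of replica pods
  - name: RANK
    value: "0"     # this pod's rank
---
# Case 2: start command uses torchrun / torch.distributed.launch ->
# controller sets WORLD_SIZE and LOCAL_SIZE; ranks are left to the launcher.
env:
  - name: WORLD_SIZE
    value: "32"    # e.g. pods x nproc_per_node
  - name: LOCAL_SIZE
    value: "8"     # processes (GPUs) per pod
```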
If users define commands as a Dockerfile ENTRYPOINT, that way doesn't work well. But I don't have a good idea of which env vars we should set when podSpec.containers[0].command is empty.
@tenzen-y I remember that we can extract the Docker Entrypoint using the `go-containerregistry` library. Like in Katib: https://github.com/kubeflow/katib/blob/master/pkg/webhook/v1beta1/pod/utils.go#L103-L111C25. Not sure if that is still working, but we can try to do the same for the Training Operator.
Thanks for the great suggestion. Maybe we should evaluate the library: which standards does it support? (Docker v1, Docker v2, OCI v1, OCI v2, lazy pulling, etc.)
If I understand correctly, the only change proposed is to change the semantics of how the WORLD_SIZE, RANK, LOCAL_RANK, and LOCAL_SIZE variables are populated.

- Having `torchrun` by default breaks the current deployments/examples. What is the way to provide support for the current scheme? Or, at a minimum, provide backwards compatibility for previous deployments?
- In the default `torchrun` case, when `nproc_per_node` is set to the number of GPUs per node (>1), torchrun starts multiple processes where each process has a LOCAL_RANK ranging from 0 to nproc_per_node. In your proposal, how is this handled? Multiple containers in the same pod?
- In the current code, we do not extract or take any decision based on the Worker PodSpec. In your proposal, do you mean to extract example.com/gpu info from the PodSpec to set nproc_per_node (the number of GPUs per node)?
@johnugeorge Each pod runs `torchrun`, and it will run multiple processes in the same container; this is efficient and transparent to the operator.

> - In the current code, we do not extract or take any decision based on the Worker PodSpec. In your proposal, do you mean to extract example.com/gpu info from the PodSpec to set nproc_per_node (the number of GPUs per node)?

Yes, the controller setting the env based on the PodSpec is OK for me. Do you have any concerns about it?
I think we should add fields for setting `nproc_per_node` at the top level instead of taking example.com/gpu from the podSpec. Then I have 2 concerns about taking `example.com/gpu` and setting the value as `nproc_per_node`:

1. Which devices are counted as `nproc_per_node`? AFAIK, the operator can not know in advance which resources specified in `podSpec.containers.resource` are accelerators, since we can set any resources in `podSpec.containers.resource` provided by device plugins (OSS ones, in-house ones, etc.).
2. That behavior is implicit and isn't declarative. In the K8s world, we should strive to make the parameters declaratively configurable.

@tenzen-y Yes, you are right, many cloud providers customize their resource declarations. Maybe adding an explicit field is better.
It would be better if such a new field for `nproc_per_node` is compatible with the contemporary PyTorchJob API instead of v2.
> @tenzen-y Yes, you are right, many cloud providers customize their resource declarations. Maybe adding an explicit field is better.

@kuizhiqing Could you update the proposal to include new fields for `nproc_per_node`? Thanks for your effort.
> It would be better if such a new field for `nproc_per_node` is compatible with the contemporary PyTorchJob API instead of v2.

I'm ok with adding new fields to PyTorchJob v1 to maintain backward compatibility.
Yes, a new field will help in maintaining backward compatibility while supporting all launchers. `nproc_per_node` is set to 1 by default. If this field is set with a value greater than 1, we use the new way of setting WORLD_SIZE and other parameters; otherwise, we use the current way. The Master spec can be an explicit master or one of the pods (e.g. pod0).
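For illustration, such an explicit field might look roughly like this on the existing v1 API; the field name `nprocPerNode`, its placement, and all values here are assumptions for discussion, not a settled API:

```yaml
apiVersion: kubeflow.org/v1
kind: PyTorchJob
metadata:
  name: example-job
spec:
  nprocPerNode: "8"                      # hypothetical explicit field; defaults to 1 (current behavior)
  pytorchReplicaSpecs:
    Worker:
      replicas: 4
      template:
        spec:
          containers:
            - name: pytorch
              image: example/train:latest   # placeholder image
              command: ["torchrun", "train.py"]
              resources:
                limits:
                  nvidia.com/gpu: 8
```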
> @kuizhiqing Could you update the proposal to include new fields for `nproc_per_node`? Thanks for your effort.

I'm working on a version that tries to make a compatible change as we discussed; this PR is not fully tested in all cases.
> So we prefer to place the 8 TP partitions on the same node, the 8 DP partitions on the same DGX box

@kuizhiqing Curious which DGX box are you using? Is it a 32 DGX cluster with 8 GPUs each? You referred to having TP and DP on the same DGX?

Regarding the rank assignment discussion, what is the ideal topology and expected rank of workers given your DGX cluster setup?
@kuizhiqing Does DGX box mean DGX superPOD?
@johnugeorge @tenzen-y Sorry, I made a mistake here: the DGX box is the same as a node here; I meant the switch for DGX. The principle still holds: the TP group is closer than the DP group, which is closer than the PP group.
@johnugeorge @tenzen-y BTW, the PR https://github.com/kubeflow/training-operator/pull/1840 is ready for review. It implements part of what I propose to do; I'm continuing to work on it.
@kuizhiqing After #1840, do you need more changes wrt this issue? How do you handle rank assignment in your current deployment?
@johnugeorge wrt this issue, I want to make a change that makes the master/worker declaration separation optional, as someone has already requested this feature. Otherwise, I will leave the other ideas as long-term improvements.
Do you plan to take this up in a week? Release feature freeze will happen by the end of next week.
@johnugeorge After evaluation, I think the feature making the master declaration optional may not be necessary in the current design, and it can be done in the current elastic mode. Overall, I have no more changes pending before the release. Thx.
This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.
/lifecycle frozen
Motivation
The current PyTorch Operator focuses on a one-process-per-pod architecture, which may not fully utilize the generic design of PyTorch and can underperform. When users adopt `torchrun` as the entrypoint, the operator does not function properly; see https://github.com/kubeflow/training-operator/issues/1790.

Background
PyTorch Architecture
For distributed training with GPUs, frameworks like PyTorch and PaddlePaddle use a one process per GPU architecture. They introduce a launch process on each node to manage the GPU-bound processes.
Each process is identified by:
- `world_size`: The total number of GPUs.
- `rank`: The global rank of the process (from 0 to `world_size - 1`).
- `local_size`: The number of GPUs on the node.
- `local_rank`: The rank of the process within the node.

Thanks to argparse_util, these settings can also be passed through environment variables, which have higher priority. For this proposal, we do not distinguish between args and env vars.
Since version 1.9.0 (released in June 2021), PyTorch has `torchrun`, which is an alias for `python -m torch.distributed.run`. Compared to `torch.distributed.launch`, `torchrun` provides automatic assignment of `rank` and `size`.

Note that when using `torchrun`, specifying the rank is optional; the rank can be provided with the `--node_rank` argument if desired, but `torchrun` will automatically assign ranks otherwise.

Megatron and Large Language Model Training
3D parallelism is commonly used to train large models in a distributed fashion. For example, a job could use:

- tensor parallelism (TP) of size 8
- data parallelism (DP) of size 8
- pipeline parallelism (PP) of size 4

This requires 8 x 8 x 4 = 256 GPUs across 32 nodes of 8 GPUs each. The model has 8 (TP) x 4 (PP) = 32 partitions, with 8 replicas of the model taking different input data.
Communication overhead is typically TP > DP > PP. So we prefer to place the 8 TP partitions on the same node (DGX box), the 8 DP partitions on the same Switch, and PP partitions as close as possible.
The parallel groups (TP, DP, PP) are formed based on the rank of each worker, so the rank of each worker (bound to a GPU) indicates its proximity.
The scheduler assigns resources to optimize proximity, and the operator (or something else) should assign ranks accordingly. One more thing to note: in performance-critical scenarios, users will typically run pods with the host network for maximum efficiency.
Current Design
The current operator design appears tailored for PyTorch versions before 1.9.0 and favors running without the `torchrun` module; specifically, it calculates `WORLD_SIZE` based on the pod replica count, which is inconsistent with `torchrun`'s methodology.

Proposed Design
The goal of the operator should be to natively support running Megatron-LM examples, such as the one shown here, using a YAML spec like the following:
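A rough sketch of what such a spec could look like for the 32-node x 8-GPU Megatron example above; the image, script, and field choices below are illustrative assumptions rather than a definitive spec:

```yaml
apiVersion: kubeflow.org/v1
kind: PyTorchJob
metadata:
  name: megatron-example
spec:
  pytorchReplicaSpecs:
    Worker:
      replicas: 32                             # 32 nodes
      template:
        spec:
          hostNetwork: true                    # common in performance-critical setups
          containers:
            - name: pytorch
              image: example/megatron-lm:latest   # placeholder image
              command:
                - torchrun
                - --nnodes=32
                - --nproc_per_node=8
                - pretrain_gpt.py              # placeholder training script
              resources:
                limits:
                  nvidia.com/gpu: 8
```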
The operator will handle setting the distributed training environment variables for the example. For instance, it will set `WORLD_SIZE=256` and `LOCAL_SIZE=8`.
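Concretely, the injected environment on each worker could look roughly like this (MASTER_ADDR/MASTER_PORT follow the earlier discussion of distributed args; all values are placeholders):

```yaml
# Environment the operator could inject on each worker for the example above;
# values are derived from 32 pods x 8 GPUs.
env:
  - name: WORLD_SIZE
    value: "256"                        # 32 pods x 8 processes
  - name: LOCAL_SIZE
    value: "8"                          # GPUs (processes) per pod
  - name: MASTER_ADDR
    value: "megatron-example-worker-0"  # one pod chosen as the rendezvous endpoint (placeholder)
  - name: MASTER_PORT
    value: "29500"                      # any free port, e.g. picked from a range
```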
Rank assignment remains open for discussion:

1. Allow PyTorch to assign node ranks:
   - In dynamic mode, ranks are assigned by alphabetically sorted IP, partially optimizing locality
   - In elastic etcd mode, ranks are assigned randomly by join order
2. Have the operator assign pod ranks by numerically sorted IP, assuming IP proximity correlates with locality

For maximum performance, users often implement custom rank assignments before calling `torchrun` or by modifying PyTorch's internal rank assignment logic.

Discussion List
Should pod rank assignment be in the scope of the operator, or handled externally?
There are arguments for either approach. The operator assigning ranks enables optimization but reduces flexibility. External rank assignment is more flexible but may lack optimization.
Is only supporting `torchrun` and elastic mode acceptable, to simplify the operator?

I think yes; focusing the operator on elastic distributed training with `torchrun` encourages a simple, robust design aligned with PyTorch's capabilities. Warnings could notify users if `torchrun` is not used.

Is designating one pod as a master separately necessary for collective training?
The current design is somewhat confusing and technically unnecessary.
Should pod rank be omitted from pod names in elastic mode?
Omitting rank from pod names in elastic mode decouples pod identity from rank, allowing ranks to change dynamically as nodes join or leave the cluster. This flexibility is important for elasticity.
This draft is not yet mature and there are many aspects that require further consideration. Comments and discussion are welcome.