ray-project / ray

Ray is a unified framework for scaling AI and Python applications. Ray consists of a core distributed runtime and a set of AI Libraries for accelerating ML workloads.
https://ray.io
Apache License 2.0
33.06k stars 5.6k forks source link

Ray Data Dataset.map_batches does not properly spread `neuron_cores` resource requests #47631

Closed HarryCaveMan closed 33 minutes ago

HarryCaveMan commented 5 days ago

What happened + What you expected to happen

Have a ray (2.34.0, python3.10) cluster running two inf2.xlarge nodes. Each node has 1 neuron device and 2 neuron_cores. When I run the following:

batches = docs.map_batches(
    bulk_inference_model,
    batch_size=128,
    concurrency=4,
    num_cpus=1,
    resources={
        "neuron_cores": 1
    },
    scheduling_strategy="SPREAD"
)

I expect 4 concurrent workers each occupying one neuron core. Instead, ray packs all 4 workers onto one node which causes both OOM errors and errors initializing neuron runtime (due to lack of available cores on the device). Unfortunately, STRICT_SPREAD is not a valid scheduling_strategy under ray_remote_options. Additionally, I am unable to use a PlacementGroupSchedulingStrategy with STRICT_SPREAD because bundles only support CPU, GPU, and Memory resources and not accelerators, all the way down to the c++ core here.

Versions / Dependencies

ray (2.34.0, python3.10) ray[data]==2.34.0

Reproduction script

Run the following transform on a ray cluster containing 2 inf2.xlarge, model compiled with batch size 128:

batches = docs.map_batches(
    bulk_inference_model,
    batch_size=128,
    concurrency=4,
    num_cpus=1,
    resources={
        "neuron_cores": 1
    },
    scheduling_strategy="SPREAD"
)

Issue Severity

High

HarryCaveMan commented 5 days ago

I am going to attempt to pack two ray nodes (kuberay worker pods) onto each ec2 instance to see if this will help. I suspect that it is packing because the pods have 4 CPU each and the scheduler is not accounting for the neuron resource request. Will update.

HarryCaveMan commented 18 hours ago

Due to limitations in karpenter, I was unable to run the test mentioned above, however, I was able to resolve this by setting the runtime_env for each worker, and changing my actor class for the map_batches call to use fn_constructor_args rather than trying to maintain one instance of the model internally.

Old batch model example:

class Model:
    def __init__(self,path):
        self.model = AutoModel.from_pretrained(path)
        self.tokenizer = AutoTokenizer.from_pretrained(path)

    def predict(self,texts):
        inputs = tokenizer(texts)
        return model(**inputs)

class BulkModel:
     def __init__(self,path):
        self.path = path
        self.model = None

    def __call__(self,batch):
        if not self.model:
            self.model = Model(self.path)
        return self.model.predict(batch)

bulk_inference_model = BulkModel('mymodelpath')

batches = docs.map_batches(
    bulk_inference_model,
    batch_size=128,
    concurrency=4,
    num_cpus=1,
    resources={
        "neuron_cores": 1
    },
    scheduling_strategy="SPREAD"
)

New batch model example:

class Model:
    def __init__(self,path):
        self.model = AutoModel.from_pretrained(path)
        self.tokenizer = AutoTokenizer.from_pretrained(path)

    def predict(self,texts):
        inputs = tokenizer(texts)
        return model(**inputs)

class BulkModel:
     def __init__(self,path):
        self.path = path
        self.model = Model(path)

    def __call__(self,batch):
        return self.model.predict(batch)

batches = docs.map_batches(
    BulkModel,
    fn_constructor_args=('mymodelpath')
    batch_size=128,
    concurrency=4,
    num_cpus=1,
    resources={
        "neuron_cores": 1
    },
   runtime_env={
        "NEURON_RT_NUM_CORES":"1"
    },
    scheduling_strategy="SPREAD"
)
HarryCaveMan commented 18 hours ago

I still think that I probably should not be setting NEURON_RT_NUM_CORES manually because it should always reflect the resource request for neuron_cores, but this did prevent ray data from trying to allocate more cores than were available.

bveeramani commented 33 minutes ago

Closing because I think you found a workaround for this issue. Feel free to re-open if the issue comes up again