exo-explore / exo

Run your own AI cluster at home with everyday devices 📱💻 🖥️⌚
GNU General Public License v3.0
15.48k stars 833 forks source link

How to use multiple GPU from a node #411

Open udupicloud opened 2 weeks ago

udupicloud commented 2 weeks ago
Screenshot 2024-11-04 at 1 27 25 PM

Hi,

Screenshot 2024-11-04 at 1 22 12 PM

I have a multi-node setup with multiple GPUs per node. I was able to bring up the cluster, but I don't see the remaining GPUs on each node. How do I make exo use them? I also observed the error below while using Llama 3.2 and Mistral Large.

Screenshot 2024-11-04 at 1 21 35 PM

I am running the setup on Ubuntu 22.04 with Python 3.12, and all the NVIDIA drivers, including CUDA 12.4, have been installed. I installed Llama and the 3.2 models. Each ML machine has 2 GPUs (RTX 3070 and 3090).

jorge123255 commented 1 week ago

i had to configure the integration.py and the integration_engine.py to detect two gpus. Terflow detects them now but trying to figure out how to have it displayed when you run exo.

udupicloud commented 1 week ago

Hi @jorge123255 - Could you please help me with the steps to fix and what needs to be added or edited. I tried to find the files by the way, but couldn't find those files too.

Thank you, Mark.

jorge123255 commented 1 week ago

/exo/inference

inference_engine.py import torch # Assuming PyTorch or similar GPU access library is available import numpy as np import os from exo.helpers import DEBUG # Make sure to import DEBUG from typing import Tuple, Optional, List from abc import ABC, abstractmethod from .shard import Shard

class InferenceEngine(ABC):
    """Abstract interface implemented by every inference backend.

    Both entry points are coroutines and return a 3-tuple of
    ``(output array, inference-state string, finished flag)``.
    """

    @abstractmethod
    async def infer_prompt(self, request_id: str, shard: Shard, prompt: str, image_str: Optional[str] = None, inference_state: Optional[str] = None) -> Tuple[np.ndarray, str, bool]:
        """Run inference for ``shard`` starting from a text ``prompt``.

        ``image_str`` and ``inference_state`` are optional; their encoding is
        defined by the concrete engine.
        """

    @abstractmethod
    async def infer_tensor(self, request_id: str, shard: Shard, input_data: np.ndarray, inference_state: Optional[str] = None) -> Tuple[np.ndarray, str, bool]:
        """Run inference for ``shard`` starting from an input array."""

def get_available_gpus() -> List[int]:
    """Return the indices of the CUDA GPUs visible to PyTorch.

    Returns:
        ``[0, 1, ..., n - 1]`` when CUDA is available, where ``n`` is
        ``torch.cuda.device_count()``; otherwise an empty list (a warning is
        printed in that case).
    """
    if torch.cuda.is_available():
        return list(range(torch.cuda.device_count()))

    print("Warning: No GPUs detected. Running on CPU.")
    return []

def get_inference_engine(inference_engine_name: str, shard_downloader: 'ShardDownloader'):
    """Build the inference engine selected by ``inference_engine_name``.

    Args:
        inference_engine_name: One of ``"mlx"``, ``"tinygrad"`` or ``"dummy"``.
        shard_downloader: Downloader handed to the MLX and tinygrad engines.

    Returns:
        A freshly constructed engine instance.

    Raises:
        ValueError: If ``inference_engine_name`` is not a supported engine.
    """
    if DEBUG >= 2:
        print(f"get_inference_engine called with: {inference_engine_name}")

    # GPU indices are detected once and forwarded to the engines that take them.
    available_gpus = get_available_gpus()
    if DEBUG >= 1:
        print(f"Detected GPUs: {available_gpus}")

    if inference_engine_name == "mlx":
        from exo.inference.mlx.sharded_inference_engine import MLXDynamicShardInferenceEngine
        # NOTE(review): confirm MLXDynamicShardInferenceEngine accepts a
        # `devices` keyword; its definition is not visible here.
        return MLXDynamicShardInferenceEngine(shard_downloader, devices=available_gpus)

    elif inference_engine_name == "tinygrad":
        from exo.inference.tinygrad.inference import TinygradDynamicShardInferenceEngine
        import tinygrad.helpers
        # tinygrad's own debug level is driven by TINYGRAD_DEBUG (default 0).
        tinygrad.helpers.DEBUG.value = int(os.getenv("TINYGRAD_DEBUG", default="0"))

        return TinygradDynamicShardInferenceEngine(shard_downloader, devices=available_gpus)

    elif inference_engine_name == "dummy":
        from exo.inference.dummy_inference_engine import DummyInferenceEngine
        return DummyInferenceEngine()

    raise ValueError(f"Unsupported inference engine: {inference_engine_name}")

and under /exo/inference/tinygrad, inference.py:

from pathlib import Path import json import os from exo.inference.tinygrad.models.llama import Transformer, convert_from_huggingface, fix_bf16 from exo.inference.shard import Shard from exo.inference.tokenizers import resolve_tokenizer from tinygrad.nn.state import load_state_dict from tinygrad import Tensor, nn, Context from exo.inference.inference_engine import InferenceEngine from typing import Optional, Tuple import numpy as np from exo.inference.tinygrad.tinygrad_helpers import concat_weights, load from exo.download.shard_download import ShardDownloader from concurrent.futures import ThreadPoolExecutor import asyncio

Tensor.no_grad = True

# default settings
# Sampling temperature; can be overridden through the TEMPERATURE env var.
TEMPERATURE = float(os.getenv("TEMPERATURE", 0.85))
TOP_K = 25
TOP_P = 0.9
ALPHA_F = 0.1
ALPHA_P = 0.0

# Per-model-size constructor arguments for the Transformer, plus the number of
# consolidated weight files expected for that size.
MODEL_PARAMS = {
    "8B": {
        "args": {"dim": 4096, "n_heads": 32, "n_kv_heads": 8, "n_layers": 32, "norm_eps": 1e-5, "rope_theta": 500000, "vocab_size": 128256, "hidden_dim": 14336},
        "files": 1,
    },
    "70B": {
        "args": {"dim": 8192, "n_heads": 64, "n_kv_heads": 8, "n_layers": 80, "norm_eps": 1e-5, "rope_theta": 500000, "vocab_size": 128256, "hidden_dim": 28672},
        "files": 8,
    },
}

def build_transformer(model_path: Path, shard: Shard, model_size="8B", devices=None):
    """Construct a Transformer for ``shard`` and load its weights.

    Args:
        model_path: A directory holding the weights, or a single weight file.
        shard: The shard the model is built and the weights are loaded for.
        model_size: Key into ``MODEL_PARAMS`` (``"8B"`` or ``"70B"``).
        devices: Optional list of devices. The first entry is passed to
            ``concat_weights`` and the state dict is loaded once per entry.

    Returns:
        The Transformer with the weights loaded.
    """
    params = MODEL_PARAMS[model_size]

    # build model
    linear = nn.Linear
    with Context(THREEFRY=0):
        model = Transformer(**params["args"], linear=linear, max_context=8192, jit=True, shard=shard)

    # load weights and distribute across devices if multiple are available
    if model_path.is_dir():
        if (model_path / "model.safetensors.index.json").exists():
            weights = load(str(model_path / "model.safetensors.index.json"), shard)
        elif (model_path / "model.safetensors").exists():
            weights = load(str(model_path / "model.safetensors"), shard)
        else:
            # Fall back to the consolidated.NN.pth checkpoint files.
            weights = concat_weights(
                [load(str(model_path / f"consolidated.{i:02d}.pth"), shard) for i in range(params["files"])],
                devices[0] if isinstance(devices, list) and devices else None
            )
    else:
        weights = load(str(model_path), shard)

    weights = convert_from_huggingface(weights, model, params["args"]["n_heads"], params["args"]["n_kv_heads"])
    weights = fix_bf16(weights)

    # NOTE(review): Context is used above to set THREEFRY; confirm that it also
    # accepts a `device` key, and that loading the same state dict once per
    # device actually places weights on each device.
    for device in devices or [None]:
        with Context(device=device):
            load_state_dict(model, weights, strict=False, consume=False)  # consume=True if needed

    return model

class TinygradDynamicShardInferenceEngine(InferenceEngine):
    """InferenceEngine backed by a tinygrad Transformer, built lazily per shard.

    All blocking work (tokenizing, model building, forward passes) is pushed
    onto a single-worker thread pool so it does not block the event loop.
    """

    def __init__(self, shard_downloader: ShardDownloader, devices=None):
        """Store the downloader and the device list; no model is loaded yet."""
        self.shard = None
        self.shard_downloader = shard_downloader
        self.executor = ThreadPoolExecutor(max_workers=1)
        self.devices = devices or [0]  # Default to GPU 0 if no devices are provided

    async def infer_prompt(self, request_id: str, shard: Shard, prompt: str, image_str: Optional[str] = None, inference_state: Optional[str] = None) -> Tuple[np.ndarray, str, bool]:
        """Tokenize ``prompt``, run the model, and return the result.

        Returns:
            ``(output, state_json, finished)``. A shape-``(1,)`` model output is
            treated as a single token id: ``start_pos`` advances past the
            prompt and ``finished`` reports whether it is the EOS token.
            Otherwise the raw output array is returned and the prompt length
            is recorded as ``n_captured_toks``.
        """
        await self.ensure_shard(shard)
        # The incoming n_captured_toks is not read: both branches overwrite it.
        start_pos = json.loads(inference_state or "{}").get("start_pos", 0)

        toks = await asyncio.get_event_loop().run_in_executor(self.executor, self.tokenizer.encode, prompt)
        h = await self._run_model([toks], start_pos)

        if h.shape == (1,):
            token = h.item()
            start_pos += len(toks) + 1
            return np.array([[token]]), json.dumps({"start_pos": start_pos, "n_captured_toks": 0}), token == self.tokenizer.eos_token_id

        return h.numpy(), json.dumps({"start_pos": start_pos, "n_captured_toks": len(toks)}), False

    async def infer_tensor(self, request_id: str, shard: Shard, input_data: np.ndarray, inference_state: Optional[str] = None) -> Tuple[np.ndarray, str, bool]:
        """Run the model on ``input_data`` and return the result.

        Returns:
            ``(output, state_json, finished)`` with the same conventions as
            ``infer_prompt``; on a single-token output ``start_pos`` advances
            by the ``n_captured_toks`` carried in ``inference_state`` plus one.
        """
        await self.ensure_shard(shard)
        # Parse the state once instead of once per field.
        state = json.loads(inference_state or "{}")
        start_pos = state.get("start_pos", 0)
        n_captured_toks = state.get("n_captured_toks", 0)

        h = await self._run_model(input_data, start_pos)

        if h.shape == (1,):
            token = h.item()
            start_pos += n_captured_toks + 1
            return np.array([[token]]), json.dumps({"start_pos": start_pos, "n_captured_toks": 0}), token == self.tokenizer.eos_token_id

        return h.numpy(), json.dumps({"start_pos": start_pos, "n_captured_toks": n_captured_toks}), False

    async def ensure_shard(self, shard: Shard):
        """Download and build the model and tokenizer for ``shard`` if needed."""
        if self.shard == shard:
            return

        # The downloader requires the engine name as a second argument;
        # omitting it raises "missing 1 required positional argument:
        # 'inference_engine_name'".
        model_path = await self.shard_downloader.ensure_shard(shard, self.__class__.__name__)

        # Re-check after the await: the shard may have been set in the meantime.
        if self.shard != shard:
            # NOTE(review): any model id without "8b" is built with the 70B
            # parameters; confirm this is intended for other model sizes.
            self.model = await asyncio.get_event_loop().run_in_executor(
                self.executor, build_transformer, model_path, shard, "8B" if "8b" in shard.model_id.lower() else "70B", self.devices
            )

            tokenizer_path = str((model_path if model_path.is_dir() else model_path.parent))
            self.tokenizer = await resolve_tokenizer(tokenizer_path)
            self.shard = shard

    def aggregate_results(self, results):
        """Combine the per-device outputs; currently returns the first one."""
        # This function can be adjusted based on how aggregation should be handled. Currently, it returns the first result.
        return results[0]

    async def _run_model(self, data, start_pos: int):
        """Run one forward pass per configured device and aggregate the outputs."""
        loop = asyncio.get_event_loop()
        outputs = []
        # NOTE(review): the same model runs once per device and only the first
        # output is kept by aggregate_results; confirm Context(device=...)
        # really selects the device and that the repeated passes are intended.
        for device in self.devices:
            with Context(device=device):
                out = await loop.run_in_executor(self.executor, lambda: self.model(Tensor(data), start_pos, TEMPERATURE).realize())
                outputs.append(out)
        return self.aggregate_results(outputs)
udupicloud commented 1 week ago

@jorge123255 - Thanks for the update. I tried but couldn't get it going, as I am a beginner in this. If I could get the files, it would be of great help. Basically, I need exo to pick up all the GPUs available on those nodes; it can be 1 or n GPUs per node. I am building this for large models.

jorge123255 commented 1 week ago

@udupicloud here i uploaded the changes on my github https://github.com/jorge123255/exo.git

svm87601 commented 2 days ago

After modifying the code according to your method, I encountered the following error: “Error: Failed to fetch completions: Error processing prompt (see logs with DEBUG>=2): HFShardDownloader.ensure_shard() missing 1 required positional argument: 'inference_engine_name'."

jorge123255 commented 1 day ago

Aaa let me fix that