coreweave / tensorizer

Module, Model, and Tensor Serialization/Deserialization
MIT License
155 stars 25 forks source link

Tensorizer Support for Large Models( 70B+) that dont fit into a single GPU #81

Open SujoyDutta opened 5 months ago

SujoyDutta commented 5 months ago

I'm currently evaluating Tensorizer for handling large models, specifically models with parameters as larger than 70B that cannot be fit into a single GPU.

I have a few questions and concerns regarding Tensorizer's support for such large models and its handling of .tensors files.

  1. Large Model Support: Can Tensorizer effectively handle large models with parameters in the range of 70B and above wiht multiple distributed GPUs ? I havent seen any benchmarks for streaming large models that cannot be fit into a single A100 GPU . Specifically, can Tensorizer scale effectively to accommodate large models across distributed environments, such as distributed training setups with multiple GPUs or nodes ?

  2. .tensors File Format: Could you provide some insights into the .tensors file format used by Tensorizer? How does Tensorizer organize and store tensors within .tensors files, especially for large models? Does it employ any sharding or distribution mechanisms for managing large tensors?

I appreciate any insights or documentation you can provide regarding these questions. Understanding Tensorizer's capabilities and limitations will help us make informed decisions for our use case.

dmarx commented 5 months ago
  1. Absolutely! One strategy you can use is to partition the model into per-rank shards and serialize each shard into its own respective .tensors file. Here's an example of what checkpointing with tensorizer could look like for distributed training, simplified from the coreweave/megatron fork
model = unwrap_model(model)
for i in range(len(model)):
    mpu.set_virtual_pipeline_model_parallel_rank(i)
    shard_serializer = TensorSerializer(
        f"{checkpoint_name}_shard{i}.tensors"
    )
    shard_serializer.write_module(model[i])
    shard_serializer.close()

And the corresponding deserialization, again simplified from the linked code:

for i in range(len(model)):
    mpu.set_virtual_pipeline_model_parallel_rank(i)
    shard_deserializer = TensorDeserializer(
        f"{checkpoint_name}_shard{i}.tensors",
        device=torch.cuda.current_device(),
        plaid_mode=True
    )
    shard_deserializer.load_into_module(model[i])
    shard_deserializer.close()

Alternatively, if you have a checkpoint already that you want to load across multiple ranks, you can provide a filter function to indicate which tensors should or should not be streamed into any given rank. EDIT: I just learned this is actually a pretty inefficient approach. If you have sufficient RAM, the lowest latency approach will be to deserialize the entire checkpoint onto your CPU, then assign tensors to devices as you normally would from there.

# EDIT: Actually, don't do this. leaving it here for demonstrative purposes.

for i in range(len(model)):
    module_names = modules_to_load_into_rank[i] 
    # Should evaluate to True when the provided `key`
    # matches the name of a tensor we want on the device
    def filter_func(key: str) -> Callable[[str], bool]:
        return key in module_names

    mpu.set_virtual_pipeline_model_parallel_rank(i)
    shard_deserializer = TensorDeserializer(
        f"{checkpoint_name}_shard{i}.tensors",
        device=torch.cuda.current_device(),
        plaid_mode=True
    )
    shard_deserializer.load_into_module(model[i], filter_func)
    shard_deserializer.close()
  1. .tensors files pack tensors into contiguous memory on disk. The chunk for a given tensor starts with a header containing metadata, and then is followed by the tensor's data. Main implementation details can be found here The data format doesn't do sharding or distribution on its own, but as demonstrated above is compatible with user-specified sharding. Alternatively, Coreweave storage supports both CephFS and Vast distributed file systems. We are currently undergoing benchmarking exercises whose results we will hopefully share soon. Based on our preliminary observations, reading and writing shards distributed across multiple .tensors files is likely to be faster than one big file when using one of those distributed file systems.

Did that answer your questions?

SujoyDutta commented 5 months ago

Thanks @dmarx for replying quickly. I'll try the suggestion of partitioning the model into per-rank shards for checkpointing. Two questions:

  1. Regarding Serialization and Deserialization with Multiple GPUs: If I have more GPUs during serialization and write the shards, but fewer GPUs during deserialization, would it impact the deserialization process? Does Tensorizer handle this scenario efficiently, or are there any considerations I should be aware of?

  2. Observing Slow Deserialization Speeds: I tried serializing and deserializing the same exact example using Tensorizer with the model EleutherAI/gpt-j-6B . Despite, our fast S3 download speeds of 10GB/s and the 12GB .tensors file, the deserialization process was slow, with a transfer rate of 477 MB/s and a 26-second loading time to GPU ( these numbers are from total_bytes_str and get_mem_usage() when executing the example). Given that network speed isn't a bottleneck, could there be other processes causing this delay, such as intermediate disk writes or aggregation happening on the GPU side to stitch model weights?

dmarx commented 5 months ago

could you maybe share some code to clarify how you're deserializing? just to reiterate: I provided three chunks of code above, and one of them (the last one) is a demonstration of what not to do.

also, could you elaborate a bit more on your environment, e.g. are you downloading from S3 within AWS? If you are pulling this data over a WAN connection (which is probably the case if you are doing all of this within AWS) that is potentially a contributor to the poor performance.

SujoyDutta commented 5 months ago

Thank you @dmarx for your prompt response.

For deserialization, I'm employing the same code structure as provided in the deserialize example. Initially, I write the .tensors file for the 6B model, and subsequently, I utilize the boto3 TensorDeserializer to stream it back onto the GPU.

Environment Details:

Our environment is internal, not AWS-based. However, during experimentation, I observed that while using the MinIO command-line utility mc to download the .tensors file, downloading to an NVMe(not disk) I achieved speeds of at least 3GB/s, indicating that our network can indeed support higher speeds.

Despite the network's capability to achieve high speeds, the TensorDeserializer only achieves transfer speeds in the range of 500 MB/s during deserialization. Given the confirmed high download speeds, I'm curious about the factors limiting the deserialization speeds with Tensorizer.

I hope this provides the necessary clarity on the deserialization process and environment setup. Any insights for speeding up or recommendations regarding the observed deserialization speeds would be greatly appreciated.

dmarx commented 5 months ago

What version of tensorizer are you using? Try updating to the latest version, 2.8.0. One of the most important performance modes in tensorizer is plaid_mode which pipelines the loading and processing of tensors. This has historically been a feature that was disabled by default, and will now be turned on by default whenever a cuda device is detected.

SujoyDutta commented 5 months ago

For some reason when I try to download via pip install tensorizer it defaults to 2.7.2 But I tried both with and without plaid_mode = True in my experiments and the model was streamed to GPU in almost same time for both cases

dmarx commented 5 months ago

This is really unusual. I'm thankful you've brought your case to our attention and for your patience and cooperation as we try to figure out what could be going on here.

To update the library, try pip install --upgrade tensorizer or pip install tensorizer==2.8.0.

What kind of performance do you experience using tensorizer to deserialize models from local paths?

dmarx commented 5 months ago

also, could you possibly share a little about the hardware you're using here? GPU, CPU, # cores, RAM, etc.

SujoyDutta commented 5 months ago

Yea sure, Thanks so much for taking a look into this.

I executed the helper utils from tensorizer

total_bytes_str = convert_bytes(deserializer.total_tensor_bytes)
duration = end - start
per_second = convert_bytes(deserializer.total_tensor_bytes / duration)
after_mem = get_mem_usage()
deserializer.close()
print(f"Deserialized {total_bytes_str} in {end - start:0.2f}s, {per_second}/s")
print(f"Memory usage before: {before_mem}")
print(f"Memory usage after: {after_mem}")

and got the following output

Deserialized 12.2 GB in 26.35s, 463.8 MB/s
Memory usage before: CPU: (maxrss: 14,341MiB F: 486,836MiB) GPU: (U: 309MiB F: 32,191MiB T: 32,501MiB) TORCH: (R: 0MiB/0MiB, A: 0MiB/0MiB)
Memory usage after: CPU: (maxrss: 26,839MiB F: 486,739MiB) GPU: (U: 23,749MiB F: 8,751MiB T: 32,501MiB) TORCH: (R: 23,348MiB/32,014MiB, A: 11,661MiB/23,315MiB)

I have 12 core CPU and for GPU I am using V100 but I didnt get much difference when using A100 either. Both cases the speed was ~ 500 MB/s . I am having some issues internally installing latest version 2.8.0. I'll ask around and see whats the issue . But I was setting plaid_mode= True with 2.7.2.

Local mode yea local mode was very fast in my experiment from disk it reached speed upto 3GB/s

dmarx commented 4 months ago

Did you ever figure out what was going on here? The team has incorporated several performance improvements over the last month: although I wasn't able to reproduce your issues, it's possible that they were addressed in one of the updates. If you want to experiment with the pre-release package: pip install --pre 'tensorizer~=2.9.0'

SujoyDutta commented 32 minutes ago

Thanks @dmarx I will test the latest one and get back