Open austinbv opened 1 month ago
I really like this idea. @RayFernando1337 suggested something similar when we were running into issues with slow download for Llama 3.1 405b.
Let's use this as a design discussion.
Possible option: libtorrent with Python bindings
Pros:
Cons:
Other option: qBittorrent-API
⚠️ Claude Sonnet 3.5 code refactoring recommendations:
To design expanded node capabilities for BitTorrent roles in the Exo system, we'll need to enhance the existing node structure to incorporate BitTorrent functionality. Here's a proposed design approach:
First, let's create a new class to manage BitTorrent operations:
    import os

    import libtorrent as lt
    from exo.inference.shard import Shard


    class BitTorrentManager:
        def __init__(self, save_path: str):
            self.session = lt.session()
            self.save_path = save_path
            self.torrents = {}  # maps Shard -> torrent handle

        async def add_torrent(self, torrent_file: str, shard: Shard):
            # torrent_file is the path of a .torrent file on local disk
            info = lt.torrent_info(torrent_file)
            handle = self.session.add_torrent({'ti': info, 'save_path': self.save_path})
            self.torrents[shard] = handle

        async def seed_model(self, model_path: str, shard: Shard):
            # Create torrent metadata for the model and start seeding it
            fs = lt.file_storage()
            lt.add_files(fs, model_path)
            t = lt.create_torrent(fs)
            t.set_creator('exo_node')
            # Piece hashes are computed relative to the directory that contains the model
            content_dir = os.path.dirname(os.path.abspath(model_path))
            lt.set_piece_hashes(t, content_dir)
            # generate() returns a bencodable dict; add_torrent needs a torrent_info object
            info = lt.torrent_info(t.generate())
            # Seed from the directory that already holds the data
            handle = self.session.add_torrent({'ti': info, 'save_path': content_dir})
            self.torrents[shard] = handle

        async def download_model(self, torrent_file: str, shard: Shard):
            await self.add_torrent(torrent_file, shard)
            # Implement download progress tracking and completion check

        def get_download_progress(self, shard: Shard) -> float:
            if shard in self.torrents:
                handle = self.torrents[shard]
                return handle.status().progress
            return 0.0

        def is_download_complete(self, shard: Shard) -> bool:
            if shard in self.torrents:
                handle = self.torrents[shard]
                return handle.status().is_seeding
            return False
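The download_model method above leaves progress tracking and the completion check as a comment. One way to fill that gap is a small polling helper. This is only a sketch: wait_for_download and its parameters are hypothetical names, and it takes plain callables so it does not depend on libtorrent.

```python
import asyncio
from typing import Callable, Optional


async def wait_for_download(
    get_progress: Callable[[], float],
    is_complete: Callable[[], bool],
    poll_interval: float = 1.0,
    timeout: Optional[float] = None,
) -> bool:
    """Poll until is_complete() returns True. Returns False if the timeout expires first."""
    loop = asyncio.get_running_loop()
    deadline = None if timeout is None else loop.time() + timeout
    while not is_complete():
        if deadline is not None and loop.time() >= deadline:
            return False
        print(f"progress: {get_progress():.1%}")
        # Yield to the event loop so other nodes and tasks keep running
        await asyncio.sleep(poll_interval)
    return True
```

With the manager above, the two callables could be lambdas wrapping get_download_progress(shard) and is_download_complete(shard). A timeout is worth having because a torrent with no reachable peers never finishes.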
Now, let's modify the StandardNode class to incorporate BitTorrent capabilities:
    import numpy as np

    from exo.orchestration.standard_node import StandardNode
    from exo.inference.shard import Shard

    # BitTorrentManager is the class defined above


    class BitTorrentEnabledNode(StandardNode):
        def __init__(self, *args, **kwargs):
            super().__init__(*args, **kwargs)
            self.bt_manager = BitTorrentManager(save_path="/path/to/model/storage")
            self.seeding_shards = set()

        async def ensure_model(self, shard: Shard):
            if shard in self.seeding_shards:
                return  # Model already available

            # Check if any peer has the model
            for peer in self.peers:
                if await peer.has_model(shard):
                    torrent_file = await peer.get_torrent_file(shard)
                    # Note: download_model only starts the transfer; the caller still
                    # has to wait for completion before the shard can be used
                    await self.bt_manager.download_model(torrent_file, shard)
                    return

            # If no peer has the model, become the seed node
            await self.become_seed_node(shard)

        async def become_seed_node(self, shard: Shard):
            # Download model from HuggingFace or other source
            model_path = await self.download_from_source(shard)
            await self.bt_manager.seed_model(model_path, shard)
            self.seeding_shards.add(shard)

        async def download_from_source(self, shard: Shard) -> str:
            # Implement logic to download model from HuggingFace or other source
            raise NotImplementedError

        async def has_model(self, shard: Shard) -> bool:
            return shard in self.seeding_shards

        async def get_torrent_file(self, shard: Shard) -> str:
            # Return the .torrent file for the requested shard
            raise NotImplementedError

        async def process_prompt(self, shard: Shard, prompt: str, *args, **kwargs):
            await self.ensure_model(shard)
            return await super().process_prompt(shard, prompt, *args, **kwargs)

        async def process_tensor(self, shard: Shard, tensor: np.ndarray, *args, **kwargs):
            await self.ensure_model(shard)
            return await super().process_tensor(shard, tensor, *args, **kwargs)
Update the PeerHandle interface to include BitTorrent-related methods:
    from abc import ABC, abstractmethod

    from exo.inference.shard import Shard


    class PeerHandle(ABC):
        # ... existing methods ...

        @abstractmethod
        async def has_model(self, shard: Shard) -> bool:
            pass

        @abstractmethod
        async def get_torrent_file(self, shard: Shard) -> str:
            pass
Implement the new methods in the GRPCPeerHandle class:
    class GRPCPeerHandle(PeerHandle):
        # ... existing methods ...

        async def has_model(self, shard: Shard) -> bool:
            request = node_service_pb2.HasModelRequest(shard=node_service_pb2.Shard(**shard.to_dict()))
            response = await self.stub.HasModel(request)
            return response.has_model

        async def get_torrent_file(self, shard: Shard) -> str:
            request = node_service_pb2.GetTorrentFileRequest(shard=node_service_pb2.Shard(**shard.to_dict()))
            response = await self.stub.GetTorrentFile(request)
            return response.torrent_file
Add new RPC methods to the node_service.proto file:
    service NodeService {
      // ... existing methods ...
      rpc HasModel (HasModelRequest) returns (HasModelResponse) {}
      rpc GetTorrentFile (GetTorrentFileRequest) returns (GetTorrentFileResponse) {}
    }

    message HasModelRequest {
      Shard shard = 1;
    }

    message HasModelResponse {
      bool has_model = 1;
    }

    message GetTorrentFileRequest {
      Shard shard = 1;
    }

    message GetTorrentFileResponse {
      // Note: a path is only meaningful on the sending node. If this field is meant
      // to carry the raw .torrent contents, it would need to be declared as bytes.
      string torrent_file = 1;
    }
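The GetTorrentFileResponse message above declares torrent_file as a string. If that field is meant to carry the contents of a .torrent file rather than a path, it is worth noting that torrent metainfo is bencoded binary data: the "pieces" entry holds raw 20-byte SHA-1 digests, while a protobuf string must be valid UTF-8. The sketch below uses a minimal pure-Python bencoder to show this. The payload, file name, and piece length are made up for illustration.

```python
import hashlib


def bencode(value) -> bytes:
    """Encode ints, str/bytes, lists and dicts with bencoding (the .torrent encoding)."""
    if isinstance(value, bool):
        raise TypeError("bencoding has no boolean type")
    if isinstance(value, int):
        return b"i%de" % value
    if isinstance(value, str):
        value = value.encode("utf-8")
    if isinstance(value, bytes):
        return b"%d:%s" % (len(value), value)
    if isinstance(value, list):
        return b"l" + b"".join(bencode(v) for v in value) + b"e"
    if isinstance(value, dict):
        # Dictionary keys are byte strings and must be written in sorted order
        items = [(k.encode("utf-8") if isinstance(k, str) else k, v) for k, v in value.items()]
        items.sort(key=lambda kv: kv[0])
        return b"d" + b"".join(bencode(k) + bencode(v) for k, v in items) + b"e"
    raise TypeError(f"cannot bencode {type(value).__name__}")


def is_valid_utf8(data: bytes) -> bool:
    """Return True if data could be stored in a protobuf string field."""
    try:
        data.decode("utf-8")
        return True
    except UnicodeDecodeError:
        return False


# Hypothetical single-piece "model" used only for illustration
piece = b"hypothetical model shard bytes"
metainfo = {
    "info": {
        "name": "model.bin",
        "length": len(piece),
        "piece length": 16384,
        "pieces": hashlib.sha1(piece).digest(),  # raw 20-byte digest, not text
    }
}
torrent_bytes = bencode(metainfo)
print(len(torrent_bytes), is_valid_utf8(torrent_bytes))
```

Arbitrary digest bytes will usually fail the UTF-8 check, which is why a bytes field (or a magnet link or info-hash string) may be a safer thing to pass between peers.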
This design expands the node capabilities to include BitTorrent roles:
Next steps would involve implementing these changes, thoroughly testing the new BitTorrent functionality, and integrating it with the existing node discovery and communication mechanisms.
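To reason about the roles without libtorrent or gRPC in the way, the seed/leech decision can be exercised with in-memory stand-ins. This is a sketch under assumptions: FakeNode and its return strings are hypothetical, Shard is a local stand-in for Exo's class, and the actual transfer is replaced by simply marking the shard as available.

```python
import asyncio
from dataclasses import dataclass, field


@dataclass(frozen=True)
class Shard:  # local stand-in for exo.inference.shard.Shard
    model_id: str
    start_layer: int
    end_layer: int
    n_layers: int


@dataclass
class FakeNode:  # hypothetical stand-in for a BitTorrent-enabled node
    node_id: str
    peers: list = field(default_factory=list)
    available: set = field(default_factory=set)

    async def has_model(self, shard: Shard) -> bool:
        return shard in self.available

    async def ensure_model(self, shard: Shard) -> str:
        if shard in self.available:
            return "cached"
        for peer in self.peers:
            if await peer.has_model(shard):
                # A real node would fetch the torrent and wait for the swarm download here
                self.available.add(shard)  # once complete, this node can serve others too
                return f"leeched from {peer.node_id}"
        # No peer has the shard: fetch from the original source, then seed it
        self.available.add(shard)
        return "seeded from source"


async def demo():
    shard = Shard("test_model", 0, 31, 32)
    a = FakeNode("a")
    b = FakeNode("b", peers=[a])
    return [await a.ensure_model(shard), await b.ensure_model(shard), await b.ensure_model(shard)]


print(asyncio.run(demo()))
```

One design point this makes visible: a node that finishes leeching should also record the shard as available, otherwise it will never answer has_model positively for later peers.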
Prototype how to simulate the BitTorrent flow:
⚠️ Claude Sonnet 3.5 code:
Let's create a basic prototype to demonstrate the BitTorrent distribution flow for Exo. This prototype will simulate the process of distributing a model file among multiple nodes. We'll use the libtorrent library for BitTorrent functionality and asyncio for asynchronous operations.
Here's a step-by-step prototype implementation:
    import asyncio
    import os
    import tempfile
    from dataclasses import dataclass

    import libtorrent as lt


    @dataclass
    class Shard:
        model_id: str
        start_layer: int
        end_layer: int
        n_layers: int


    class Node:
        def __init__(self, node_id: str, save_path: str):
            self.node_id = node_id
            self.save_path = save_path
            os.makedirs(save_path, exist_ok=True)  # each node needs its own directory on disk
            self.session = lt.session()
            self.torrents = {}

        async def create_torrent(self, file_path: str, shard: Shard):
            fs = lt.file_storage()
            lt.add_files(fs, file_path)
            t = lt.create_torrent(fs)
            t.set_creator(f'exo_node_{self.node_id}')
            # Hash the pieces relative to the directory that contains the file
            lt.set_piece_hashes(t, os.path.dirname(file_path))
            torrent_file = t.generate()
            torrent_path = os.path.join(self.save_path, f"{shard.model_id}.torrent")
            with open(torrent_path, "wb") as f:
                f.write(lt.bencode(torrent_file))
            return torrent_path

        async def seed_model(self, file_path: str, shard: Shard):
            torrent_file = await self.create_torrent(file_path, shard)
            # Seed from the directory that already holds the file so libtorrent finds the data
            handle = self.session.add_torrent({'ti': lt.torrent_info(torrent_file), 'save_path': os.path.dirname(file_path)})
            self.torrents[shard.model_id] = handle
            print(f"Node {self.node_id} is seeding {shard.model_id}")

        async def download_model(self, torrent_file: str, shard: Shard):
            handle = self.session.add_torrent({'ti': lt.torrent_info(torrent_file), 'save_path': self.save_path})
            self.torrents[shard.model_id] = handle
            print(f"Node {self.node_id} started downloading {shard.model_id}")
            # No tracker is configured, so peers have to find each other through
            # libtorrent's local service discovery or DHT; this loop waits until they do
            while True:
                s = handle.status()
                if s.is_seeding:
                    break
                print(f"Node {self.node_id} - {shard.model_id}: {s.progress:.2%} complete (down: {s.download_rate / 1000:.1f} kB/s up: {s.upload_rate / 1000:.1f} kB/s peers: {s.num_peers})")
                await asyncio.sleep(1)
            print(f"Node {self.node_id} completed downloading {shard.model_id}")

        async def get_peers(self):
            # p.ip is an (address, port) tuple
            return [p.ip for handle in self.torrents.values() for p in handle.get_peer_info()]


    async def simulate_distribution(num_nodes: int, model_size_mb: int):
        temp_dir = tempfile.mkdtemp()
        nodes = [Node(f"node_{i}", os.path.join(temp_dir, f"node_{i}")) for i in range(num_nodes)]

        # Create a dummy model file
        model_path = os.path.join(temp_dir, "model.bin")
        with open(model_path, "wb") as f:
            f.write(os.urandom(model_size_mb * 1024 * 1024))

        shard = Shard("test_model", 0, 31, 32)

        # Node 0 becomes the seed
        await nodes[0].seed_model(model_path, shard)

        # Other nodes download the model
        torrent_file = os.path.join(nodes[0].save_path, f"{shard.model_id}.torrent")
        download_tasks = [node.download_model(torrent_file, shard) for node in nodes[1:]]
        await asyncio.gather(*download_tasks)

        # Print final peer connections
        for node in nodes:
            peers = await node.get_peers()
            print(f"Node {node.node_id} connected to peers: {peers}")

        # Clean up
        for node in nodes:
            for handle in node.torrents.values():
                node.session.remove_torrent(handle)

        return nodes


    # Run the simulation
    async def main():
        await simulate_distribution(num_nodes=5, model_size_mb=100)


    if __name__ == "__main__":
        asyncio.run(main())
This prototype demonstrates the following BitTorrent distribution flow:
It creates several Node instances, each representing an Exo node.
To run this prototype:
Install the required libraries:
pip install libtorrent
Save the code in a file, e.g., bittorrent_prototype.py
Run the prototype:
python bittorrent_prototype.py
This basic prototype demonstrates the core concept of using BitTorrent for model distribution in Exo. It shows how nodes can seed and leech model files, and how the distribution can happen in parallel among multiple nodes.
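A rough way to see why parallel distribution matters is to compare idealized lower bounds on distribution time for a single source versus a swarm, using the standard fluid model from networking textbooks. The sketch below ignores protocol overhead, piece selection, and churn, and the bandwidth figures are hypothetical.

```python
def client_server_time(file_bits: float, n_peers: int, server_up: float, peer_down_min: float) -> float:
    """Lower bound (seconds) when one source uploads a full copy to every peer."""
    return max(n_peers * file_bits / server_up, file_bits / peer_down_min)


def p2p_time(file_bits: float, server_up: float, peer_ups: list, peer_down_min: float) -> float:
    """Lower bound (seconds) when peers re-upload the pieces they have received."""
    n_peers = len(peer_ups)
    return max(
        file_bits / server_up,                               # the source must send each bit once
        file_bits / peer_down_min,                           # the slowest peer must receive it all
        n_peers * file_bits / (server_up + sum(peer_ups)),   # total demand over total upload capacity
    )


# Hypothetical numbers: a 1 GB file, 4 peers, 1 Gbit/s links everywhere
file_bits = 8e9
link = 1e9
print(client_server_time(file_bits, 4, link, link))  # single-source bound
print(p2p_time(file_bits, link, [link] * 4, link))   # swarm bound
```

In this model the single-source bound grows linearly with the number of nodes, while the swarm bound stays close to the time of one transfer as long as peers contribute upload capacity.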
Next steps for integrating this into Exo would include:
Adapting the Node class to work with Exo's existing StandardNode class.
This prototype serves as a starting point to understand the basics of BitTorrent integration and can be expanded upon to meet Exo's specific requirements for model distribution.
@stephanj you’re going too much into implementation details. Turn the ai tool off for now and let’s try to think about what we actually want here at a high level
You might want to take a look at libp2p from Protocol Labs. The Go version of that library is what powers IPFS. I looked into the code a long time ago. From what I recall, libp2p uses a simplified form of the BitTorrent protocol.
I think that library has a lot of decent APIs that would match your networking requirements.
UPDATE: it seems they have not implemented BitTorrent in their Python codebase as of now.
I am opening an issue for discussion, as well as a placeholder for work where we use BitTorrent to distribute large models amongst nodes.
Implementation Path
Seed node
✅ Pros