exo-explore / exo

Run your own AI cluster at home with everyday devices 📱💻 🖥️⌚
GNU General Public License v3.0
6.3k stars 322 forks source link

[BOUNTY - $200] Share kv cache between nodes for redundancy #52

Open AlexCheema opened 1 month ago

AlexCheema commented 1 month ago

https://github.com/exo-explore/exo/issues/23#issuecomment-2241521048

Perhaps after each inference, we synchronise the full kv cache between all nodes. This should be fairly straightforward, we can broadcast the entire cache.

this would allow for saving context even when a node goes down.

dhruvmalik007 commented 1 month ago

Hi Alex . just was curious as I was currently learning about CRDT strategies and indeed wanted to understand the challenge on broadcasting the cache changes with either:

  1. Via strong consistency (I.e doing the mutex mechanism for sequentially sharing the corresponding KVCache to each entities sequentially ).

  2. Via Eventual consistency : by doing the updates independently and then implementing strategies like last write wins or other parameters.

  3. Strong and eventual consistency : implementing both techniques by creating the tuple between the KVcache updates with the corresponding timestamps.

so is that a good strategy to think about the KVCacahe updates?

AlexCheema commented 1 month ago

I think eventual consistency is preferred here. Just broadcast them out optimistically, with a monotonically increasing version number and pick max(current_kv_cache.version, version)

dhruvmalik007 commented 1 month ago

sounds good . I am happy to get assigned to this and create PR if its public issue .

AlexCheema commented 1 month ago

sounds good . I am happy to get assigned to this and create PR if its public issue .

Great! I will actually assign a $200 bounty to this as it’s an important upgrade and highly valued contribution if you can get it working reliably!

dhruvmalik007 commented 1 month ago

Thanks . I will try to setup PR till Wednesday due to limited time BW tommorrow . Just wanted to ask whether the gRPCPeerHandler sendMessage function will work as the endpoint for broadcasting ?

stephanj commented 1 month ago

More context info for people who want to understand why, how etc. provided by Claude :)

Synchronizing the full KV cache between all nodes after each inference could indeed provide some unique benefits, particularly in terms of fault tolerance and maintaining context across the cluster. Let's explore this idea and consider its implications:

Benefits:

  1. Fault Tolerance: As you mentioned, this would allow for preserving context even if a node goes down. Any node could potentially take over the work of a failed node without losing the context.

  2. Load Balancing: It could potentially allow for more flexible load balancing, as any node would have the full context and could theoretically handle the next request.

  3. Consistent State: It ensures that all nodes have a consistent view of the conversation state, which could be beneficial for debugging and monitoring.

Implementation Considerations:

  1. Cache Broadcast Mechanism: We'd need to implement a efficient broadcast mechanism. This could be done using the existing gRPC infrastructure:

    async def broadcast_kv_cache(self, kv_cache):
       async def send_cache_to_peer(peer):
           try:
               await asyncio.wait_for(peer.send_kv_cache(kv_cache), timeout=15.0)
           except asyncio.TimeoutError:
               print(f"Timeout broadcasting KV cache to {peer.id()}")
           except Exception as e:
               print(f"Error broadcasting KV cache to {peer.id()}: {e}")
    
       await asyncio.gather(*[send_cache_to_peer(peer) for peer in self.peers], return_exceptions=True)
  2. Cache Reception and Update: Each node would need a method to receive and update its local KV cache:

    async def update_kv_cache(self, new_kv_cache):
       # Logic to update the local KV cache
       pass
  3. Synchronization Point: We'd need to decide when exactly to synchronize. After each token generation might be too frequent and could introduce latency. After each full response might be more appropriate:

    async def process_prompt(self, base_shard: Shard, prompt: str, request_id: Optional[str] = None, inference_state: Optional[str] = None) -> Optional[np.ndarray]:
       resp = await self._process_prompt(base_shard, prompt, request_id, inference_state)
       kv_cache = self.get_current_kv_cache()  # Method to get the current KV cache
       await self.broadcast_kv_cache(kv_cache)
       return resp

Challenges and Considerations:

  1. Network Overhead: Broadcasting the entire KV cache after each inference could introduce significant network overhead, especially for larger models or longer conversations.

  2. Synchronization Delay: The time taken to broadcast and update the KV cache on all nodes could introduce latency in the system.

  3. Cache Size: For very large models or long conversations, the KV cache could become quite large, potentially causing memory issues on nodes with less available RAM.

  4. Consistency Management: We'd need to ensure that the cache remains consistent across all nodes, even in the face of network delays or failures.

  5. API Compatibility: This approach differs from how most LLM APIs work, which typically don't maintain state between requests. We'd need to consider how to make this transparent to API users or whether to expose it as a feature.

Potential Optimizations:

  1. Differential Updates: Instead of broadcasting the entire cache each time, we could send only the changes since the last synchronization.

  2. Compression: Implement compression for the KV cache before transmission to reduce network overhead.

  3. Asynchronous Updates: Perform the cache broadcast asynchronously, allowing the node to continue processing while the update happens in the background.

  4. Configurable Behavior: Allow users to choose whether they want this "stateful" behavior or a more traditional stateless API.

Implementation Steps:

  1. Modify the InferenceEngine interface and implementations to expose methods for getting and setting the KV cache.

  2. Implement the broadcast and update mechanisms in the StandardNode class.

  3. Modify the processing methods (process_prompt, process_tensor) to trigger cache synchronization at appropriate points.

  4. Update the PeerHandle interface and implementations to include methods for sending and receiving KV cache updates.

  5. Implement error handling and recovery mechanisms to deal with synchronization failures.

  6. Add configuration options to enable/disable this feature and control its behavior (e.g., synchronization frequency).

While this approach could offer some unique advantages, it's important to carefully consider the trade-offs, particularly in terms of performance and complexity. It might be worth implementing this as an optional feature that can be enabled for specific use cases where maintaining context across the cluster is particularly valuable.

dhruvmalik007 commented 1 month ago

More context info for people who want to understand why, how etc. provided by Claude :)

Synchronizing the full KV cache between all nodes after each inference could indeed provide some unique benefits, particularly in terms of fault tolerance and maintaining context across the cluster. Let's explore this idea and consider its implications:

Benefits:

  1. Fault Tolerance: As you mentioned, this would allow for preserving context even if a node goes down. Any node could potentially take over the work of a failed node without losing the context.
  2. Load Balancing: It could potentially allow for more flexible load balancing, as any node would have the full context and could theoretically handle the next request.
  3. Consistent State: It ensures that all nodes have a consistent view of the conversation state, which could be beneficial for debugging and monitoring.

Implementation Considerations:

  1. Cache Broadcast Mechanism: We'd need to implement a efficient broadcast mechanism. This could be done using the existing gRPC infrastructure:

    async def broadcast_kv_cache(self, kv_cache):
       async def send_cache_to_peer(peer):
           try:
               await asyncio.wait_for(peer.send_kv_cache(kv_cache), timeout=15.0)
           except asyncio.TimeoutError:
               print(f"Timeout broadcasting KV cache to {peer.id()}")
           except Exception as e:
               print(f"Error broadcasting KV cache to {peer.id()}: {e}")
    
       await asyncio.gather(*[send_cache_to_peer(peer) for peer in self.peers], return_exceptions=True)
  2. Cache Reception and Update: Each node would need a method to receive and update its local KV cache:
    async def update_kv_cache(self, new_kv_cache):
       # Logic to update the local KV cache
       pass
  3. Synchronization Point: We'd need to decide when exactly to synchronize. After each token generation might be too frequent and could introduce latency. After each full response might be more appropriate:
    async def process_prompt(self, base_shard: Shard, prompt: str, request_id: Optional[str] = None, inference_state: Optional[str] = None) -> Optional[np.ndarray]:
       resp = await self._process_prompt(base_shard, prompt, request_id, inference_state)
       kv_cache = self.get_current_kv_cache()  # Method to get the current KV cache
       await self.broadcast_kv_cache(kv_cache)
       return resp

Challenges and Considerations:

  1. Network Overhead: Broadcasting the entire KV cache after each inference could introduce significant network overhead, especially for larger models or longer conversations.
  2. Synchronization Delay: The time taken to broadcast and update the KV cache on all nodes could introduce latency in the system.
  3. Cache Size: For very large models or long conversations, the KV cache could become quite large, potentially causing memory issues on nodes with less available RAM.
  4. Consistency Management: We'd need to ensure that the cache remains consistent across all nodes, even in the face of network delays or failures.
  5. API Compatibility: This approach differs from how most LLM APIs work, which typically don't maintain state between requests. We'd need to consider how to make this transparent to API users or whether to expose it as a feature.

Potential Optimizations:

  1. Differential Updates: Instead of broadcasting the entire cache each time, we could send only the changes since the last synchronization.
  2. Compression: Implement compression for the KV cache before transmission to reduce network overhead.
  3. Asynchronous Updates: Perform the cache broadcast asynchronously, allowing the node to continue processing while the update happens in the background.
  4. Configurable Behavior: Allow users to choose whether they want this "stateful" behavior or a more traditional stateless API.

Implementation Steps:

  1. Modify the InferenceEngine interface and implementations to expose methods for getting and setting the KV cache.
  2. Implement the broadcast and update mechanisms in the StandardNode class.
  3. Modify the processing methods (process_prompt, process_tensor) to trigger cache synchronization at appropriate points.
  4. Update the PeerHandle interface and implementations to include methods for sending and receiving KV cache updates.
  5. Implement error handling and recovery mechanisms to deal with synchronization failures.
  6. Add configuration options to enable/disable this feature and control its behavior (e.g., synchronization frequency).

While this approach could offer some unique advantages, it's important to carefully consider the trade-offs, particularly in terms of performance and complexity. It might be worth implementing this as an optional feature that can be enabled for specific use cases where maintaining context across the cluster is particularly valuable.

Thanks @stephanj for adding context along with sharing the checklists of the subject . currently it's work in progress here .

I am gonna implement modified version of gossip protocol for providing strong consistency .

I do have some general feedback of the viewpoints that you've shared:

pranav4501 commented 2 weeks ago

Hi, I noticed this bounty issue has been open for about a month without recent activity. I'm interested in working on this task. Before I start, I wanted to check.

dhruvmalik007 commented 2 weeks ago

Hi @pranav4501 , yes I had starting working on this 2 weeks but didnt had time in between : https://github.com/dhruvmalik007/exo/tree/feat/adding_KV_broadcast_cache

I will try till end of this week but if you are interested to implement before I can assign you.

pranav4501 commented 2 weeks ago

Hi @dhruvmalik007 , I was just checking if someone was still working on it. I will look at other issues. Thank you.