Closed WhyMEliu closed 5 months ago
Hey there, @WhyMEliu! Fancy seeing you around these parts again. Hope all is well on your end! 🦙💻
To make HTTP POST requests to your LLM service and Embedding using LlamaIndex, you can subclass the `CustomLLM` class. It exposes two methods, `complete` and `stream_complete`, which you override to handle the specifics of making HTTP POST requests to your LLM service, plus a required `metadata` property.

Here's an example of how you might implement these methods to make HTTP POST requests using the `requests` library. This example assumes your LLM service accepts JSON payloads and returns JSON responses.

First, ensure you have the `requests` library installed:

```bash
pip install requests
```

Then, subclass `CustomLLM` and implement the `complete` and `stream_complete` methods along with the `metadata` property:
```python
import requests
from typing import Any

from llama_index.core.llms import (
    CompletionResponse,
    CompletionResponseGen,
    CustomLLM,
    LLMMetadata,
)


class PostRequestLLM(CustomLLM):
    # URL of your HTTP POST completion endpoint
    service_url: str

    def complete(self, prompt: str, formatted: bool = False, **kwargs: Any) -> CompletionResponse:
        # Construct the payload for the POST request
        payload = {
            "prompt": prompt,
            "formatted": formatted,
            **kwargs,
        }
        # Make the HTTP POST request
        response = requests.post(self.service_url, json=payload)
        # Ensure the request was successful
        response.raise_for_status()
        # Parse the JSON response
        data = response.json()
        # Convert the response to a CompletionResponse
        # (adjust the key names based on your service's response structure)
        return CompletionResponse(text=data.get("text", ""), raw=data)

    def stream_complete(self, prompt: str, formatted: bool = False, **kwargs: Any) -> CompletionResponseGen:
        # This example assumes streaming would be handled via successive POST requests;
        # for real streaming support you might need a different approach, possibly
        # involving websockets or another streaming protocol.
        raise NotImplementedError("Streaming completions are not implemented in this example")

    # Implement the `metadata` property as required by the abstract base class
    @property
    def metadata(self) -> LLMMetadata:
        return LLMMetadata(model_name="custom-post-llm")
```
This example provides a basic implementation for making synchronous HTTP POST requests to an LLM service via the `complete` method. The `stream_complete` method is left unimplemented because streaming behavior varies significantly between services, and some services might not support streaming completions over HTTP POST at all. You would need to adjust these implementations based on the specifics of your LLM service, including how it expects requests to be formatted and how it returns responses.

You can find more information about the `CustomLLM` class in the `llama-index-core/llama_index/core/llms/custom.py` file.

Additionally, LlamaIndex provides built-in support for making HTTP requests: the `RequestsToolSpec` class in `llama_index/tools/requests/base.py` wraps the `requests` library for making GET, POST, and PATCH requests. It is part of the framework's tool integrations, so you do not need to call an external library like `requests` directly for these operations. You can find more about this in `llama-index-integrations/tools/llama-index-tools-requests/llama_index/tools/requests/base.py`.
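For illustration, here is a minimal sketch of calling that tool spec directly. It assumes the `llama-index-tools-requests` package is installed and that `domain_headers` maps hostnames to header dicts (check the tool spec's source for the exact interface); the URL, header name, and payload below are placeholders:

```python
# pip install llama-index-tools-requests
from llama_index.tools.requests import RequestsToolSpec

# Headers are supplied per domain (hostname -> header dict); values here are placeholders
tool_spec = RequestsToolSpec(
    domain_headers={"api.example.com": {"X-APIKey": "YOUR_API_KEY"}}
)

# post_request sends the dict as a JSON body to the given URL and returns the response
result = tool_spec.post_request(
    url="https://api.example.com/llm/complete",
    data={"prompt": "Hello, world"},
)
print(result)
```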
Please note that this is a basic example and you might need to adjust it based on the specifics of your LLM and Embedding services.
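For the embedding side of your question, the same wrapping pattern works with `BaseEmbedding`, following the LlamaIndex custom embeddings example. Here is a minimal sketch, assuming your embedding service accepts a JSON body like `{"input": "..."}` and returns `{"embedding": [...]}`; the URL, header name, and key names are placeholders you would adjust to your API:

```python
from typing import List

import requests
from llama_index.core.embeddings import BaseEmbedding


class PostRequestEmbedding(BaseEmbedding):
    """Embedding model wrapping an HTTP POST endpoint (request/response keys are assumptions)."""

    service_url: str
    api_key: str

    def _post(self, text: str) -> List[float]:
        # Send one piece of text and return its embedding vector
        response = requests.post(
            self.service_url,
            headers={"X-APIKey": self.api_key},
            json={"input": text},
        )
        response.raise_for_status()
        return response.json()["embedding"]

    def _get_query_embedding(self, query: str) -> List[float]:
        return self._post(query)

    def _get_text_embedding(self, text: str) -> List[float]:
        return self._post(text)

    async def _aget_query_embedding(self, query: str) -> List[float]:
        # Synchronous fallback; swap in httpx.AsyncClient for true async
        return self._post(query)
```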
@WhyMEliu implement a custom llm and custom embedding model that wraps your API requests https://docs.llamaindex.ai/en/stable/examples/embeddings/custom_embeddings.html https://docs.llamaindex.ai/en/stable/module_guides/models/llms/usage_custom.html#using-custom-llm-advanced
Thank you very much for your help.
Hi Logan @logan-markewich, sorry to open this issue again.
If I build a custom LLM this way and use it with the `tree_summarize` response synthesizer, how can I make the LLM get called asynchronously by tree summarize?
@xjw1001001 you'd have to define the async methods in the custom LLM (check out the base class for what I mean):

`async def acomplete()`
`async def achat()`

etc.

For these methods to be truly async (and not fake async), there should be some `await` used in them.
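Once `acomplete` is defined, you can ask tree summarize to take the async path. A minimal sketch (assuming `my_custom_llm` is an instance of your `CustomLLM` subclass and `nodes` is the list of nodes to summarize):

```python
from llama_index.core import get_response_synthesizer

# use_async=True routes the intermediate summarization calls through
# the LLM's async methods (acomplete/achat) instead of the sync ones
synthesizer = get_response_synthesizer(
    response_mode="tree_summarize",
    llm=my_custom_llm,
    use_async=True,
)
response = synthesizer.synthesize("What is the document about?", nodes=nodes)
```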
@logan-markewich Hi Logan, thank you for your advice. Here is my implementation for reference:
```python
import requests
import httpx
from typing import Any

from llama_index.core.llms import (
    CompletionResponse,
    CompletionResponseGen,
    CustomLLM,
    LLMMetadata,
)
from llama_index.core.llms.callbacks import llm_completion_callback


class Claude3_Sonnet(CustomLLM):
    context_window: int = 180000
    num_output: int = 4000
    model_name: str = "custom"
    dummy_response: str = "My response"
    system_message: str = "You are a helpful assistant"
    api_key: str

    @property
    def metadata(self) -> LLMMetadata:
        """Get LLM metadata."""
        return LLMMetadata(
            context_window=self.context_window,
            num_output=self.num_output,
            model_name=self.model_name,
        )

    @llm_completion_callback()
    def complete(self, prompt: str, system_message: str = "None", **kwargs: Any) -> CompletionResponse:
        api_url = "https://?????/gpt/v2/claude-3-sonnet-20240229-v1"
        if not self.api_key:
            raise ValueError("API key is not set.")
        if system_message == "None":
            system_message = self.system_message
        headers = {"X-APIKey": self.api_key}
        payload = {
            "max_tokens": self.num_output,
            "messages": [{"role": "user", "content": prompt}],
            "system": system_message,
            "anthropic_version": "bedrock-2023-05-31",
        }
        response = requests.post(api_url, headers=headers, json=payload)
        if response.status_code == 200:
            response_text = response.json().get("content", [{}])[0].get("text", "")
            return CompletionResponse(text=response_text)
        else:
            return CompletionResponse(text=f"Error: {response.status_code} - {response.reason}")

    @llm_completion_callback()
    def stream_complete(self, prompt: str, **kwargs: Any) -> CompletionResponseGen:
        raise NotImplementedError("This method has not been implemented yet.")

    @llm_completion_callback()
    async def acomplete(self, prompt: str, system_message: str = "None", **kwargs: Any) -> CompletionResponse:
        api_url = "https://???????/gpt/v2/claude-3-sonnet-20240229-v1"
        if not self.api_key:
            raise ValueError("API key is not set.")
        if system_message == "None":
            system_message = self.system_message
        headers = {"X-APIKey": self.api_key}
        payload = {
            "max_tokens": self.num_output,
            "messages": [{"role": "user", "content": prompt}],
            "system": system_message,
            "anthropic_version": "bedrock-2023-05-31",
        }
        async with httpx.AsyncClient() as client:
            response = await client.post(api_url, headers=headers, json=payload)
        if response.status_code == 200:
            response_text = response.json().get("content", [{}])[0].get("text", "")
            return CompletionResponse(text=response_text)
        else:
            # Use httpx.codes to get a textual reason for the status code
            reason_phrase = httpx.codes.get_reason_phrase(response.status_code)
            return CompletionResponse(text=f"Error: {response.status_code} - {reason_phrase}")
```
@logan-markewich Update: I needed to add retry logic and a maximum timeout to the code:
```python
import backoff
import httpx


# Updated acomplete for the Claude3_Sonnet class above, with retries and a timeout
@llm_completion_callback()
async def acomplete(self, prompt: str, system_message: str = "None", **kwargs: Any) -> CompletionResponse:
    api_url = "https://???????/gpt/v2/claude-3-sonnet-20240229-v1"
    if not self.api_key:
        raise ValueError("API key is not set.")
    if system_message == "None":
        system_message = self.system_message
    headers = {"X-APIKey": self.api_key}
    payload = {
        "max_tokens": self.num_output,
        "messages": [{"role": "user", "content": prompt}],
        "system": system_message,
        "anthropic_version": "bedrock-2023-05-31",
    }

    # Define an inner async function for making the HTTP request with retries and a timeout
    @backoff.on_exception(backoff.expo, httpx.ReadTimeout, max_tries=3)
    async def make_request_with_retries():
        # Set a custom overall timeout of 120 seconds
        timeout = httpx.Timeout(120)
        async with httpx.AsyncClient(timeout=timeout) as client:
            response = await client.post(api_url, headers=headers, json=payload)
        return response

    # Call the inner function
    response = await make_request_with_retries()
    if response.status_code == 200:
        response_text = response.json().get("content", [{}])[0].get("text", "")
        return CompletionResponse(text=response_text)
    else:
        # Use httpx.codes to get a textual reason for the status code
        reason_phrase = httpx.codes.get_reason_phrase(response.status_code)
        return CompletionResponse(text=f"Error: {response.status_code} - {reason_phrase}")
```
Question
I have an LLM service that exposes only HTTP POST requests, and the same goes for my Embedding service. How do I use LlamaIndex to call them?