Open eswarthammana opened 7 months ago
I have code that adds AI and user messages to OpenSearch. This is just a sample and I am still exploring; it currently lacks automatic session handling, cache policies, etc., for which I see you have better algorithms.
from time import time
from typing import List, Optional
import json
from langchain_core.chat_history import BaseChatMessageHistory
from langchain_core.messages import (
BaseMessage,
message_to_dict,
messages_from_dict,
)
from opensearchpy import OpenSearch
from logs.logger import Log #my custom logger class
class OpenSearchChatMessageHistory(Log, BaseChatMessageHistory):
    """Chat message history that stores history in OpenSearch.

    Each message is indexed as its own document holding the session id, a
    creation timestamp (epoch milliseconds) and the JSON-serialised message.

    Args:
        index (str): Name of the index to use.
        session_id (str): Arbitrary key that is used to store the messages
            of a single chat session.
        opensearch_url (Optional[str]): URL of the OpenSearch instance to
            connect to. Defaults to "http://localhost:9200".
        ensure_ascii (Optional[bool]): Used to escape ASCII symbols in
            json.dumps. Defaults to True.
        max_messages (int): Upper bound on the number of messages fetched
            for a session in a single search. Defaults to 10000.
            NOTE(review): 10000 is assumed to match the index's default
            ``index.max_result_window``; confirm against cluster settings.
    """

    def __init__(
        self,
        index: str,
        session_id: str,
        opensearch_url: Optional[str] = "http://localhost:9200",
        ensure_ascii: Optional[bool] = True,
        *,
        max_messages: int = 10000,
    ) -> None:
        """Connect to OpenSearch and create the history index if missing."""
        super().__init__()
        self.log_info("Initializing the OpenSearchChatMessageHistory class.")
        self.index: str = index
        self.session_id: str = session_id
        self.ensure_ascii: bool = ensure_ascii
        self.max_messages: int = max_messages
        self.client: OpenSearch = OpenSearch([opensearch_url])

        if self.client.indices.exists(index=index):
            self.log_info(
                f"Chat history index '{index}' already exists, skipping creation."
            )
        else:
            self.log_info(f"Creating index '{index}' for storing chat history.")
            self.client.indices.create(
                index=index,
                body={
                    "mappings": {
                        "properties": {
                            # keyword => exact-match filtering by session
                            "session_id": {"type": "keyword"},
                            "created_at": {"type": "date"},
                            "history": {"type": "text"},
                        }
                    }
                },
            )
        self.log_info("OpenSearchChatMessageHistory class initialized successfully.")

    @property
    def messages(self) -> List[BaseMessage]:
        """Retrieve the messages of this session from OpenSearch.

        Returns:
            The session's messages ordered by ascending ``created_at``,
            capped at ``max_messages`` entries.
        """
        self.log_info("Loading messages from OpenSearch to buffer.")
        result = self.client.search(
            index=self.index,
            body={
                "query": {
                    "term": {
                        "session_id": self.session_id
                    }
                }
            },
            sort="created_at:asc",
            # Without an explicit size the search returns only the default
            # page of 10 hits, silently truncating longer conversations.
            size=self.max_messages,
        )
        items = [
            json.loads(document["_source"]["history"])
            for document in result.get("hits", {}).get("hits", [])
        ] if result else []
        self.log_info("Messages loaded from OpenSearch to buffer.")
        # messages_from_dict expects the whole sequence of message dicts and
        # returns a flat list of messages; calling it per item is incorrect.
        return messages_from_dict(items)

    def add_message(self, message: BaseMessage) -> None:
        """Add a message to the chat session in OpenSearch.

        Args:
            message: The message to serialise and index for this session.
        """
        self.log_info("Adding messages to OpenSearch.")
        self.client.index(
            index=self.index,
            body={
                "session_id": self.session_id,
                # Epoch milliseconds, used to order messages on retrieval.
                "created_at": round(time() * 1000),
                "history": json.dumps(
                    message_to_dict(message),
                    ensure_ascii=self.ensure_ascii,
                ),
            },
            # Refresh so the message is searchable immediately afterwards.
            refresh=True,
        )
        self.log_info("Messages added to OpenSearch.")

    def clear(self) -> None:
        """Clear session memory in OpenSearch.

        Deletes every document in the index whose ``session_id`` matches
        this instance's session.
        """
        self.log_info("Purging data in OpenSearch started.")
        self.client.delete_by_query(
            index=self.index,
            body={
                "query": {
                    "term": {
                        "session_id": self.session_id
                    }
                }
            },
            refresh=True,
        )
        self.log_info("OpenSearch data purged.")
What modifications do I have to make to use OpenSearch as a cache for exact matches and, for the semantic cache, as both a vector store and a cache?
Thank you