AgentOps-AI / agentops

Python SDK for agent monitoring, LLM cost tracking, benchmarking, and more. Integrates with most LLMs and agent frameworks like CrewAI, Langchain, and Autogen
https://agentops.ai
MIT License
1.18k stars 93 forks source link

Notification System for AgentOps #253

Open sarath59 opened 3 weeks ago

sarath59 commented 3 weeks ago

I wanted this feature when I was using AgentOps so, I would love to propose to implement a notification system for AgentOps to enhance user awareness of critical events and improve monitoring capabilities. This system will send notifications (via email and SMS) for important triggers such as session starts and ends, significant LLM events, and cost thresholds being exceeded. Please let me know if this is feasible and practical for your company vision.

Objective: The objective is to keep users informed about the status and outcomes of their sessions and significant LLM activities, as well as to notify them when certain cost thresholds are exceeded. This will help users to monitor and react to changes or issues in real-time.

How We Want to Achieve It:

User Management: Allow users to register and provide their email addresses and phone numbers. Notification Preferences: Enable users to select which types of notifications they want to receive. Notification Manager: Implement a NotificationManager class to handle sending notifications.

Example and sample Code Snippets:

  1. NotificationManager Class

python Copy code import smtplib from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart from twilio.rest import Client as TwilioClient

class NotificationManager: def init(self, sendgrid_api_key=None, twilio_sid=None, twilio_auth_token=None): self.sendgrid_api_key = sendgrid_api_key self.twilio_client = TwilioClient(twilio_sid, twilio_auth_token) if twilio_sid and twilio_auth_token else None

def send_email(self, to_email, subject, content):
    message = MIMEMultipart()
    message['From'] = "noreply@agentops.ai"
    message['To'] = to_email
    message['Subject'] = subject

    message.attach(MIMEText(content, 'plain'))

    try:
        server = smtplib.SMTP('smtp.sendgrid.net', 587)
        server.starttls()
        server.login('apikey', self.sendgrid_api_key)
        server.sendmail("noreply@agentops.ai", to_email, message.as_string())
        server.quit()
    except Exception as e:
        print(f"Failed to send email: {e}")

def send_sms(self, to_number, content):
    if not self.twilio_client:
        print("Twilio client is not initialized.")
        return

    try:
        self.twilio_client.messages.create(
            body=content,
            from_='+1234567890',  # Replace with your Twilio number
            to=to_number
        )
    except Exception as e:
        print(f"Failed to send SMS: {e}")
  1. Client Modifications

python Copy code from .notification_manager import NotificationManager

... other imports ...

class Client(metaclass=MetaClient): def init(self, api_key: Optional[str] = None, parent_key: Optional[str] = None, endpoint: Optional[str] = None, max_wait_time: Optional[int] = None, max_queue_size: Optional[int] = None, tags: Optional[List[str]] = None, override: Optional[bool] = None, instrument_llm_calls=True, auto_start_session=False, inherited_session_id: Optional[str] = None, skip_auto_end_session: Optional[bool] = False):

... existing initialization code ...

    self.notification_manager = NotificationManager(
        sendgrid_api_key=os.getenv('SENDGRID_API_KEY'),
        twilio_sid=os.getenv('TWILIO_SID'),
        twilio_auth_token=os.getenv('TWILIO_AUTH_TOKEN')
    )

def start_session(self, tags: Optional[List[str]] = None, config: Optional[Configuration] = None, inherited_session_id: Optional[str] = None):
    # ... existing session start code ...
    self.notification_manager.send_email(
        to_email="user@example.com",
        subject="Session Started",
        content=f"Session {session_id} has started with tags: {tags}"
    )
    return self._session.session_id

def end_session(self, end_state: str, end_state_reason: Optional[str] = None, video: Optional[str] = None, is_auto_end: Optional[bool] = None):
    # ... existing session end code ...
    self.notification_manager.send_email(
        to_email="user@example.com",
        subject="Session Ended",
        content=f"Session {self._session.session_id} ended with state: {end_state} and reason: {end_state_reason}"
    )
    self._session = None
    self._worker = None

# ... other methods ...
  1. LLM Tracker Modifications

python Copy code

... existing imports ...

class LlmTracker:

... existing code ...

def _handle_response_v1_openai(self, response, kwargs, init_timestamp):
    # ... existing code ...
    try:
        self.llm_event.returns = response.model_dump()
        self.llm_event.agent_id = check_call_stack_for_agent_id()
        self.llm_event.prompt = kwargs["messages"]
        self.llm_event.prompt_tokens = response.usage.prompt_tokens
        self.llm_event.completion = response.choices[0].message.model_dump()
        self.llm_event.completion_tokens = response.usage.completion_tokens
        self.llm_event.model = response.model

        self.client.record(self.llm_event)
        self.client.notification_manager.send_email(
            to_email="admin@example.com",
            subject="LLM Event Recorded",
            content=f"LLM Event has been recorded: {self.llm_event.completion}"
        )
    except Exception as e:
        self.client.record(ErrorEvent(trigger_event=self.llm_event, exception=e))
        kwargs_str = pprint.pformat(kwargs)
        response = pprint.pformat(response)
        logger.warning(f"Unable to parse response for LLM call. Skipping upload to AgentOps\nresponse:\n {response}\nkwargs:\n {kwargs_str}\n")

    return response

# ... similar updates for other _handle_response methods ...

Types of Triggers and Notifications:

Session Start: Notify when a session starts. Session End: Notify when a session ends with details such as end state and reason. LLM Event: Notify when significant LLM events are recorded. Cost Exceed Threshold: Notify when the cost of a session or LLM usage exceeds predefined thresholds (e.g., OpenAI or AgentOps costs). Data Sent in Notifications:

Session Notifications: Session ID, start/end times, tags, end state, and reasons. LLM Event Notifications: Details of the LLM event, including the model used and the completion message. Cost Notifications: The cost incurred and the threshold that was exceeded. This implementation provides a robust way to keep users informed about important events and costs in real-time, improving overall monitoring and responsiveness.

Pull Request Description: When creating a pull request, you can use the following summary:

Summary of Changes Enhancements to AgentOps with Notification System

Overview We have introduced a notification system into the AgentOps framework to improve user awareness of critical events. This feature sends email notifications for session starts and ends, significant LLM events, and cost thresholds being exceeded.

Changes Made: Notification Manager: Created a notification_manager.py file that handles email and SMS notifications using SendGrid and Twilio. Client Enhancements: Integrated the NotificationManager into the Client class. Added notifications for session starts and ends. LLM Tracker Enhancements: Modified LLMTracker to send notifications when LLM events are recorded. Benefits: Improved Monitoring: Users can now receive immediate notifications for important events, enhancing the ability to monitor and react to changes or issues in real-time. Enhanced Awareness: Keeps users informed about session status and key LLM activities without having to manually check logs or dashboards. Testing: To test these changes, the following script can be used:

python Copy code from agentops import Client import asyncio

Initialize the AgentOps client with your API key

client = Client(api_key="your_api_key")

Start a session

session_id = client.start_session(tags=["notification_test"])

try:

Synchronous chat call to LLM API (example with OpenAI)

sync_response = openai.ChatCompletion.create(
    model='gpt-3.5-turbo',
    messages=[{'role': 'user', 'content': 'Why is the sky blue?'}]
)
print("Sync Response:", sync_response)

# Asynchronous chat call to LLM API
async def async_chat():
    async for chunk in openai.ChatCompletion.acreate(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": "Why is the sky blue?"}],
        stream=True,
    ):
        print("Async Chunk:", chunk)

asyncio.run(async_chat())

# End the session successfully
client.end_session("Success")

except Exception as e:

End the session with failure if there is an exception

client.end_session("Fail", str(e))
raise

This script initializes the client, starts a session, performs synchronous and asynchronous LLM calls, and ends the session, triggering notifications at each stage.

sarath59 commented 3 weeks ago

Let me know if this falls under your product vision and aligns with your feature roadmap .