GillesJ opened 2 months ago
I am trying to use Locust to establish a WebSocket connection. So far I have not been successful, but maybe that's a hint for you.
We ended up with a webdriving solution: a free https://www.artillery.io account with Playwright scripts (in TypeScript) driving the Chainlit UI.
I would still prefer a native, Python-based solution without browser automation, but this setup wasn't too painful.
@hmrc87 I managed to load test my Chainlit application using Locust, and I thought I'd share a general version of my script in case it's helpful for you:
```python
import time
import uuid

from gevent.event import Event
import socketio
from locust import task, between, User

# Replace with your server URL
SERVER_URL = "http://your-server-url"


class SocketIOLocustUser(User):
    wait_time = between(1, 3)  # Wait time between tasks

    def __init__(self, environment):
        super().__init__(environment)
        self.sio = socketio.Client(request_timeout=60, engineio_logger=False)
        self.environment = environment
        self.handshake_event = Event()
        self.response_event = Event()
        self.task_end_count = 0

        # Set up event listeners for Socket.IO events
        self.sio.on("connect", self.connect)
        self.sio.on("connect_error", self.connect_error)
        self.sio.on("disconnect", self.disconnect)
        self.sio.on("new_message", self.new_message)
        self.sio.on("task_end", self.task_end)

    def connect(self):
        # Record connection success metrics
        connection_latency = (time.time() - self.connection_start_time) * 1000
        self.record_metrics("connect", connection_latency, success=True)
        print("Connected to server!")

    def connect_error(self, data):
        # Record connection error metrics
        connection_latency = (time.time() - self.connection_start_time) * 1000
        self.record_metrics("connect_error", connection_latency, success=False, exception=data)
        print(f"Connection failed: {data}")

    def disconnect(self):
        print("Disconnected from server.")

    def on_start(self):
        headers = {
            "X-Session-Id": str(uuid.uuid4()),  # Unique session ID per user
            "X-Client-Type": "test-client",
        }
        self.connection_start_time = time.time()  # Track connection start time
        self.sio.connect(
            url=SERVER_URL,
            headers=headers,
            transports=["websocket"],
            wait_timeout=10,
        )
        self.wait_for_handshake()

    def new_message(self, message):
        if message.get("name") == "Assistant":
            print("Received response from Assistant")
            self.response_event.set()  # Signal that a response was received

    def task_end(self, message):
        self.task_end_count += 1
        if self.task_end_count == 2:
            self.handshake_event.set()  # Signal the end of the handshake
        print(f"Task end count: {self.task_end_count}")

    def wait_for_handshake(self):
        handshake_start_time = time.time()
        self.handshake_event.wait()
        latency = (time.time() - handshake_start_time) * 1000
        self.record_metrics("handshake_latency", latency)

    @task
    def send_user_question(self):
        self.send_user_question_start_time = time.time()
        try:
            self.sio.emit("client_message", self.build_user_question_payload())
            self.response_event.wait()  # Wait for response
            latency = (time.time() - self.send_user_question_start_time) * 1000
            self.record_metrics("send_user_question", latency, success=True)
            self.response_event.clear()
        except Exception as e:
            latency = (time.time() - self.send_user_question_start_time) * 1000
            self.record_metrics("send_user_question", latency, success=False, exception=e)

    def build_user_question_payload(self):
        # Example payload for a user question
        return {
            "message": {
                "id": str(uuid.uuid4()),
                "name": "User",
                "type": "user_message",
                "output": "How can you help me",
                "createdAt": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
            },
            "fileReferences": [],
        }

    def record_metrics(self, name, response_time, success=True, exception=None):
        # Record custom metrics in Locust
        self.environment.events.request.fire(
            request_type="Socket.IO",
            name=name,
            response_time=response_time,
            response_length=0,
            exception=exception if not success else None,
        )

    def on_stop(self):
        self.sio.disconnect()
```
This script establishes a WebSocket connection to a Socket.IO server and performs load testing with Locust, recording metrics for connection latency, response times, and errors.
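Besides the usual `locust -f locustfile.py` invocation, Locust can also be driven as a library for headless runs, e.g. from CI. A minimal sketch, assuming the class above is importable as `SocketIOLocustUser`:

```python
import gevent
from locust.env import Environment
from locust.stats import stats_printer

# Minimal sketch: run SocketIOLocustUser headlessly via Locust's library mode.
env = Environment(user_classes=[SocketIOLocustUser])
runner = env.create_local_runner()

gevent.spawn(stats_printer(env.stats))  # Print aggregated stats periodically
runner.start(5, spawn_rate=1)           # 5 simulated users, spawned 1 per second
gevent.spawn_later(60, runner.quit)     # Stop the test after 60 seconds
runner.greenlet.join()                  # Block until the runner shuts down
```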
@dokterbob maybe something that we could add to the docs?
Hi @ShubhamMaddhashiya-bidgely, I am trying to use your script to test my Chainlit app, but I'm facing the following error:
```
[2024-11-11 11:53:29,479] vagrant-ubuntu-trusty-64/ERROR/locust.user.users: Unexpected response from server
Traceback (most recent call last):
  File "/home/vagrant/miniconda3/envs/chatbot/lib/python3.9/site-packages/locust/user/users.py", line 148, in run
    self.on_start()
  File "/home/vagrant/hcsc-gov-ai/src/gov_ai/test.py", line 58, in on_start
    self.connect_to_socketio()
  File "/home/vagrant/hcsc-gov-ai/src/gov_ai/test.py", line 96, in connect_to_socketio
    self.sio.connect(url=SERVER_URL, headers=headers, transports=["polling"])
  File "/home/vagrant/miniconda3/envs/chatbot/lib/python3.9/site-packages/socketio/client.py", line 159, in connect
    raise exceptions.ConnectionError(exc.args[0]) from None
socketio.exceptions.ConnectionError: Unexpected response from server
```
I tweaked the Socket.IO connect arguments and tried both the `polling` and `websocket` transports.
Any help is appreciated!
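Not sure it's the cause here, but one thing worth checking: as far as I can tell, Chainlit mounts its Socket.IO endpoint under `/ws/socket.io` rather than the default `/socket.io`, and a mismatched path produces exactly this `Unexpected response from server` error. A sketch of a connect call with an explicit path (the header values are illustrative):

```python
import socketio

sio = socketio.Client()
# Chainlit serves Socket.IO under /ws/socket.io (not the default /socket.io),
# so the path has to be passed explicitly to python-socketio.
sio.connect(
    "http://your-server-url",
    headers={"X-Client-Type": "test-client"},
    transports=["websocket"],
    socketio_path="/ws/socket.io",
)
```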
@saimanoj1206, how are you running your Chainlit application? Are you able to access the Chainlit app at your endpoint from a browser?
Hey @ShubhamMaddhashiya-bidgely, I hosted my app on an Azure Web App, and it's accessible from my browser. Note: it has a login page too, so I added a few more methods to handle login:
```python
def on_start(self):
    # Perform login and get auth token
    self.auth_token = self.login()
    if self.auth_token:
        self.connect_to_socketio()
    else:
        print("Login failed. Skipping WebSocket connection.")

def login(self):
    headers = {
        "Content-Type": "application/x-www-form-urlencoded"  # Explicitly setting content type
    }
    # Perform login to get auth token
    login_payload = {
        "grant_type": "password",  # Set the grant type as required
        "username": USERNAME,
        "password": PASSWORD,
        "scope": "",  # If your API doesn't use scopes, you can leave this empty
        "client_id": "",  # Set if your API requires this field, otherwise leave empty
        "client_secret": "",  # Set if your API requires this field, otherwise leave empty
    }
    login_response = self.client.post(LOGIN_URL, data=login_payload, headers=headers)
    if login_response.status_code == 200:
        # Assuming the auth token is returned as JSON
        auth_data = login_response.json()
        token = auth_data.get("access_token")  # Adjust based on the actual response
        print(f"Login successful, token: {token}")
        return token
    else:
        print(f"Login failed with status code: {login_response.status_code}")
        return None

def connect_to_socketio(self):
    headers = {
        "X-Session-Id": str(uuid.uuid4()),  # Unique session ID per user
        "X-Client-Type": "test-client",
        "Authorization": f"Bearer {self.auth_token}",  # Pass the auth token in the headers
    }
    self.connection_start_time = time.time()  # Track connection start time
    self.sio.connect(url=SERVER_URL, headers=headers, transports=["websocket"])
    self.wait_for_handshake()
```
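One detail worth flagging: `self.client`, as used in `login()`, only exists on `HttpUser` subclasses, not on plain `User`. Presumably this variant changed the base class of the earlier script, roughly like so (a sketch, not the poster's confirmed code):

```python
from locust import HttpUser, between

# self.client.post(...) in login() requires HttpUser, which provides an HTTP client.
class SocketIOLocustUser(HttpUser):
    host = SERVER_URL  # base URL for relative requests made via self.client
    wait_time = between(1, 3)
    # ... rest of the class as in the earlier script ...
```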
**Is your feature request related to a problem? Please describe.**
Is there any functionality or approach for load testing a deployed Chainlit app? Something that can simulate sending messages in different user sessions, with configurable load and message content? This is needed for any serious production environment: automated performance testing within the deployment process is non-negotiable, and we need to be able to test changes under production-like load.

**Describe the solution you'd like**
Functionality like Streamlit's AppTest class, which lets you drive the web app programmatically: https://docs.streamlit.io/develop/api-reference/app-testing#the-apptest-class

Or, if that's not possible, expose the internal API in some manner so that user sessions can be started and messages sent. If that is currently possible, it is not clearly documented: https://docs.chainlit.io/deploy/api

**Describe alternatives you've considered**
Webdriving the deployed app with a browser automation framework like Selenium or Playwright (a rough sketch follows below). But this would mean a lot of manual development and extra test-deployment work that has to track application changes.

**Context**
It is important for automated testing of a webapp to be as close as possible to real user input. Ideally you want to account for any frontend overhead when measuring performance, so a backend-only driving method could give misleading results.
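For anyone going the webdriving route in Python rather than TypeScript, a rough Playwright sketch; the URL and selectors are hypothetical placeholders that would need to match the actual Chainlit DOM:

```python
from playwright.sync_api import sync_playwright

# Rough sketch of webdriving a deployed Chainlit app; selectors are hypothetical.
with sync_playwright() as p:
    browser = p.chromium.launch(headless=True)
    page = browser.new_page()
    page.goto("https://your-chainlit-app.example.com")  # placeholder URL
    page.fill("#chat-input", "How can you help me")     # hypothetical input selector
    page.keyboard.press("Enter")
    # Wait for an assistant reply to render (hypothetical message selector).
    page.wait_for_selector(".assistant-message", timeout=30_000)
    browser.close()
```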