I have been working with the LiveKit project, using the integrated OpenAI Assistant, and I have encountered an intermittent problem that occurs when interrupting the assistant while it is responding. Despite having implemented several suggested solutions to properly handle task cancellation and avoid concurrent executions, the error still persists occasionally.
Problem:
When I interrupt the assistant by speaking while it is generating a response, sometimes an error occurs that prevents the assistant from responding to new inputs until I repeat my message. The error that appears in the logs is as follows:
openai.BadRequestError: Error code: 400 - {'error': {'message': 'Thread thread_xxx already has an active run run_xxx.', 'type': 'invalid_request_error', 'param': None, 'code': None}}
This indicates that an attempt is being made to start a new run on a thread that already has an active run, which is not allowed by the OpenAI API.
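As background for the attempts below: one workaround I have been considering (but have not fully verified) is to explicitly cancel any still-active run on the thread before starting a new one. A rough sketch of what I mean — `client` is assumed to be an `openai.AsyncOpenAI` instance and `thread_id` comes from the surrounding plugin code:

```python
# Sketch only: `client` is assumed to be an openai.AsyncOpenAI instance and
# `thread_id` comes from the surrounding LiveKit plugin code.

# Statuses in which a run still blocks new runs on its thread,
# per the Assistants API documentation.
ACTIVE_RUN_STATUSES = {"queued", "in_progress", "requires_action", "cancelling"}

def is_run_active(status: str) -> bool:
    """True if a run in this status still blocks new runs on its thread."""
    return status in ACTIVE_RUN_STATUSES

async def cancel_active_runs(client, thread_id: str) -> None:
    """Best-effort: cancel any run that would make a new run fail with a 400."""
    runs = await client.beta.threads.runs.list(thread_id=thread_id)
    for run in runs.data:
        if is_run_active(run.status):
            await client.beta.threads.runs.cancel(run.id, thread_id=thread_id)
```

I am not sure whether doing this before every new run is a reasonable pattern or just papers over the underlying race, which is part of what I am asking below.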
What I Have Tried to Solve the Problem:
Handling Task Cancellation: I implemented the aclose method in the AssistantLLMStream class to properly cancel the _create_stream_task when the assistant is interrupted.
```python
async def aclose(self):
    if not self._create_stream_task.done():
        self._create_stream_task.cancel()
        try:
            await self._create_stream_task
        except asyncio.CancelledError:
            pass
```
Exception Handling in _main_task: I modified the _main_task method to handle asyncio.CancelledError and properly close the OpenAI stream.
```python
async def _main_task(self) -> None:
    try:
        # Existing code...
        async with self._client.beta.threads.runs.stream(**kwargs) as stream:
            try:
                await stream.until_done()
            except asyncio.CancelledError:
                await stream.aclose()
                raise
        # Existing code...
    except asyncio.CancelledError:
        logger.debug("AssistantLLMStream _main_task was cancelled")
    finally:
        if not self._done_future.done():
            self._done_future.set_result(None)
```
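I have also considered a last-resort fallback here: when the 400 does slip through, detect the "active run" message, cancel the stale run, and retry once. A rough sketch — `start_run` and `cancel_active` are hypothetical coroutine callables wrapping my actual OpenAI calls, and in the real code I would catch `openai.BadRequestError` rather than a bare `Exception`:

```python
import asyncio

# The distinguishing text of the 400 error from my logs.
CONFLICT_MARKER = "already has an active run"

def is_active_run_conflict(message: str) -> bool:
    """True if an API error message is the 'thread has an active run' 400."""
    return CONFLICT_MARKER in message

async def start_run_with_retry(start_run, cancel_active, retries: int = 1):
    """Try to start a run; on an 'active run' conflict, cancel and retry.

    `start_run` and `cancel_active` are hypothetical coroutine callables that
    would wrap the actual OpenAI calls (runs.stream / runs.cancel) in my code.
    """
    for attempt in range(retries + 1):
        try:
            return await start_run()
        except Exception as e:  # in the real code: openai.BadRequestError
            if attempt < retries and is_active_run_conflict(str(e)):
                await cancel_active()
                continue
            raise
```

I have not shipped this because it feels like it masks the race rather than fixing it, but it might be an acceptable safety net.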
Avoiding Concurrent Executions: I used an asyncio.Lock to ensure that new executions are not started while one is already active.
```python
# In AssistantLLM.__init__()
self._lock = asyncio.Lock()

# When creating an instance of AssistantLLMStream
return AssistantLLMStream(
    # ...
    lock=self._lock,
)

# In AssistantLLMStream._main_task()
async with self._lock:
    # Existing code...
```
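My worry with the lock alone is that it is released as soon as the local task is cancelled, while the server-side run may still be in a "cancelling" state. One idea I have not fully validated is to poll the old run until it reaches a terminal status before releasing the lock, roughly like this (`client` again assumed to be an `openai.AsyncOpenAI` instance):

```python
import asyncio

# Statuses after which the thread accepts a new run, per the Assistants API.
TERMINAL_RUN_STATUSES = {"completed", "failed", "cancelled", "expired", "incomplete"}

def is_run_terminal(status: str) -> bool:
    """True once a run no longer blocks new runs on its thread."""
    return status in TERMINAL_RUN_STATUSES

async def wait_for_run_end(client, thread_id: str, run_id: str,
                           poll_interval: float = 0.25,
                           timeout: float = 10.0) -> None:
    """Poll the run until it reaches a terminal status, or time out."""
    async def _poll():
        while True:
            run = await client.beta.threads.runs.retrieve(run_id, thread_id=thread_id)
            if is_run_terminal(run.status):
                return
            await asyncio.sleep(poll_interval)
    await asyncio.wait_for(_poll(), timeout=timeout)
```

I am unsure whether holding the lock across a polling loop like this would add unacceptable latency to interruptions, which is why I have not committed to it.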
Despite these modifications, the error still occurs intermittently. The assistant sometimes correctly handles the interruption and responds to the new input, but other times it does not, and the error appears in the logs.
Relevant Logs:
Here are excerpts from the logs showing the error:
2024-11-21 12:06:03,705 - DEBUG livekit.plugins.openai - AssistantLLMStream _main_task was cancelled
openai.BadRequestError: Error code: 400 - {'error': {'message': 'Thread thread_xxx already has an active run run_xxx.', 'type': 'invalid_request_error', 'param': None, 'code': None}}
Additional Information:
The error does not occur every time, suggesting that there may be a race condition or a case where the previous execution is not completely cancelled before starting a new one.
I have noticed that the problem occurs more frequently when the interruptions are very quick or consecutive.
The "channel closed" messages are now logged as DEBUG and do not seem to be directly related to this issue.
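To make the race I suspect concrete: cancelling the local asyncio task does not cancel the server-side run, so a new request can arrive while the old run is still active. This minimal, self-contained sketch (no OpenAI calls, the `FakeThread` is purely illustrative) reproduces the shape of the failure:

```python
import asyncio

class FakeThread:
    """Stand-in for an OpenAI thread: at most one active run at a time."""
    def __init__(self):
        self.active_run = None

    def start_run(self, run_id: str):
        # Mirrors the API's 400: a thread rejects a new run while one is active.
        if self.active_run is not None:
            raise RuntimeError(f"Thread already has an active run {self.active_run}.")
        self.active_run = run_id

async def demonstrate_race() -> str:
    thread = FakeThread()
    thread.start_run("run_1")  # server-side run begins

    async def consume_stream():
        await asyncio.sleep(10)  # local task streaming the response

    task = asyncio.create_task(consume_stream())
    await asyncio.sleep(0)

    # Interruption: we cancel only the LOCAL task...
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass

    # ...but the SERVER-side run was never cancelled, so the next run fails.
    try:
        thread.start_run("run_2")
        return "no conflict"
    except RuntimeError as e:
        return str(e)

print(asyncio.run(demonstrate_race()))
# prints "Thread already has an active run run_1."
```

If this mental model is right, then `aclose` and the lock only serialize my local tasks, and the missing piece is an explicit server-side cancellation (or wait) between runs.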
I Request Help To:
Identify the root cause of this intermittent error.
Get suggestions on how to ensure that previous executions are completely cancelled before starting new ones.
Understand if there is any additional configuration or state management that should be implemented to avoid this conflict with the OpenAI API.
Any guidance or code examples you can provide would be greatly appreciated. Thank you in advance for your time and assistance!