Open willsmanley opened 3 months ago
@davidzhao fyi, thank you in advance
i'm making some limited progress here:
1) i realized that mp3 recordings ought to be managed by livekit/egress
, duh. specifically as an autoegress upon room creation request. strangely, i could only get this to work with a raw twirp request rather than with an SDK, but that's ok. So far, I am autoegressing the user's track, but it's not publishing the agent's track. I will update here if I figure out how to do that.
2) i think LLM token usage is solved at a basic level from the PR linked above, but it could possibly be more of a first-class citizen of the API than this changeset suggests. That was just the minimal change required.
3) seems hacky to log completion requests via will_synthesize_assistant_reply
and still haven't made a good solution for logging completion responses.
in general, have library authors looked at pipecat? they made some clearly different choices, but just wanted to bring this up in case there is anything to be learned from their implementation. i made a mutable.ai wiki for both repos since both are early in their development and documentation: https://wiki.mutable.ai/pipecat-ai/pipecat https://wiki.mutable.ai/livekit/agents
update: i figured out how to also egress the room composite in addition to the user's track, but still having issues with the agent-only track. created a separate issue here: https://github.com/livekit/agents/issues/656
created this PR for logging completion requests: https://github.com/livekit/agents/pull/658
Yeah egress is a good way to record audio. For LLM completions you can use the VoiceAssistant user_speech_committed
and agent_speech_committed
events
I think in general this is a great idea. we'd want to capture metrics on usage and report them.
The problem with user_speech_committed
and agent_speech_committed
is that they only emit the most recent message (and not any RAG output from will_synthesize_agent_response
or tool_calls
either). They could be extended to emit other context as well. I made this PR which emits everything I would be interested in (and I assume others who are doing LLMops / monitoring): https://github.com/livekit/agents/pull/658
And usage tokens PR here: https://github.com/livekit/agents/pull/614
I'm happy to change either one if you want to let me know what you'd like to see differently.
On my fork with these changes (along with the egress part), I have pretty much e2e monitoring for how fast tokens are being spent, who is spending them, and what all of the llm requests/responses are. It even works if you need to log ChatImages, but requires you to serialize these into JSON.
thanks for your feedback (and patience) Wills! we'll create a proposal around this that satisfies what you are looking for. ideally timing information is included there as well.
How did you solve this Wills ?
I made some PRs which are linked above. They probably need to be rebased since there have been some API changes, but you can get the gist from that. And I updated the egress solution on the other thread since it isn't something that can be solved with a PR in this library.
thanks for your feedback (and patience) Wills! we'll create a proposal around this that satisfies what you are looking for. ideally timing information is included there as well.
Hi David!
Have started playing around with the framework, have a realtime agent hooked up to a phone number I've been using for the last couple of weeks.
Just wanted to make sure that the Realtime agent as well as Pipeline will be covered (for the audio logging/processing/generally extracting the data). Have really been struggling with logging and tracing with that so far.
Do you have any ideas on how to go about doing that?
Right now, I'm trying to do so like so:
class AgentContext:
"""
Context for event handlers, including user, conversation, and synchronization.
"""
def __init__(self, user: User, conversation: Conversation):
self.user = user
self.conversation = conversation
self.last_message_index = 0
self.lock = asyncio.Lock()
async def run_multimodal_agent(
ctx: JobContext, participant: rtc.Participant, db: AsyncSession
):
"""
Sets up and runs the MultimodalAgent, registering event handlers for logging.
"""
logger.info('Starting MultimodalAgent')
room = ctx.room
# Setup Realtime Model with OpenAI
openai_realtime_model = realtime.RealtimeModel(
instructions=(
"\n\nCurrent time: {DateTime.now().format('h:mm A')}, {DateTime.now().format('MMMM D')}."
),
temperature=0.8,
)
fnc_ctx = AssistantFnc()
transcription_options = AgentTranscriptionOptions()
agent = MultimodalAgent(
model=openai_realtime_model,
fnc_ctx=fnc_ctx,
transcription=transcription_options,
)
user = await get_or_create_user(
db, username='darin', email='darinkishore@protonmail.com'
)
conversation = await create_conversation(db, user.user_id)
agent_context = AgentContext(user, conversation)
# Start agent
agent.start(ctx.room, participant)
logger.info('MultimodalAgent started and running.')
# Access session via room's session
session = openai_realtime_model.sessions[0]
# Register event handlers
session.on(
'input_speech_transcription_completed',
lambda: asyncio.create_task(handle_new_messages(session, agent_context)),
)
agent.on(
'agent_stopped_speaking',
lambda: asyncio.create_task(handle_new_messages(session, agent_context)),
)
async def handle_new_messages(session, context: AgentContext):
"""
Processes and logs new messages from ChatContext.
"""
async with context.lock:
async with get_db() as db:
messages = session.chat_ctx.messages
new_messages = messages[context.last_message_index :]
for msg in new_messages:
await process_message(msg, context, db)
context.last_message_index = len(messages)
async def process_message(
msg: llm.ChatMessage, context: AgentContext, db: AsyncSession
):
"""
Logs individual messages and function calls to the database.
"""
timestamp = DateTime.utcnow()
sender = msg.role
message_type = 'text'
# Extract text
if isinstance(msg.content, str):
message_text = msg.content
elif isinstance(msg.content, list):
message_text = ''.join(part for part in msg.content if isinstance(part, str))
else:
message_text = ''
if msg.tool_calls:
# ...
But I'm unsure what the issue is right now. Is the getting things from the ChatCtx the way to go? What events should I listen for? I also really want to process the audio for realtime myself as well, because the current OpenAI transcripts are inaccurate a lot, but we can't configure the TTS provider, and I have no idea how to get the transcript of the convo at all really (async code).
RealtimeAPI is not yet emitting metrics. It's on our list and will be released shortly.
More of a question than an issue, but I wanted to see if you have a recommendation for creating audit trails or plans to support it more directly from a library interface.
I would like to have tools to record:
I am going to work on stitching all of this together and see how far I get, but I wanted to start a discussion here.
So far, unless you are using your own LLM plugin or forking an existing LLM plugin, it seems like you need to hook into the completion request within
will_synthesize_assistant_reply
. Capturing the response without forking seems trickier. And mp3, not sure about that yet.