livekit / python-sdks

LiveKit real-time and server SDKs for Python
https://docs.livekit.io
Apache License 2.0

๐Ÿ“น๐ŸŽ™๏ธ๐Ÿ Python SDK for LiveKit

Use this SDK to add realtime video, audio and data features to your Python app. By connecting to LiveKit Cloud or a self-hosted server, you can quickly build applications such as multi-modal AI, live streaming, or video calls with just a few lines of code.

This repo contains two packages: livekit-api, for the server APIs, and livekit, for the real-time SDK.

Using Server API

$ pip install livekit-api

Generating an access token

from livekit import api

# will automatically use the LIVEKIT_API_KEY and LIVEKIT_API_SECRET env vars
token = api.AccessToken() \
    .with_identity("python-bot") \
    .with_name("Python Bot") \
    .with_grants(api.VideoGrants(
        room_join=True,
        room="my-room",
    )).to_jwt()
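The value returned by to_jwt() is a JSON Web Token, so its claims can be inspected with the standard library while debugging. The following is a minimal sketch, not part of the SDK: make_sample_jwt and decode_jwt_payload are hypothetical helpers, and the claim names in the sample are illustrative assumptions rather than a specification of what LiveKit emits. The decoder does not verify the signature, so it is only suitable for inspection.

```python
import base64
import hashlib
import hmac
import json


def _b64url(data: bytes) -> str:
    """Base64url-encode without padding, as JWTs do."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def make_sample_jwt(claims: dict, secret: str) -> str:
    """Build an HS256-signed JWT from stdlib parts (a stand-in for to_jwt())."""
    header = _b64url(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())
    payload = _b64url(json.dumps(claims).encode())
    signing_input = f"{header}.{payload}".encode("ascii")
    signature = hmac.new(secret.encode(), signing_input, hashlib.sha256).digest()
    return f"{header}.{payload}.{_b64url(signature)}"


def decode_jwt_payload(token: str) -> dict:
    """Return the claims of a JWT WITHOUT verifying its signature."""
    payload_b64 = token.split(".")[1]
    # Restore the padding that the JWT encoding strips.
    padding = "=" * (-len(payload_b64) % 4)
    return json.loads(base64.urlsafe_b64decode(payload_b64 + padding))


# Illustrative claims only; the claim names here are assumptions.
sample = make_sample_jwt(
    {"sub": "python-bot", "name": "Python Bot", "video": {"roomJoin": True, "room": "my-room"}},
    secret="not-a-real-secret",
)
claims = decode_jwt_payload(sample)
print(claims["sub"])
```

In practice you would pass the string returned by to_jwt() to decode_jwt_payload instead of the sample token.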

Creating a room

RoomService uses asyncio and aiohttp to make API calls. It needs to be used with an event loop.

from livekit import api
import asyncio

async def main():
    lkapi = api.LiveKitAPI(
        'http://localhost:7880',
    )
    room_info = await lkapi.room.create_room(
        api.CreateRoomRequest(name="my-room"),
    )
    print(room_info)
    results = await lkapi.room.list_rooms(api.ListRoomsRequest())
    print(results)
    await lkapi.aclose()

asyncio.run(main())
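In the example above, aclose() is only reached if every call succeeds. A common way to guarantee cleanup is to wrap the calls in try/finally. The sketch below shows the pattern with a hypothetical FakeAPI stand-in so that it runs without a server; with the real client you would apply the same structure around the lkapi calls.

```python
import asyncio


class FakeAPI:
    """Hypothetical stand-in for a client that exposes an async aclose()."""

    def __init__(self) -> None:
        self.closed = False

    async def create_room(self, name: str) -> str:
        if not name:
            raise ValueError("room name must not be empty")
        return name

    async def aclose(self) -> None:
        self.closed = True


async def create_room_safely(client: FakeAPI, name: str) -> str:
    try:
        return await client.create_room(name)
    finally:
        # Runs on success and on failure, so the client is always closed.
        await client.aclose()


client = FakeAPI()
print(asyncio.run(create_room_safely(client, "my-room")), client.closed)
```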

Using Real-time SDK

$ pip install livekit

Connecting to a room

import asyncio
import logging

from livekit import rtc

async def main():
    room = rtc.Room()

    @room.on("participant_connected")
    def on_participant_connected(participant: rtc.RemoteParticipant):
        logging.info(
            "participant connected: %s %s", participant.sid, participant.identity)

    async def receive_frames(stream: rtc.VideoStream):
        async for frame in stream:
            # received a video frame from the track, process it here
            pass

    # track_subscribed is emitted whenever the local participant is subscribed to a new track
    @room.on("track_subscribed")
    def on_track_subscribed(track: rtc.Track, publication: rtc.RemoteTrackPublication, participant: rtc.RemoteParticipant):
        logging.info("track subscribed: %s", publication.sid)
        if track.kind == rtc.TrackKind.KIND_VIDEO:
            video_stream = rtc.VideoStream(track)
            asyncio.ensure_future(receive_frames(video_stream))

    # By default, autosubscribe is enabled. The participant will be subscribed to
    # all published tracks in the room.
    # URL and TOKEN are placeholders for your server URL and a valid access token.
    await room.connect(URL, TOKEN)
    logging.info("connected to room %s", room.name)

    # participants and tracks that are already available in the room
    # participant_connected and track_published events will *not* be emitted for them
    # (recent SDK versions expose this mapping as remote_participants;
    # older releases named it participants)
    for participant in room.remote_participants.values():
        for publication in participant.track_publications.values():
            logging.info("track publication: %s", publication.sid)

Sending and receiving chat


room = rtc.Room()
...

chat = rtc.ChatManager(room)

# receiving chat
@chat.on("message_received")
def on_message_received(msg: rtc.ChatMessage):
    print(f"message received: {msg.participant.identity}: {msg.message}")

# sending chat
await chat.send_message("hello world")

RPC

Perform your own predefined method calls from one participant to another.

This feature is especially powerful when used with Agents, for instance to forward LLM function calls to your client application.

Registering an RPC method

The participant who implements the method and will receive its calls must first register support:

@room.local_participant.register_rpc_method("greet")
async def handle_greet(data: rtc.RpcInvocationData):
    print(f"Received greeting from {data.caller_identity}: {data.payload}")
    return f"Hello, {data.caller_identity}!"

In addition to the payload, your handler also receives response_timeout, which tells you the maximum time available to return a response. If you are unable to respond in time, the call will result in an error on the caller's side.
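One way to respect that limit is to bound the handler's own work with asyncio.wait_for. The sketch below is an illustration under assumptions: FakeInvocation is a hypothetical stand-in for the invocation data, response_timeout is assumed to be in seconds, slow_lookup stands for whatever work your handler does, and the 80% margin is an arbitrary choice.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class FakeInvocation:
    """Hypothetical stand-in carrying the fields the handler reads."""
    caller_identity: str
    payload: str
    response_timeout: float  # assumed to be seconds


async def slow_lookup(payload: str, delay: float) -> str:
    """Placeholder for real work that may take a while."""
    await asyncio.sleep(delay)
    return payload.upper()


async def handle_with_budget(data: FakeInvocation, delay: float) -> str:
    # Keep a safety margin so the reply can still travel back in time.
    budget = data.response_timeout * 0.8
    try:
        return await asyncio.wait_for(slow_lookup(data.payload, delay), timeout=budget)
    except asyncio.TimeoutError:
        # Answer with a fallback instead of letting the caller time out.
        return "busy, try again"


fast = asyncio.run(handle_with_budget(FakeInvocation("alice", "hi", 1.0), delay=0.0))
slow = asyncio.run(handle_with_budget(FakeInvocation("alice", "hi", 0.05), delay=0.5))
print(fast, "|", slow)
```

Returning a fallback keeps the failure visible to the caller as ordinary data. Raising an error from the handler is an equally valid choice if the caller should treat the situation as a failure.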

Performing an RPC request

The caller may then initiate an RPC call like so:

try:
    response = await room.local_participant.perform_rpc(
        destination_identity='recipient-identity',
        method='greet',
        payload='Hello from RPC!'
    )
    print(f"RPC response: {response}")
except Exception as e:
    print(f"RPC call failed: {e}")

You may find it useful to adjust the response_timeout parameter, which indicates the amount of time you will wait for a response. We recommend keeping this value as low as possible while still satisfying the constraints of your application.

Errors

LiveKit is a dynamic realtime environment and calls can fail for various reasons.

You may raise errors of the type RpcError with a string message in an RPC method handler, and they will be received on the caller's side with the message intact. Other errors will not be transmitted and will instead arrive at the caller as 1500 ("Application Error"). Other built-in errors are detailed in RpcError.
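The behaviour described above can be simulated locally to see what a caller would observe. This is a sketch of the rule, not the SDK's implementation: SketchRpcError, dispatch and call are hypothetical names, and the code 4001 is made up for the example. Only the 1500 "Application Error" mapping comes from the text.

```python
import asyncio


class SketchRpcError(Exception):
    """Hypothetical stand-in for RpcError: a numeric code plus a message."""

    def __init__(self, code: int, message: str) -> None:
        super().__init__(message)
        self.code = code
        self.message = message


async def dispatch(handler, payload: str) -> str:
    """Run a handler and apply the error rule described in the text."""
    try:
        return await handler(payload)
    except SketchRpcError:
        raise  # forwarded to the caller with code and message intact
    except Exception:
        # Details of any other exception are not forwarded.
        raise SketchRpcError(1500, "Application Error") from None


async def strict_handler(payload: str) -> str:
    if not payload:
        raise SketchRpcError(4001, "payload must not be empty")  # made-up code
    return f"ok: {payload}"


async def buggy_handler(payload: str) -> str:
    return str(1 / 0)  # raises ZeroDivisionError


async def call(handler, payload: str) -> str:
    """What the caller ends up seeing: a response or an error summary."""
    try:
        return await dispatch(handler, payload)
    except SketchRpcError as e:
        return f"{e.code}: {e.message}"


print(asyncio.run(call(strict_handler, "")))
print(asyncio.run(call(buggy_handler, "x")))
```

The practical consequence is that any failure you want the caller to understand should be raised explicitly as an RpcError with a descriptive message.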

Examples

Getting help / Contributing

Please join us on Slack to get help from our devs and community members. We welcome your contributions (PRs), and details can be discussed there.


LiveKit Ecosystem
Realtime SDKs: React Components · Browser · Swift Components · iOS/macOS/visionOS · Android · Flutter · React Native · Rust · Node.js · Python · Unity (web) · Unity (beta)
Server APIs: Node.js · Golang · Ruby · Java/Kotlin · Python · Rust · PHP (community)
Agents Frameworks: Python · Playground
Services: LiveKit server · Egress · Ingress · SIP
Resources: Docs · Example apps · Cloud · Self-hosting · CLI