josephlim94 / python_janus_client

Python Janus WebRTC client
MIT License
30 stars 7 forks source link

How to implement custom JanusTextRoomPlugin? #3

Open deaffella opened 5 months ago

deaffella commented 5 months ago

Hi! I successfully launched JanusVideoCallPlugin and it works great. However, I'm confused about how to write my own JanusTextRoomPlugin. Could you help me with an example? I would like to receive and send messages to the text room.

josephlim94 commented 5 months ago

Hi! Great to hear that.

import asyncio
import logging

from janus_client import (
    JanusSession,
    JanusPlugin,
)
from janus_client.message_transaction import is_subset

logger = logging.getLogger(__name__)

class JanusTextRoomPlugin(JanusPlugin):
    """Janus TextRoom plugin implementation"""

    name = "janus.plugin.textroom"

    async def on_receive(self, response: dict):
        print(f"Received message: {response}")

    async def list(
        self,
    ) -> dict:
        """List available rooms."""

        success_matcher = {
            "janus": "success",
            "plugindata": {
                "plugin": self.name,
                "data": {
                    "textroom": "event",
                },
            },
        }

        body = {
            "textroom": "list",
        }

        message_transaction = await self.send(
            message={
                "janus": "message",
                "body": body,
            },
        )
        response = await message_transaction.get(matcher=success_matcher, timeout=15)
        await message_transaction.done()

        if is_subset(response, {"janus": "error", "error": {}}):
            raise Exception(f"Janus error: {response}")

        return response

async def main():
    session = JanusSession(
        base_url="yoururlhere.com/janus",
    )

    plugin_textroom = JanusTextRoomPlugin()

    await plugin_textroom.attach(session=session),

    response = await plugin_textroom.list()

    print(response)
    print("--- Everything done ---")

    await plugin_textroom.destroy()

    await session.destroy()

asyncio.run(main())

You could start with this and refer to the documentation to implement other APIs for the plugin. Here: https://janus.conf.meetecho.com/docs/textroom.html

Let me know if you have more questions.

deaffella commented 5 months ago

I managed to create a session and get a list of rooms and even a list of participants in the room. However, when I try to join or send a message to a room, I get an "invalid request join" error. I tried to send different types of requests according to Janus documentation, but I can't join the room.

deaffella commented 5 months ago

Ok, I managed to join the room and send a message! The janus gateway documentation does not say that yo ushould use 'request': 'list' to join the room or send messages to it.

{
    "janus": "message",
    "body": {
        "textroom": "message",
        "username": **username**,
        "room": **room**,
        "text": **text**,
        'request': 'list'
    }
}

Now I'm faced with the fact that I can't receive messages. I tried to reimplement the method as in your example above, but this method is simply not called when sending messages to the text room. Nothing happens at python side when i send messages to the same text room in browser.

async def on_receive(self, response: dict):
    print(f"Received message: {response}")
josephlim94 commented 5 months ago

A common mistake here is that you blocked the main thread when waiting to receive the message. Did you do something like using time.sleep(100) instead of await asyncio.sleep(100)? Please verify.

deaffella commented 5 months ago

Let me show you what I'm doing. Here is the plugin_text_room.py:

  import asyncio
from janus_client import JanusPlugin
from typing import Dict, Any

class JanusTextRoomPlugin(JanusPlugin):
    """Janus EchoTest plugin implementation"""

    name = "janus.plugin.textroom"
    __webrtcup_event: asyncio.Event

    def __init__(self) -> None:
        super().__init__()

        self.__webrtcup_event = asyncio.Event()

    async def on_receive(self, response: dict):
        print(f'[textroom_plugin] response: {response}')

    async def send_request(self, request: [Dict[str, Any]]):
        full_message = {"janus": "message"}
        full_message["body"] = request
        message_transaction = await self.send(message=full_message)
        return message_transaction

And run.py:

  import asyncio
  import json
  import random
  import time

  from janus_client import JanusSession, JanusEchoTestPlugin, JanusVideoRoomPlugin
  from plugins.plugin_text_room import JanusTextRoomPlugin

  class JanusTextRoomClient:
      def __init__(self,
                   url: str,
                   username: str = f'usernameTest-{random.randint(0, 100)}',
                   ):
          self._url = url
          self._username = username
          self.session: JanusSession = None
          self.plugin_handle: JanusTextRoomPlugin = None

      @property
      def url(self):
          return self._url

      @property
      def username(self):
          return self._username

      async def connect(self):
          self.session = JanusSession(base_url=self._url)
          self.plugin_handle = JanusTextRoomPlugin()
          await self.plugin_handle.attach(session=self.session)

      async def disconnect(self):
          await self.plugin_handle.destroy()
          await self.session.destroy()

      async def on_receive(self, response: dict):
          print(f"New message: {response}")

      def run(self):
          loop = asyncio.get_event_loop()
          loop.run_until_complete(
              self._main_loop()
          )

      async def get_room_list(self):
          request = {'request': 'list'}
          return await self.plugin_handle.send_request(request=request)

      async def get_participants_list(self, room: int):
          request = {'request': 'listparticipants', "room": room}
          return await self.plugin_handle.send_request(request=request)

      async def join_room(self, room: int):
          join_room_msg = {
              "janus": "message",
              "body": {
                  "textroom": "join",
                  "username": self.username,
                  "room": room,
                  'request': 'list'
              }
          }
          return await self.plugin_handle.send(message=join_room_msg)

      async def send_message(self, room: int, text: str):
          join_room_msg = {
              "janus": "message",
              "body": {
                  "textroom": "message",
                  "username": self.username,
                  "room": room,
                  "text": text,
                  'request': 'list'
              }
          }
          return await self.plugin_handle.send(message=join_room_msg)

      async def _main_loop(self):
          await self.connect()

          # join
          print(f'Plugin handle ID:\t{self.plugin_handle.id}')
          print(f'room_list:\n{json.dumps(await self.get_room_list(), indent=4)}')
          print(f'participants_list:\n{json.dumps(await self.get_participants_list(room=1234), indent=4)}')
          print(f'join_room:\n{json.dumps(await self.join_room(room=1234), indent=4)}')

          # send test message
          print(f'result:\n{json.dumps(await self.send_message(room=1234, text=str(time.time())), indent=4)}')

          # wait for new?
          await asyncio.sleep(100)

          await self.disconnect()

  if __name__ == "__main__":
      base_url = "ws://***:****/janus"

      client = JanusTextRoomClient(url=base_url)
      print(client.url)
      client.run()
josephlim94 commented 5 months ago

Looks good to me. I'll try when I have time.

josephlim94 commented 5 months ago

Hi, I tried. It seems that we need to connect a DataChannel to receive messages. janus-client uses aiortc to create RTCPeerConnection, you can refer to their documentation (https://github.com/aiortc/aiortc?tab=readme-ov-file) and take a look at the code for JanusEchoTestPlugin (https://github.com/josephlim94/python_janus_client/blob/master/janus_client/plugin_echotest.py) to find out how to make a WebRTC connection.

import asyncio
import logging

from janus_client import (
    JanusSession,
    JanusPlugin,
)
from janus_client.message_transaction import is_subset

logger = logging.getLogger(__name__)

class JanusTextRoomPlugin(JanusPlugin):
    """Janus TextRoom plugin implementation"""

    name = "janus.plugin.textroom"

    async def on_receive(self, response: dict):
        print(f"Received message: {response}")

    async def send_wrapper(self, message: dict, matcher: dict) -> dict:
        def function_matcher(response: dict):
            return (
                is_subset(
                    response,
                    {
                        "janus": "success",
                        "plugindata": {
                            "plugin": self.name,
                            "data": matcher,
                        },
                    },
                )
                or is_subset(
                    response,
                    {
                        "janus": "success",
                        "plugindata": {
                            "plugin": self.name,
                            "data": {
                                "textroom": "event",
                            },
                        },
                    },
                )
                or is_subset(response, {"janus": "error", "error": {}})
            )

        message_transaction = await self.send(
            message={
                "janus": "message",
                "body": message,
            },
        )
        message_response = await message_transaction.get(
            matcher=function_matcher, timeout=15
        )
        await message_transaction.done()

        if is_subset(message_response, {"janus": "error", "error": {}}):
            raise Exception(f"Janus error: {message_response}")

    async def list(
        self,
    ) -> dict:
        """List available rooms."""

        return await self.send_wrapper(
            message={
                "request": "list",
            },
            matcher={
                "textroom": "success",
                "list": [],
            },
        )

    async def get_participants_list(self, room: int):
        """List participants in a specific room"""

        return await self.send_wrapper(
            message={
                "request": "listparticipants",
                "room": room,
            },
            matcher={
                "room": room,
                "participants": [],
            },
        )

    async def join_room(self, room: int):
        return await self.send_wrapper(
            message={
                "request": "list",
                "textroom": "join",
                "username": "test_username",
                "room": room,
            },
            matcher={
                "textroom": "success",
                "participants": [],
            },
        )

    async def message(self, room: int, text: str, ack: bool = True):
        return await self.send_wrapper(
            message={
                "request": "list",
                "textroom": "message",
                "room": room,
                "text": text,
                "ack": ack,
            },
            matcher={
                "textroom": "success",
            },
        )

    async def leave(self, room: int):
        return await self.send_wrapper(
            message={
                "request": "list",
                "textroom": "leave",
                "room": room,
            },
            matcher={
                "textroom": "success",
            },
        )

    async def announcement(self, room: int, text: str) -> dict:
        return await self.send_wrapper(
            message={
                "request": "list",
                "textroom": "announcement",
                "room": room,
                "secret": "adminpwd",
                "text": text,
            },
            matcher={
                "textroom": "success",
            },
        )

async def main():
    session = JanusSession(
        base_url="wss://janusmy.josephgetmyip.com/janusbasews/janus",
        api_secret="janusrocks",
    )

    plugin_textroom = JanusTextRoomPlugin()

    await plugin_textroom.attach(session=session),

    response = await plugin_textroom.list()

    response = await plugin_textroom.get_participants_list(1234)

    response = await plugin_textroom.join_room(1234)

    response = await plugin_textroom.message(1234, "test msg")

    await asyncio.sleep(20)
    print("--- Wait for awhile ---")

    response = await plugin_textroom.leave(1234)

    response = await plugin_textroom.announcement(1234, "test announcement")

    print(response)
    print("--- Everything done ---")

    await plugin_textroom.destroy()

    await session.destroy()

asyncio.run(main())

This is the code I have now. I very much welcome contributions 😃 .

iamletenkov commented 5 months ago

I finally managed to connect to the textroom. I can send and receive messages. Unfortunately, I was unable to integrate this approach into your code. Could you please help me write the correct plugin for python_janus_client?

here is my code:

import asyncio
import datetime
import logging
import string
import sys
import random
import time
import json
import websockets as ws
from queue import Queue
from threading import Thread
from aiortc.rtcdatachannel import RTCDataChannel
from aiortc import RTCPeerConnection, RTCSessionDescription

logger = logging.getLogger('echo')

class WebSocketClient():
    def __init__(self, url='ws://localhost:8188/'):
        self._url = url
        self.connection = None
        self._transactions = {}

    async def connect(self):
        self.connection = await ws.connect(self._url,
                                           subprotocols=['janus-protocol'],
                                           ping_interval=10,
                                           ping_timeout=10,
                                           compression=None)
        if self.connection.open:
            asyncio.ensure_future(self.receiveMessage())
            logger.info('WebSocket connected')
            return self

    def transaction_id(self):
        return ''.join(random.choice(string.ascii_letters) for x in range(12))

    async def send(self, message):
        tx_id = self.transaction_id()
        message.update({'transaction': tx_id})
        tx = asyncio.get_event_loop().create_future()
        tx_in = {'tx': tx, 'request': message['janus']}
        self._transactions[tx_id] = tx_in
        try:
            await asyncio.gather(self.connection.send(json.dumps(message)), tx)
        except Exception as e:
            tx.set_result(e)
        return tx.result()

    async def receiveMessage(self):
        try:
            async for message in self.connection:
                data = json.loads(message)
                tx_id = data.get('transaction')
                response = data['janus']

                # Handle ACK
                if tx_id and response == 'ack':
                    logger.debug(f'Received ACK for transaction {tx_id}')
                    if tx_id in self._transactions:
                        tx_in = self._transactions[tx_id]
                        if tx_in['request'] == 'keepalive':
                            tx = tx_in['tx']
                            tx.set_result(data)
                            del self._transactions[tx_id]
                            logger.debug(f'Closed transaction {tx_id}'
                                         f' with {response}')
                    continue

                # Handle Success / Event / Error
                if response not in {'success', 'error'}:
                    logger.info(f'Janus Event --> {response}')

                if tx_id and tx_id in self._transactions:
                    tx_in = self._transactions[tx_id]
                    tx = tx_in['tx']
                    tx.set_result(data)
                    del self._transactions[tx_id]
                    logger.debug(f'Closed transaction {tx_id}'
                                 f' with {response}')
        except Exception:
            logger.error('WebSocket failure')
        logger.info('Connection closed')

    async def close(self):
        if self.connection:
            await self.connection.close()
            self.connection = None
        self._transactions = {}

class JanusPlugin:
    def __init__(self, session, handle_id):
        self._session = session
        self._handle_id = handle_id

    async def sendMessage(self, message):
        logger.info('Sending message to the plugin')
        message.update({'janus': 'message', 'handle_id': self._handle_id})
        response = await self._session.send(message)
        return response

class JanusSession:
    def __init__(self, url='ws://localhost:8188/'):
        self._websocket = None
        self._url = url
        self._handles = {}
        self._session_id = None
        self._ka_interval = 15
        self._ka_task = None

    async def send(self, message):
        message.update({'session_id': self._session_id})
        response = await self._websocket.send(message)
        return response

    async def create(self):
        logger.info('Creating session')
        self._websocket = await WebSocketClient(self._url).connect()
        message = {'janus': 'create'}
        response = await self.send(message)
        assert response['janus'] == 'success'
        session_id = response['data']['id']
        self._session_id = session_id
        self._ka_task = asyncio.ensure_future(self._keepalive())
        logger.info('Session created')

    async def attach(self, plugin):
        logger.info('Attaching handle')
        message = {'janus': 'attach', 'plugin': plugin}
        response = await self.send(message)
        assert response['janus'] == 'success'
        handle_id = response['data']['id']
        handle = JanusPlugin(self, handle_id)
        self._handles[handle_id] = handle
        logger.info('Handle attached')
        return handle

    async def destroy(self):
        logger.info('Destroying session')
        if self._session_id:
            message = {'janus': 'destroy'}
            await self.send(message)
            self._session_id = None
        if self._ka_task:
            self._ka_task.cancel()
            try:
                await self._ka_task
            except asyncio.CancelledError:
                pass
            self._ka_task = None
        self._handles = {}
        logger.info('Session destroyed')

        logger.info('Closing WebSocket')
        if self._websocket:
            await self._websocket.close()
            self._websocket = None

    async def _keepalive(self):
        while True:
            logger.info('Sending keepalive')
            message = {'janus': 'keepalive'}
            await self.send(message)
            logger.info('Keepalive OK')
            await asyncio.sleep(self._ka_interval)

class JanusTextRoomPlugin:
    name = "janus.plugin.textroom"
    plugin: JanusPlugin
    data_channel: RTCDataChannel

    def __init__(self,
                 url: str = 'ws://localhost:8188/',
                 room: int = 1234,
                 username: str = f"user-{random.randint(0, 100)}",
                 ):
        self.url = url
        self.room = room
        self.username = username
        self.session = JanusSession(self.url)
        self.pc = RTCPeerConnection()

        self.send_queue = Queue()
        self.send_thread = Thread(target=self.run_send_loop, daemon=True, name='send_thread')

    async def _create_data_channel(self):
        # create session
        await self.session.create()

        # attach to echotest plugin
        self.plugin = await self.session.attach(self.name)

        # create data-channel
        self.data_channel = self.pc.createDataChannel('JanusDataChannel')
        print(f'DataChannel ({self.data_channel.label}) created')

        @self.data_channel.on('open')
        def on_open():
            print(f'DataChannel ({self.data_channel.label}) open')

        @self.data_channel.on('close')
        def on_close():
            print(f'DataChannel ({self.data_channel.label}) closed')

        @self.data_channel.on('message')
        def on_message(message):
            print(f'DataChannel ({self.data_channel.label}) received: {message}')

        setup_response = await self.plugin.sendMessage({'body': {'request': 'setup'}})
        print()
        print('setup_response')
        print(setup_response)
        print()
        offer = RTCSessionDescription(sdp=setup_response['jsep']['sdp'], type=setup_response['jsep']['type'])
        await self.pc.setRemoteDescription(offer)
        await self.pc.setLocalDescription(await self.pc.createAnswer())

    async def join(self):
        response = await self.plugin.sendMessage({'body': {
            'request': 'list',
            'textroom': 'join',
            'room': self.room,
            'display': self.username,
            'username': self.username,
        },
            'jsep': {
                'sdp': self.pc.localDescription.sdp,
                'trickle': True,
                'type': self.pc.localDescription.type,
            },
        })
        await asyncio.sleep(4)

    async def _send_loop(self):
        await self._create_data_channel()
        await self.join()
        while True:
            if not self.send_queue.empty():
                message = self.send_queue.get()
                await self.plugin.sendMessage({'body': {
                    'request': 'list',
                    'textroom': 'message',
                    'room': self.room,
                    "text": str(message),
                    "ack": True,
                }})

    def run_send_loop(self):
        loop = asyncio.new_event_loop()
        try:
            loop.run_until_complete(
                self._send_loop()
            )
            sys.exit(0)
        except Exception:
            sys.exit(1)

    def run_send_loop(self):
        loop = asyncio.new_event_loop()
        try:
            loop.run_until_complete(
                self._send_loop()
            )
            sys.exit(0)
        except Exception:
            sys.exit(1)

    def start(self):
        self.send_thread.start()

    def add_message_to_queue(self, message: str):
        self.send_queue.put(message)

if __name__ == '__main__':
    url = 'ws://*.*.*.*:8188/janus'
    plugin = JanusTextRoomPlugin(url=url, room=1234)
    plugin.start()

    while True:
        time.sleep(1)
        plugin.add_message_to_queue(
            f'test message{datetime.datetime.now()}'
        )

I also noticed some strange behavior when reading and sending messages. I can receive messages in the browser using the standard janus gateway interface only if inside janus.jcfg has set the value of the variable full_trickle = true. However, if full_trickle = true, then my python client is not able to read messages, only send them. I noticed that only if full_trickle = false, then I can exchange messages using python clients.

josephlim94 commented 5 months ago

Hi @iamletenkov , thank you for your effort. It seems that the DataChannel is not connected.

In your script you create the RTCDataChannel and then register event handlers to it. Since we must send the setup request and wait for an offer, instead of us generating the offer, I think it's more correct to let RTCPeerConnection create the DataChannel on it's own and we only register the event handler for it.

When full_trickle = true, the peer connection can't be established because python_janus_client doesn't support trickle. However in my case whether full_trickle is true or false, in both cases the DataChannel can't be connected and I still can't receive message from browser. If set full_trickle = false then it should be able to work between Python clients, but full_trickle = true will need more work.

In my script I tried to implement support for trickle, it's still done in the plugin handler, but I didn't manage to get it to work. Not tested between Python clients yet though.

I've created the branch implement_plugin_text_room and added the work we have so far to https://github.com/josephlim94/python_janus_client/blob/implement_plugin_text_room/plugin_textroom.py I hope that we can work together on this branch. It helps to give credit for your contribution.

josephlim94 commented 4 months ago

Hi @iamletenkov , I finally managed to setup the data channel.

  1. I was wrong to assume the JSEP answer is to be sent with "join" request. But I still need to send a request to reply the JSEP answer, so I've chosen to send it with an "ack" request instead.
  2. aiortc is using aioice which supports "half trickle" I think (https://github.com/aiortc/aioice/issues/4). To work with Janus full_trickle=true, we need to wait for Janus to finish sending ICE candidates from trickle messages. For now I just made the code wait for awhile and assume Janus already finish sending, then only create JSEP answer and set local description (await self._pc.setLocalDescription(await self._pc.createAnswer())).
  3. The JSEP answer created by aiortc peer connection always contains all ICE candidates, and Janus can work with that so no worries there.

I've pushed my changes. Now I can't accept the hardcoded wait for the trickle ice candidates. I'm going to take my time to find a solution for this.