daily-co / daily-python

Daily Client SDK for Python
BSD 2-Clause "Simplified" License

How to correctly use completion callback? #3

Closed kylemcdonald closed 9 months ago

kylemcdonald commented 9 months ago

I have tried two approaches to using the completion callback for join and leave, but neither seems to work, and I am looking for guidance. I'm asking here because it is unclear to me whether this is due to how the SDK works, though I acknowledge I might also be missing something basic about how Python works.

import argparse
import threading
from daily import Daily, CallClient
from time import sleep
import asyncio

async def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("-m", "--meeting", required=True, help="Meeting URL")
    parser.add_argument("-s", "--style", required=True, help="'thread', 'async', or 'sleep'")
    args = parser.parse_args()

    Daily.init()
    client = CallClient()

    print('join')
    if args.style == "thread":
        event = threading.Event()
        client.join(args.meeting, completion=lambda d,e: event.set())
        print('wait')
        event.wait()
        event = threading.Event()
        print('leave')
        client.leave(completion=lambda d,e: event.set())
        event.wait()
    elif args.style == "async":
        result = asyncio.Future()
        client.join(args.meeting, completion=lambda d,e: result.set_result(None))
        print('wait')
        await result
        result = asyncio.Future()
        print('leave')
        client.leave(completion=lambda d,e: result.set_result(None))
        await result
    elif args.style == "sleep":
        client.join(args.meeting)
        print('wait')
        sleep(2)
        print('leave')
        client.leave()
        sleep(2)
    print('exit')

if __name__ == "__main__":
    asyncio.run(main())

This code can be run like this:

python completion.py -m https://kcimc.daily.co/<channel name> -s thread

When I run with "sleep", it seems to work. It seems like joining and leaving consistently take less than 2 seconds, so there are no problems.

When I run with "async", the code never gets past the first await result. Even though the callback is called and the result is set, there seems to be some kind of deadlock maybe?

When I run with "thread" it almost works. It seems like the join callback is called before it is really finished, and I get an error like this:

{"timestamp":"2023-10-06T23:49:14.845694Z","level":"ERROR","fields":{"message":"Failed to subscribe to cam-audio track: RecvTrackRequestError(ResponseCanceled)","PeerId":"e50a918d-e49e-4bc0-aa72-d29b2a7f118b"},"target":"daily_core::state::subscription"}

And then the "leave" seems to trigger the callback before the "leave" is actually finished, and I get a segmentation fault or bus error. Here is the whole printout:

join
wait
leave
{"timestamp":"2023-10-06T23:49:14.845694Z","level":"ERROR","fields":{"message":"Failed to subscribe to cam-audio track: RecvTrackRequestError(ResponseCanceled)","PeerId":"e50a918d-e49e-4bc0-aa72-d29b2a7f118b"},"target":"daily_core::state::subscription"}
exit
[1]    98541 segmentation fault  python completion.py -m https://kcimc.daily.co/<channel name> -s thread
join
wait
leave
{"timestamp":"2023-10-06T23:49:28.567438Z","level":"ERROR","fields":{"message":"Failed to subscribe to cam-audio track: RecvTrackRequestError(ResponseCanceled)","PeerId":"e50a918d-e49e-4bc0-aa72-d29b2a7f118b"},"target":"daily_core::state::subscription"}
exit
[1]    98619 bus error  python completion.py -m https://kcimc.daily.co/<channel name> -s thread

Please advise on the correct solution. Thank you.

aconchillo commented 9 months ago

Hi @kylemcdonald. I have tried the three versions and had no issues with sleep and thread. This was on macOS / Python 3.11.5. The asyncio version definitely hangs; we'll take a look at this.

For the thread version, it might be that something is still being set up (even though I couldn't reproduce it). In a normal use case there would be a bit of a gap between the join completion and leave(). But we'll take a look, since this shouldn't happen if everything was really complete.

Since all the functions that have completion callbacks are already asynchronous, one solution I would suggest is to simply use the completion callbacks to do what you need when they are called:

from daily import CallClient

class MyApp:

    def __init__(self):
        self.client = CallClient()

    def join(self, url):
        self.client.join(url, completion=self.on_joined)

    def on_joined(self, data, error):
        # Do something once the join has completed
        pass

kylemcdonald commented 9 months ago

Thanks for looking into this, and for the design pattern suggestion. My goal was to be able to have this logic inside MyApp, and call await app.join() from outside, for example. But I think I can redesign what I'm doing to match your suggested event-driven approach.

For reference, I was testing on macOS 13.4.1, Python 3.9.18 running under Anaconda. And of course daily-python 0.2.0.

I have also seen the threading version work without any errors a couple times, but it is inconsistent.

aconchillo commented 9 months ago

Hi @kylemcdonald. The problem with asyncio and asyncio.Future is that they are not thread-safe, as indicated in the documentation. So the line:

client.join(args.meeting, completion=lambda d,e: result.set_result(None))

is not really doing what you expect (or not always).

However, there's a way to solve that by doing the following:

loop = asyncio.get_event_loop()
result = loop.create_future()
client.join(args.meeting, completion=lambda d,e: loop.call_soon_threadsafe(result.set_result, None))

Note the loop.call_soon_threadsafe() call. The call result = loop.create_future() is also preferable to asyncio.Future().

Even if it works, I would not recommend this approach since daily-python does not really work well with asyncio at the moment.

The threading approach should work, I'll take a look at that next. Thanks again for trying this out and reporting these issues.
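
For anyone who wants to try the call_soon_threadsafe() pattern without a meeting, here is a minimal sketch. fake_join is a hypothetical stand-in (not part of daily-python) that simply invokes the completion from another thread, which is the situation described above; it also uses asyncio.get_running_loop() rather than get_event_loop(), since it runs inside a coroutine.

```python
import asyncio
import threading


def fake_join(completion):
    # Hypothetical stand-in for an SDK call: it invokes `completion`
    # from a different thread than the one running the event loop.
    threading.Thread(target=completion, args=({"joined": True}, None)).start()


async def wait_for_join():
    loop = asyncio.get_running_loop()
    result = loop.create_future()

    def on_joined(data, error):
        # This runs on the worker thread, so hand the result over to the
        # loop thread instead of touching the future directly.
        loop.call_soon_threadsafe(result.set_result, (data, error))

    fake_join(on_joined)
    return await result


if __name__ == "__main__":
    print(asyncio.run(wait_for_join()))
```

Calling result.set_result() directly from on_joined would set the future without waking the event loop, which is consistent with the hang reported for the "async" style.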

aconchillo commented 9 months ago

Thanks for looking into this, and for the design pattern suggestion. My goal was to be able to have this logic inside MyApp, and call await app.join() from outside, for example. But I think I can redesign what I'm doing to match your suggested event-driven approach.

Sounds good. Let us know if you have any issues.

For reference, I was testing on macOS 13.4.1, Python 3.9.18 running under Anaconda. And of course daily-python 0.2.0.

I have also seen the threading version work without any errors a couple times, but it is inconsistent.

Thank you, I'll do more tests there.

aconchillo commented 9 months ago

When I run with "thread" it almost works. It seems like the join callback is called before it is really finished, and I get an error like this:

{"timestamp":"2023-10-06T23:49:14.845694Z","level":"ERROR","fields":{"message":"Failed to subscribe to cam-audio track: RecvTrackRequestError(ResponseCanceled)","PeerId":"e50a918d-e49e-4bc0-aa72-d29b2a7f118b"},"target":"daily_core::state::subscription"}

This is because there are one or more other participants in the meeting and the CallClient is trying to subscribe to the audio track while leaving.

Joining a meeting doesn't mean that subscribing to and handling all remote tracks has finished. Joining only means that we have established a connection with the backend server. In the background, while the join completion is being called, we are starting to connect to the remote tracks. So it's normal to get this message, and it shouldn't have any impact.

The "sleep" approach doesn't show this issue because it probably has time to finish subscribing to tracks.

And then the "leave" seems to trigger the callback before the "leave" is actually finished, and I get a segmentation fault or bus error. Here is the whole printout:

join
wait
leave
{"timestamp":"2023-10-06T23:49:14.845694Z","level":"ERROR","fields":{"message":"Failed to subscribe to cam-audio track: RecvTrackRequestError(ResponseCanceled)","PeerId":"e50a918d-e49e-4bc0-aa72-d29b2a7f118b"},"target":"daily_core::state::subscription"}
exit
[1]    98541 segmentation fault  python completion.py -m https://kcimc.daily.co/<channel name> -s thread
join
wait
leave
{"timestamp":"2023-10-06T23:49:28.567438Z","level":"ERROR","fields":{"message":"Failed to subscribe to cam-audio track: RecvTrackRequestError(ResponseCanceled)","PeerId":"e50a918d-e49e-4bc0-aa72-d29b2a7f118b"},"target":"daily_core::state::subscription"}
exit
[1]    98619 bus error  python completion.py -m https://kcimc.daily.co/<channel name> -s thread

I have been able to reproduce this issue and we should work on a fix for next release.

So, closing the loop. "sleep" approach works, "asyncio" approach works but needs to be handled with care when threads are involved (and probably not recommended) and "thread" approach works except there might be crashes on leave. Let me know if this makes sense.

I'll leave this issue open until we have a fix for the "after leave" crashes.

kylemcdonald commented 9 months ago

Let me know if this makes sense.

Makes perfect sense. Glad I was able to help uncover a small glitch. Thank you!

kylemcdonald commented 9 months ago

I found a test case where the threading approach fails, and only callbacks work:

import argparse
from daily import Daily, CallClient, EventHandler
import threading

class MyApp(EventHandler):
    def __init__(self, meeting_url, style):
        self.client = CallClient(self)
        self.client.join(meeting_url, completion=self.joined)
        self.style = style
        self.is_shutdown = threading.Event()

    def joined(self, data, error):
        print("joined")

    def on_app_message(self, message, participant):
        if message["message"] == "end":
            print("leaving")
            if self.style == "callback":
                self.client.leave(completion=lambda d,e: self.left())
            elif self.style == "thread":
                event = threading.Event()
                self.client.leave(completion=lambda d, e: event.set())
                event.wait()
                self.left()

    def left(self):
        print("left")
        self.is_shutdown.set()

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("-m", "--meeting", required=True, help="Meeting URL")
    parser.add_argument("-s", "--style", required=True, help="'thread', 'callback'")
    args = parser.parse_args()

    Daily.init()
    app = MyApp(args.meeting, args.style)
    app.is_shutdown.wait()

    print("exit")

if __name__ == "__main__":
    main()

Run this using:

python daily-message-end.py -m <url> -s callback
python daily-message-end.py -m <url> -s thread

After running it, wait for the console to print "joined", then go to the pre-built chatroom and type the word "end" into the chat.

The callback version will shut down correctly. It should look like this:

joined # after this, type "end" in the chat
leaving
left
exit

The thread version will not. It should look like this:

joined # after this type "end" in the chat
leaving

At this point it will stall indefinitely. To me, this says that for some reason the event.wait() is waiting indefinitely.

It would be great to have some kind of blocking API for these calls. I thought I was clever by using a little wrapper like this:

def complete(func, *args, **kwargs):
    event = threading.Event()
    kwargs["completion"] = lambda d, e: event.set()
    func(*args, **kwargs)
    event.wait()

But due to these weird side effects, it looks like the only 100% solid solution is to chain callbacks.
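
A variant of that wrapper with a timeout at least fails loudly instead of stalling. This is only a sketch: fake_leave and never_completes are hypothetical stand-ins rather than SDK functions, and the wrapper assumes nothing about the callback's arguments beyond collecting whatever it receives. It does not help when called from inside an event handler or completion callback, where the wait can never be satisfied.

```python
import threading


def complete(func, *args, timeout=5.0, **kwargs):
    """Call func(..., completion=cb) and block until cb fires or the timeout expires."""
    event = threading.Event()
    outcome = {}

    def on_done(*cb_args):
        # Keep whatever the callback was given, then release the waiter.
        outcome["args"] = cb_args
        event.set()

    kwargs["completion"] = on_done
    func(*args, **kwargs)
    if not event.wait(timeout):
        name = getattr(func, "__name__", repr(func))
        raise TimeoutError(f"{name} did not complete within {timeout}s")
    return outcome["args"]


def fake_leave(completion=None):
    # Hypothetical stand-in: completes on another thread after 50 ms.
    threading.Timer(0.05, completion, args=(None, None)).start()


def never_completes(completion=None):
    # Hypothetical stand-in for a call whose completion is never invoked.
    pass
```

With this, complete(fake_leave) returns the callback arguments, while complete(never_completes, timeout=0.1) raises TimeoutError rather than hanging.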

kylemcdonald commented 9 months ago

It turns out the callbacks are not 100% effective. update_subscription_profiles() does not seem to call its completion callback. I could also add this as a separate issue if that helps.

Here is a small modification to the wav_audio_send.py example that shows how chained callbacks do not progress past update_subscription_profiles():

#
# This demo will join a Daily meeting and send the audio from a WAV file into
# the meeting. The WAV file is required to have a sample rate of 16000, 16-bit
# per sample and mono audio channel.
#
# Usage: python wav_audio_send.py -m MEETING_URL -i FILE.wav
#

import argparse
import time
import wave

from daily import *

class SendWavApp:
    def __init__(self, meeting_url, use_chaining=True):
        self.__mic_device = Daily.create_microphone_device(
            "my-mic",
            sample_rate = 16000,
            channels = 1
        )

        self.__client = CallClient()
        print("update_inputs")

        def update_inputs(data, error):
            self.__client.update_inputs({
                "camera": False,
                "microphone": {
                    "isEnabled": True,
                    "settings": {
                        "deviceId": "my-mic"
                    }
                }
            }, completion=update_subscription_profiles if use_chaining else None)

        def update_subscription_profiles(data, error):
            print("update_subscription_profiles")
            self.__client.update_subscription_profiles({
                "base": {
                    "camera": "unsubscribed",
                    "microphone": "unsubscribed"
                }
            }, completion=join if use_chaining else None)

        def join(data, error):
            print("join")
            self.__client.join(meeting_url)

        update_inputs(None, None)
        if not use_chaining:
            update_subscription_profiles(None, None)
            join(None, None)

    def join(self, meeting_url):
        self.__client.join(meeting_url)

    def leave(self):
        self.__client.leave()

    def send_wav_file(self, file_name):
        wav = wave.open(file_name, "rb")
        while True:
            sent_frames = 0
            total_frames = wav.getnframes()
            while sent_frames < total_frames:
                frames = wav.readframes(1600)
                frames_read = len(frames) / 2 # 16-bit linear PCM
                if frames_read > 0:
                    self.__mic_device.write_frames(frames)
                sent_frames += frames_read
            wav.rewind()

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("-m", "--meeting", required = True, help = "Meeting URL")
    parser.add_argument("-i", "--input", required = True, help = "WAV input file")
    parser.add_argument("-c", "--chaining", action="store_true", help = "Use chaining")

    args = parser.parse_args()

    Daily.init()

    app = SendWavApp(args.meeting, use_chaining=args.chaining)

    # Here we could use join() completion callback or an EventHandler.
    time.sleep(2)

    try :
        app.send_wav_file(args.input)
    except KeyboardInterrupt:
        app.leave()

    # Let leave finish
    time.sleep(2)

if __name__ == '__main__':
    main()

This doesn't really make sense to me, since it seemed like the threading approach was actually working for this before. So I'm going to try a new room and restarting just in case.

Edit 1: After restarting and trying a new room, I am able to confirm that update_subscription_profiles() does not call the callback in the above code for some reason. However, I don't think the problem is inherent to update_subscription_profiles(); rather, it's a symptom of some bigger, more general problem. Instead of relying on the threading or callbacks approach, I'm going to try to redesign around the EventHandler events.

Edit 2: I tried redesigning around EventHandler, and I can't figure out the right combination of Events to watch for that will guarantee it's ok to start sending audio. So... I'm back to sleep() and crossing my fingers 😂

Edit 3: To summarize, after lots of experiments, I was unable to find a solution that treats all the asynchronous calls in wav_audio_send as blocking: update_inputs/update_subscription_profiles/join. The only solution that has worked 100% of the time (so far) is to just call these methods one after another, then sleep for 2 seconds. Anything else will hit a snag at some point: either a callback will not be called or an event.wait() will never return.

aconchillo commented 9 months ago

Hi @kylemcdonald. I believe you might need something like this:

#
# This demo will join a Daily meeting and send the audio from a WAV file into
# the meeting. The WAV file is required to have a sample rate of 16000, 16-bit
# per sample and mono audio channel.
#
# Usage: python wav_audio_send.py -m MEETING_URL -i FILE.wav
#

import argparse
import time
import threading
import wave

from daily import *

class SendWavApp:
    def __init__(self, input_file_name):
        self.__app_error = None

        self.__mic_device = Daily.create_microphone_device(
            "my-mic",
            sample_rate = 16000,
            channels = 1
        )

        self.__client = CallClient()

        self.__client.update_inputs({
            "camera": False,
            "microphone": {
                "isEnabled": True,
                "settings": {
                    "deviceId": "my-mic"
                }
            }
        }, completion=self.on_inputs_updated)

        self.__client.update_subscription_profiles({
            "base": {
                "camera": "unsubscribed",
                "microphone": "unsubscribed"
            }
        }, completion=self.on_subscription_profiles_updated)

        self.__start_event = threading.Event()
        self.__thread = threading.Thread(target = self.send_wav_file,
                                         args = [input_file_name])
        self.__thread.start()

    def on_inputs_updated(self, inputs, error):
        if error:
            print(f"Unable to update inputs: {error}")
            self.__app_error = error

    def on_subscription_profiles_updated(self, inputs, error):
        if error:
            print(f"Unable to update subscription profiles: {error}")
            self.__app_error = error

    def on_joined(self, data, error):
        if error:
            print(f"Unable to join meeting: {error}")
            self.__app_error = error
        self.__start_event.set()

    def on_left(self, ignore, error):
        if error:
            print(f"Unable to leave meeting: {error}")

    def start(self, meeting_url):
        self.__client.join(meeting_url, completion=self.on_joined)
        self.__thread.join()

    def leave(self):
        self.__client.leave(completion=self.on_left)

    def send_wav_file(self, file_name):
        self.__start_event.wait()

        if self.__app_error:
            print("Unable to send WAV file!")
            return

        wav = wave.open(file_name, "rb")

        sent_frames = 0
        total_frames = wav.getnframes()
        while sent_frames < total_frames:
            frames = wav.readframes(1600)
            frames_read = len(frames) / 2 # 16-bit linear PCM
            if frames_read > 0:
                self.__mic_device.write_frames(frames)
            sent_frames += frames_read
        wav.rewind()

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("-m", "--meeting", required = True, help = "Meeting URL")
    parser.add_argument("-i", "--input", required = True, help = "WAV input file")

    args = parser.parse_args()

    Daily.init()

    app = SendWavApp(args.input)

    try :
        app.start(args.meeting)
    except KeyboardInterrupt:
        app.leave()

    # Let leave finish
    time.sleep(2)

if __name__ == '__main__':
    main()

Basically, this is what's going on:

Your example didn't work because you can't chain CallClient functions inside completion callbacks: the second call will block until the first one has finished, but the first one can't finish because the second is waiting. Basically, a deadlock. Completion callbacks are just simple notifications.

So, one solution, as you already suggested, is to use events. Somewhat related: with daily-python you can also build desktop UIs, and in those cases you would, for example, show a UI error if update_inputs, update_subscription_profiles, or anything else fails.
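
To make the shape of that deadlock concrete, here is a toy model. It is an assumption for illustration only and not how the SDK is implemented: ToyClient is a hypothetical class in which a call counts as finished only once its completion callback has returned, and a new call waits for the previous one. A timeout is used so the sketch reports the stall instead of hanging.

```python
import threading


class ToyClient:
    """Toy model: a call is not finished until its completion callback returns."""

    def __init__(self):
        self._in_flight = threading.Lock()  # non-reentrant on purpose

    def call(self, name, completion=None, timeout=0.2):
        # A new call has to wait for the previous one to finish.
        if not self._in_flight.acquire(timeout=timeout):
            return False  # gave up; a call without a timeout would hang here
        try:
            if completion:
                completion(name)
        finally:
            self._in_flight.release()
        return True


def chained_call_gets_stuck():
    client = ToyClient()
    results = []

    def on_first(_name):
        # Chaining: start the second call from inside the first call's callback.
        results.append(client.call("second"))

    client.call("first", completion=on_first)
    return results[0]
```

Here chained_call_gets_stuck() returns False: the second call waits on the first, and the first cannot finish until its callback (which is waiting on the second) returns.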

I hope this new example is helpful. As you can see, there's no need to chain completion callbacks because in doing so we are trying to transform asynchronous functions into synchronous functions somehow.

kylemcdonald commented 9 months ago

This approach fixed my case, and I haven't had any new issues yet.

The thing that was unclear to me was whether it was ok to join before the inputs are updated and the subscription profile is updated. I had assumed that those things had to happen first. But in retrospect it should have been obvious because the examples show these methods being called immediately without sleeping or waiting for any callbacks.

For my application, a bot is continuously having a conversation and giving guidance to another participant. For this to function cleanly, the threading.Thread is essential otherwise the on_message can never interrupt the main speak/listen loop.

Thanks again for your guidance on this.

aconchillo commented 9 months ago

This approach fixed my case, and I haven't had any new issues yet.

Great to hear.

The thing that was unclear to me was whether it was ok to join before the inputs are updated and the subscription profile is updated. I had assumed that those things had to happen first. But in retrospect it should have been obvious because the examples show these methods being called immediately without sleeping or waiting for any callbacks.

I have updated the example and made it a bit more robust. There might actually be a case (if the timing is not right) where the inputs have not been updated yet when we start writing to the microphone, and that causes an error.

The idea in this updated version is to set the event only when we know that both join and update_inputs have completed:

#
# This demo will join a Daily meeting and send the audio from a WAV file into
# the meeting. The WAV file is required to have a sample rate of 16000, 16-bit
# per sample and mono audio channel.
#
# Usage: python3 wav_audio_send.py -m MEETING_URL -i FILE.wav
#

import argparse
import time
import threading
import wave

from daily import *

class SendWavApp:
    def __init__(self, input_file_name):
        self.__mic_device = Daily.create_microphone_device(
            "my-mic",
            sample_rate = 16000,
            channels = 1
        )

        self.__client = CallClient()

        self.__client.update_inputs({
            "camera": False,
            "microphone": {
                "isEnabled": True,
                "settings": {
                    "deviceId": "my-mic"
                }
            }
        }, completion = self.on_inputs_updated)

        self.__client.update_subscription_profiles({
            "base": {
                "camera": "unsubscribed",
                "microphone": "unsubscribed"
            }
        })

        self.__app_quit = False
        self.__app_error = None
        self.__app_joined = False
        self.__app_inputs_updated = False

        self.__start_event = threading.Event()
        self.__thread = threading.Thread(target = self.send_wav_file,
                                         args = [input_file_name])
        self.__thread.start()

    def on_inputs_updated(self, inputs, error):
        if error:
            print(f"Unable to update inputs: {error}")
            self.__app_error = error
        else:
            self.__app_inputs_updated = True
        self.maybe_start()

    def on_joined(self, data, error):
        if error:
            print(f"Unable to join meeting: {error}")
            self.__app_error = error
        else:
            self.__app_joined = True
        self.maybe_start()

    def run(self, meeting_url):
        self.__client.join(meeting_url, completion=self.on_joined)
        self.__thread.join()

    def leave(self):
        self.__app_quit = True
        self.__thread.join()
        self.__client.leave()

    def maybe_start(self):
        if self.__app_error:
            self.__start_event.set()

        if self.__app_inputs_updated and self.__app_joined:
            self.__start_event.set()

    def send_wav_file(self, file_name):
        self.__start_event.wait()

        if self.__app_error:
            print("Unable to send WAV file!")
            return

        wav = wave.open(file_name, "rb")

        sent_frames = 0
        total_frames = wav.getnframes()
        while not self.__app_quit and sent_frames < total_frames:
            # Read 100ms worth of audio frames.
            frames = wav.readframes(1600)
            frames_read = len(frames) / 2 # 16-bit linear PCM
            if frames_read > 0:
                self.__mic_device.write_frames(frames)
            sent_frames += frames_read

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("-m", "--meeting", required = True, help = "Meeting URL")
    parser.add_argument("-i", "--input", required = True, help = "WAV input file")

    args = parser.parse_args()

    Daily.init()

    app = SendWavApp(args.input)

    try:
        app.run(args.meeting)
    except KeyboardInterrupt:
        print("Ctrl-C detected. Exiting!")
    finally:
        app.leave()

    # Let leave finish
    time.sleep(2)

if __name__ == '__main__':
    main()
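
The gating idea in the example above (start only when every prerequisite has completed, or give up as soon as one fails) can be isolated into a small helper. This is a sketch, not part of daily-python: StartGate and its method names are hypothetical, and the step names are arbitrary labels.

```python
import threading


class StartGate:
    """Opens once every required step has succeeded, or as soon as one fails."""

    def __init__(self, *required):
        self._pending = set(required)
        self._lock = threading.Lock()
        self._event = threading.Event()
        self.error = None

    def done(self, step, error=None):
        # Safe to call from any callback thread.
        with self._lock:
            if error is not None:
                self.error = error
            else:
                self._pending.discard(step)
            if self.error is not None or not self._pending:
                self._event.set()

    def wait(self, timeout=None):
        """Return True if all steps succeeded, False on error or timeout."""
        opened = self._event.wait(timeout)
        return opened and self.error is None
```

In the demo this would replace the separate flags: each completion callback calls gate.done("join", error) or gate.done("update_inputs", error), and the sending thread calls gate.wait() before writing frames.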

For my application, a bot is continuously having a conversation and giving guidance to another participant. For this to function cleanly, the threading.Thread is essential otherwise the on_message can never interrupt the main speak/listen loop.

That's right.

Thanks again for your guidance on this.

Np. Let us know if you have any other issue. I will be updating all the demos this week to make them more robust and to be better guides on how to use the SDK. Thank you!

aconchillo commented 9 months ago

The demos have been updated, and daily-python 0.3.0 should fix the crash after leave.

aconchillo commented 1 month ago

It's been a while. I was just updating some code to make more use of asyncio and remembered this thread.

A nice way to do this, since we have completion callbacks, is to use futures:

async def join(self, meeting_url):
  future = asyncio.get_running_loop().create_future()

  def join_completion(data, error):
    if not future.cancelled():
      future.get_loop().call_soon_threadsafe(future.set_result, (data, error))

  client.join(meeting_url, client_settings={...}, completion=join_completion)

  return await future

Which means you can then do:

await join("...")

aconchillo commented 1 month ago

Here's an example https://github.com/daily-co/daily-python/blob/main/demos/audio/async_wav_audio_send.py
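
The same futures idea can be generalized to any function that takes a completion keyword argument. The sketch below is an assumption-laden illustration rather than SDK code: await_completion and fake_join are hypothetical names, the future's state is checked on the loop thread (so a cancelled or timed-out waiter is simply ignored), and an optional timeout is applied with asyncio.wait_for.

```python
import asyncio
import threading


async def await_completion(func, *args, timeout=None, **kwargs):
    """Await a function that reports its result through a `completion` callback."""
    loop = asyncio.get_running_loop()
    future = loop.create_future()

    def resolve(cb_args):
        # Runs on the loop thread; skip if the waiter was cancelled or timed out.
        if not future.done():
            future.set_result(cb_args)

    def on_done(*cb_args):
        # May run on any thread, so only schedule work on the loop from here.
        loop.call_soon_threadsafe(resolve, cb_args)

    func(*args, completion=on_done, **kwargs)
    return await asyncio.wait_for(future, timeout)


def fake_join(url, completion=None):
    # Hypothetical stand-in: completes on another thread after 50 ms.
    threading.Timer(0.05, completion, args=({"url": url}, None)).start()


if __name__ == "__main__":
    print(asyncio.run(await_completion(fake_join, "room", timeout=1.0)))
```

If the completion never arrives, the await raises asyncio.TimeoutError after the given timeout instead of waiting forever.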