Closed masumr closed 1 month ago
you have not given enough code, i can't reproduce with what you have provided.
@isaackogan, here is my full code
from re import Match
from abc import ABC
from TikTokLive.client.client import TikTokLiveClient
from TikTokLive.events import ConnectEvent, DisconnectEvent, LiveEndEvent
from TikTokLive.events import proto_events, custom_events
import re
HANDLED_MESSAGE_EVENTS = [
proto_events.JoinEvent, proto_events.LikeEvent, proto_events.GiftEvent,
proto_events.EnvelopeEvent, proto_events.SubscribeEvent, custom_events.FollowEvent,
custom_events.ShareEvent, proto_events.EmoteChatEvent
]
LIVE_URL_REGEX = re.compile('^https?://(www\\.)?tiktok\\.com/@([^/]+)/live$')
LIVESTREAM_STOP_OR_FAILED_RESPONSE = -1
def extract_unique_id(stream_url: str):
match: Match = LIVE_URL_REGEX.match(stream_url)
if match:
return match.groups()[-1]
return None
class TikTokLiveStreamIterator(TikTokLiveClient):
def __init__(self, stream_url: str, **options):
TikTokLiveClient.__init__(self, unique_id=stream_url)
self.on(ConnectEvent, self._process_connect_event)
self.on(DisconnectEvent, self._process_disconnect_event)
self.on(LiveEndEvent, self._process_live_end_event)
for message_event in HANDLED_MESSAGE_EVENTS:
self.on(message_event, self._process_message_event)
async def _process_connect_event(self, event: ConnectEvent):
from datetime import datetime
output_path = f"{event.unique_id}-{datetime.utcnow()}.mp4"
from TikTokLive.client.web.routes.fetch_video import VideoFetchQuality
self.web.fetch_video.start(
output_fp=output_path,
room_info=self.room_info,
output_format="mp4",
quality=VideoFetchQuality.ORIGIN
)
print(f"Client connected.")
async def _process_disconnect_event(self, event: DisconnectEvent):
if self.web.fetch_video.is_recording:
self.web.fetch_video.stop()
print(f"Client disconnected!")
async def _process_live_end_event(self, event: LiveEndEvent):
print(f"Stream ended.")
async def _process_message_event(self, event):
print(f"\n\n**********************--- {event.get_type().lower()} ---******************************")
try:
print(f"{event.user.unique_id}")
except Exception as e:
print(e)
print(event.__dict__)
print("********************************************************************************\n\n")
async def start(self) -> None:
print(f"Starting iterator for unique id: {self.unique_id}.")
await TikTokLiveClient.start(self, fetch_room_info=True)
async def stop(self) -> None:
print(f"Stopping iterator for unique id: {self.unique_id}.")
TikTokLiveClient.remove_all_listeners(self)
await TikTokLiveClient.disconnect(self)
print(f"Stopped iterator for unique id: {self.unique_id}.")
@classmethod
def can_monitor_stream_url(cls, stream_url: str):
return LIVE_URL_REGEX.match(stream_url) is not None
import asyncio
async def main():
tt = TikTokLiveStreamIterator('https://www.tiktok.com/@wakbogel_2/live')
await tt.start()
await asyncio.sleep(60)
await tt.stop()
asyncio.run(main())
Okay, so basically for the pattern you are trying to accomplish you should use client.add_listener
and not client.on
as that was intended strictly a decorator.
It's a little complicated why it doesn't work...I guess technically speaking it is a bug, but it was never intended to be used the way you did, so not sure if I should call it that.
I can support it being used functionally like that in the future, but for now, use the intended client.add_listener
instead and it works :)
So, in this file TikTokLive/client/client.py code update in function add_listener
def add_listener(self, event: Type[Event], f: EventHandler) -> Handler:
"""
Method that can be used to register a Python function as an event listener
:param event: The event to listen to
:param f: The function to handle the event
:return: The generated `pyee.Handler` object
"""
if isinstance(event, str):
return super().add_listener(event=event, f=f)
return super().add_listener(event=event.get_type(), f=f)
And this change works for me, could you please update this change in you package?
Okay, Thank you, used add_listener and solved this issue.
Going to keep this open until I add the 'fix' for the way you were trying to do it.
I think your solution is fine, can you make a pull request?
Thank you for your compliment, Pull request is here -pull-request
Merged
We can import TikTokLiveClient and inherit of this class in our own class like as
After the in this so, we are getting error on the Connect event, error snapshot is given below-
Package Version
My version of TikTokLive is 6.0.3
Operating System
Linux