Open swanux opened 4 years ago
Hey @swanux thanks for taking the time to try out mpris_server! Sorry that there wasn't sufficient documentation and that mpris_server gave you a hard time, but I'm glad to see that you wanted to use it :)
mpris_server was the most minimal release to get chromecast_mpris out the door, so it was in a somewhat disorganized state when you went to use it.
Over the last few weeks, however, a few bugs were fixed in mpris_server and the Chromecast client, and the library's API isn't going to change much. It still needs some docs, and I'm going to make a few changes based on your bug report, but beyond that it should stay stable.
Probably it's kind of a dumb question, but I've heard about mpris only today (because it turned out I'll need it). I've tried to implement the solution, I have a working server, however can't handle events. Here's the output of playerctl for example:
I'm happy to answer your questions, and no question is dumb. Yeah, event handling can be tricky and I wasn't clear with the example in the docs.
Without seeing the code or knowing what you're trying to add mpris_server to, I can't be too specific with a solution that would help, but I'm happy to try.
For the "pause" feature, first make sure that the can_pause() method is implemented in your subclassed MprisAdapter and that it returns True, and that you have an implementation of pause():
    from mpris_server.adapters import MprisAdapter


    class MyAdapter(MprisAdapter):
        def can_pause(self) -> bool:
            return True

        def pause(self):
            # Pause playback in your app here
            pass
Then, you want to build an event handler from the EventAdapter class.
To do this, you will need some insight into the app you're integrating mpris_server with.
For example, chromecast_mpris integrates with pychromecast, and pychromecast lets you register an event listener for Chromecast status updates.
Those updates include information MPRIS would care about, like media duration, title, whether it's paused, etc.
pychromecast expects an object with a new_cast_status(self, status) callback method, and the library will call that method when the Chromecast's metadata changes.
In that callback, we call one of the on_volume, on_playback, etc. methods from EventAdapter.
That way, when a new status update comes in, mpris_server gets notified of it through on_volume getting called, and then it can pass the new metadata off to DBus for playerctl to consume.
    from mpris_server.adapters import EventAdapter


    class MyEventHandler(EventAdapter):
        def new_cast_status(self, status):
            self.on_playback()

        def on_event_from_myapp(self, example_event):
            self.on_playpause()
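The handler above calls on_playback() on every status update. As a rough sketch of a more selective variant, the code below compares each status with the previous one and only calls the matching on_* method. It runs without mpris_server or D-Bus: RecordingEventAdapter is a hypothetical stand-in for EventAdapter that just records calls, and the Status fields (volume, paused) are assumptions for illustration, not pychromecast's real status object.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass(frozen=True)
class Status:
    """Hypothetical status object; a real app's fields will differ."""
    volume: float
    paused: bool


class RecordingEventAdapter:
    """Stand-in for EventAdapter: records which on_* method was called."""

    def __init__(self) -> None:
        self.calls: List[str] = []

    def on_volume(self) -> None:
        self.calls.append("volume")

    def on_playback(self) -> None:
        self.calls.append("playback")


class SelectiveHandler(RecordingEventAdapter):
    def __init__(self) -> None:
        super().__init__()
        self._last: Optional[Status] = None

    def new_cast_status(self, status: Status) -> None:
        last = self._last
        self._last = status
        # Notify only about the parts of the status that changed.
        if last is None or last.paused != status.paused:
            self.on_playback()
        if last is None or last.volume != status.volume:
            self.on_volume()


handler = SelectiveHandler()
handler.new_cast_status(Status(volume=0.5, paused=False))  # first update: both fire
handler.new_cast_status(Status(volume=0.8, paused=False))  # only the volume changed
handler.new_cast_status(Status(volume=0.8, paused=True))   # only the pause state changed
print(handler.calls)  # ['playback', 'volume', 'volume', 'playback']
```

With the real EventAdapter as the base class, the same comparison logic would sit in whatever callback your app invokes.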
So what you need to do is find out where you can hook a callback into your application that will call the on_playpause() method from EventAdapter.
Also, make sure you run loop() on the Server instance:
    from mpris_server.server import Server
    import your_app

    adapter = MyAdapter()
    mpris = Server(name='Example', adapter=adapter)

    # EventAdapter still expects Player and Root to get passed from server
    event_handler = MyEventHandler(mpris.player, mpris.root)
    your_app.register_event_handler(event_handler)

    mpris.publish()
    mpris.loop()
You can skip loop() if you enter the DBus event loop in the same process somewhere else.
A problem I ran into consistently was that the device would get published on the DBus interface, but I couldn't interact with it because I never entered the DBus event loop properly. That might be the issue here, too.
If you want to share the code, or want some help in Slack or IRC, I'd be glad to help out further. I'd also love to hear about what you're using mpris_server in!
Either way, thanks for the kind words and for your interest. I'll clean up the docs and add more to them, and make integrating mpris_server a more pleasant experience.
Thank you for the answer @alexdelorenzo! Using what you've said, I've put together a sample file. I want to implement MPRIS for my application, called HTidal (a Tidal client for Linux with some extras, written in Python and GTK). I'll post the code of the MPRIS server here, so it'll be easier to find the problem.
Some notes for the code:
The code:
    import gi
    gi.require_version('Gst', '1.0')
    from gi.repository import Gst
    from mpris_server.adapters import MprisAdapter, EventAdapter, Track, PlayState
    from mpris_server.server import Server
    from htidal import GUI


    class HAdapter(MprisAdapter):
        def can_pause(self) -> bool:
            return True

        def quit(self):
            GUI.on_main_delete_event(GUI, 0, 0)

        def get_current_position(self):
            try:
                nan_pos = GUI.player.query_position(Gst.Format.TIME)[1]
                position = float(nan_pos) / Gst.SECOND
            except:
                position = None
            return position

        def next(self):
            GUI.on_next(GUI, 0)

        def previous(self):
            GUI.on_prev(GUI, 0)

        def pause(self):
            print('inside')
            GUI.pause(GUI)

        def resume(self):
            GUI.resume(GUI)

        def stop(self):
            GUI.stop(GUI, 0)

        def play(self):
            GUI.play(GUI)

        def get_playstate(self) -> PlayState:
            if not GUI.playing:
                if not GUI.res:
                    return PlayState.STOPPED
                else:
                    return PlayState.PAUSED
            else:
                return PlayState.PLAYING

        def seek(self, time):
            print(time)
            GUI.player.seek_simple(Gst.Format.TIME, Gst.SeekFlags.FLUSH | Gst.SeekFlags.KEY_UNIT, time * Gst.SECOND)

        def get_art_url(self, track):
            print('Later')
            return 'Later'

        def get_stream_title(self):
            print('Later again')

        def get_current_track(self):
            art_url = self.get_art_url(0)
            content_id = 0
            name = 0
            duration = 0
            track = "xy"
            return track


    class HEventHandler(EventAdapter):
        def on_app_event(self, event: str):
            if event == 'pause':
                print('pause')
            else:
                print('NOPE')


    my_adapter = HAdapter()
    mpris = Server('HTidal', adapter=my_adapter)
    event_handler = HEventHandler("HTidal", "root")
    mpris.loop()
Awesome thanks for sharing the code. HTidal sounds like a cool app and I can't wait to see how it comes along!
I took your HAdapter code and merged it with the ChromecastAdapter code that is known to work; that way it'll be easier for you to debug.
There are a few methods on MprisAdapter that need to be implemented correctly, otherwise D-Bus will reject them. Namely, either the get_current_track() method, or the metadata() method. I implemented the metadata() method so you don't need to implement get_current_track() at all unless you want to.
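As a rough sketch of what a metadata() implementation could return for a real track, the helper below builds the dictionary from plain values. The key names are the ones used in this thread; build_metadata and its parameters are hypothetical, not part of mpris_server. Two details come from the MPRIS specification rather than from this thread: mpris:length is expressed in microseconds, and mpris:trackid has to be a valid D-Bus object path.

```python
from typing import Any, Dict, List

MICROSECONDS_PER_SECOND = 1_000_000


def build_metadata(track_id: int, title: str, artists: List[str],
                   album: str, length_seconds: float,
                   art_url: str = "") -> Dict[str, Any]:
    """Hypothetical helper: build an MPRIS-style metadata dict for one track."""
    if track_id < 0:
        raise ValueError("track_id must be non-negative")

    metadata: Dict[str, Any] = {
        # D-Bus object path: starts with '/', elements use [A-Za-z0-9_] only
        "mpris:trackid": f"/track/{track_id}",
        # MPRIS expresses lengths in microseconds
        "mpris:length": round(length_seconds * MICROSECONDS_PER_SECOND),
        "xesam:title": title,
        "xesam:artist": list(artists),  # a list of strings, not a single string
        "xesam:album": album,
    }
    if art_url:
        metadata["mpris:artUrl"] = art_url
    return metadata


example = build_metadata(1, "Example title", ["Example artist"], "Album name", 215.5)
print(example["mpris:trackid"], example["mpris:length"])  # /track/1 215500000
```

An adapter's metadata() method could then return build_metadata(...) filled in from whatever the player reports for the current track.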
I also corrected the way HEventHandler is instantiated: it expects the Root and Player interfaces from the Server object named mpris to get passed into it.
One thing to note is that somewhere in your HTidal app, you need to call on_app_event() on HEventHandler, otherwise D-Bus won't know about updates that happen in HTidal. So after event_handler = HEventHandler(mpris.player, mpris.root), but before mpris.loop(), you need to pass event_handler to HTidal.
I added some comments at the end to be a little more clear. If you put the code into a repo on GitHub I can get a better idea of how to help you integrate mpris_server with HTidal, too.
Let me know if this works, or if you need some assistance otherwise.
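To make "pass event_handler to HTidal" a bit more concrete, here is a minimal sketch of the app side. Everything in it is hypothetical: the GUI class is a tiny stand-in for HTidal's real class (which this thread calls on the class itself rather than on an instance), register_event_handler is an invented method name, and StubEventHandler replaces HEventHandler so the example runs without mpris_server or D-Bus.

```python
from typing import List, Optional


class StubEventHandler:
    """Stand-in for HEventHandler; the real one would call self.on_playpause()."""

    def __init__(self) -> None:
        self.received: List[str] = []

    def on_app_event(self, event: str) -> None:
        self.received.append(event)


class GUI:
    """Hypothetical slice of the app's GUI class; only the MPRIS hook is shown."""

    def __init__(self) -> None:
        self.playing = False
        self.mpris_handler: Optional[StubEventHandler] = None

    def register_event_handler(self, handler: StubEventHandler) -> None:
        self.mpris_handler = handler

    def _notify(self, event: str) -> None:
        # Forward state changes to MPRIS only if a handler was registered.
        if self.mpris_handler is not None:
            self.mpris_handler.on_app_event(event)

    def play(self) -> None:
        self.playing = True
        self._notify("play")

    def pause(self) -> None:
        self.playing = False
        self._notify("pause")


gui = GUI()
gui.pause()  # no handler registered yet, so nothing is reported

event_handler = StubEventHandler()
gui.register_event_handler(event_handler)
gui.play()
gui.pause()
print(event_handler.received)  # ['play', 'pause']
```

The point of the design is that the app reports every state change itself, so changes made from the GUI reach D-Bus the same way as changes requested through playerctl.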
@swanux Just tagging you so you get a notification
    import gi
    gi.require_version('Gst', '1.0')
    from gi.repository import Gst

    from typing import List

    # EventAdapter is needed for HEventHandler below
    from mpris_server.adapters import Metadata, PlayState, MprisAdapter, \
        EventAdapter, Microseconds, VolumeDecimal, RateDecimal
    from mpris_server.base import URI, MIME_TYPES, BEGINNING, DEFAULT_RATE, DbusObj
    from mpris_server.server import Server

    from htidal import GUI


    class HAdapter(MprisAdapter):
        def get_uri_schemes(self) -> List[str]:
            return URI

        def get_mime_types(self) -> List[str]:
            return MIME_TYPES

        def can_quit(self) -> bool:
            return True

        def quit(self):
            GUI.on_main_delete_event(GUI, 0, 0)

        def get_current_position(self):
            try:
                nan_pos = GUI.player.query_position(Gst.Format.TIME)[1]
                position = float(nan_pos) / Gst.SECOND
            except:
                position = None
            return position

        def next(self):
            GUI.on_next(GUI, 0)

        def previous(self):
            GUI.on_prev(GUI, 0)

        def pause(self):
            print('inside')
            GUI.pause(GUI)

        def resume(self):
            GUI.resume(GUI)

        def stop(self):
            GUI.stop(GUI, 0)

        def play(self):
            GUI.play(GUI)

        def get_playstate(self) -> PlayState:
            if not GUI.playing:
                if not GUI.res:
                    return PlayState.STOPPED
                else:
                    return PlayState.PAUSED
            else:
                return PlayState.PLAYING

        def seek(self, time):
            print(time)
            GUI.player.seek_simple(Gst.Format.TIME, Gst.SeekFlags.FLUSH | Gst.SeekFlags.KEY_UNIT, time * Gst.SECOND)

        def is_repeating(self) -> bool:
            return False

        def is_playlist(self) -> bool:
            return self.can_go_next() or self.can_go_previous()

        def set_repeating(self, val: bool):
            pass

        def set_loop_status(self, val: str):
            pass

        def get_rate(self) -> float:
            return 1.0

        def set_rate(self, val: float):
            pass

        def get_shuffle(self) -> bool:
            return False

        def set_shuffle(self, val: bool):
            return False

        def get_art_url(self, track):
            print('Later')
            return 'Later'

        def is_mute(self) -> bool:
            return False

        def can_go_next(self) -> bool:
            return False

        def can_go_previous(self) -> bool:
            return False

        def can_play(self) -> bool:
            return True

        def can_pause(self) -> bool:
            return True

        def can_seek(self) -> bool:
            return False

        def can_control(self) -> bool:
            return True

        # Replaces the earlier placeholder get_stream_title(), which returned None
        def get_stream_title(self) -> str:
            return "Test title"

        def metadata(self) -> dict:
            metadata = {
                "mpris:trackid": "/track/1",
                "mpris:length": 0,
                "mpris:artUrl": "Example",
                "xesam:url": "https://google.com",
                "xesam:title": "Example title",
                "xesam:artist": [],
                "xesam:album": "Album name",
                "xesam:albumArtist": [],
                "xesam:discNumber": 1,
                "xesam:trackNumber": 1,
                "xesam:comment": [],
            }
            return metadata


    class HEventHandler(EventAdapter):
        def on_app_event(self, event: str):
            print(f"Event received: {event}")
            if event == 'pause':
                self.on_playpause()


    my_adapter = HAdapter()
    mpris = Server('HTidal', adapter=my_adapter)
    event_handler = HEventHandler(mpris.player, mpris.root)  # need to pass mpris.player & mpris.root
    # right here you need to pass event_handler to htidal
    mpris.loop()
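One detail in get_current_position() may be worth a second look. GStreamer reports positions in nanoseconds, and the adapter divides by Gst.SECOND to get seconds, while the MPRIS specification defines Position in microseconds. Assuming mpris_server passes the adapter's return value through unchanged (an assumption to check against the library), a conversion along these lines might be closer to what clients expect. The helper name is hypothetical, and it takes the (success, position) pair that query_position() returns in Python.

```python
from typing import Optional

NANOSECONDS_PER_MICROSECOND = 1_000


def gst_position_to_microseconds(success: bool, position_ns: int) -> Optional[int]:
    """Hypothetical helper: convert a GStreamer position query result to microseconds.

    Returns None when the query failed or the position is unknown,
    mirroring the None fallback used in the adapter above.
    """
    if not success or position_ns < 0:
        return None
    return position_ns // NANOSECONDS_PER_MICROSECOND


print(gst_position_to_microseconds(True, 2_500_000_000))  # 2500000 (2.5 seconds)
print(gst_position_to_microseconds(False, 0))             # None
```

In the adapter this would be called as gst_position_to_microseconds(*GUI.player.query_position(Gst.Format.TIME)).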
Probably it's kind of a dumb question, but I've heard about mpris only today (because it turned out I'll need it). I've tried to implement the solution, I have a working server, however can't handle events. Here's the output of playerctl for example:
I've used chromecast-mpris as an example as advised in the readme, but I wasn't able to figure this last part out. If anyone can help me, I can show a minimal example. Anyway, it's a nice module @alexdelorenzo, it just lacks some documentation and an actually working example (at least when coming to it without background knowledge).