"""Support Home Assistant media_player entities to be used as Players for Music Assistant."""
from __future__ import annotations
import asyncio
import logging
from datetime import datetime
from time import time
from typing import Optional, Tuple
from homeassistant.components.media_player import DOMAIN as MP_DOMAIN
from homeassistant.components.media_player import MediaPlayerEntity
from homeassistant.components.media_player.const import DOMAIN as MEDIA_PLAYER_DOMAIN
from homeassistant.components.media_player.const import (
MEDIA_TYPE_MUSIC,
SUPPORT_TURN_OFF,
SUPPORT_TURN_ON,
SUPPORT_VOLUME_MUTE,
)
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import (
ATTR_ENTITY_ID,
EVENT_STATE_CHANGED,
STATE_IDLE,
STATE_OFF,
STATE_ON,
STATE_PAUSED,
STATE_PLAYING,
STATE_STANDBY,
STATE_UNAVAILABLE,
STATE_UNKNOWN,
)
from homeassistant.core import HomeAssistant, State, callback
from homeassistant.helpers import device_registry as dr
from homeassistant.helpers import entity_registry as er
from homeassistant.helpers.entity_component import DATA_INSTANCES
from homeassistant.helpers.event import Event
from homeassistant.util.dt import utcnow
from music_assistant import MusicAssistant
from music_assistant.models.enums import ContentType
from music_assistant.models.player import (
DeviceInfo,
Player,
PlayerState,
get_child_players,
get_group_volume,
)
from .const import (
ATTR_SOURCE_ENTITY_ID,
ATV_DOMAIN,
CONF_PLAYER_ENTITIES,
DEFAULT_NAME,
DLNA_DOMAIN,
DOMAIN,
ESPHOME_DOMAIN,
SLIMPROTO_DOMAIN,
SLIMPROTO_EVENT,
SONOS_DOMAIN,
)
LOGGER = logging.getLogger(__name__)
OFF_STATES = (STATE_OFF, STATE_UNAVAILABLE, STATE_STANDBY)
CAST_DOMAIN = "cast"
CAST_MULTIZONE_MANAGER_KEY = "cast_multizone_manager"
GROUP_DOMAIN = "group"
STATE_MAPPING = {
STATE_OFF: PlayerState.OFF,
STATE_ON: PlayerState.IDLE,
STATE_UNKNOWN: PlayerState.IDLE,
STATE_UNAVAILABLE: PlayerState.OFF,
STATE_IDLE: PlayerState.IDLE,
STATE_PLAYING: PlayerState.PLAYING,
STATE_PAUSED: PlayerState.PAUSED,
STATE_STANDBY: PlayerState.OFF,
}
class HassPlayer(Player):
"""Generic/base Mapping from Home Assistant Mediaplayer to Music Assistant Player."""
use_mute_as_power: bool = False
def __init__(self, hass: HomeAssistant, entity_id: str) -> None:
"""Initialize player."""
self.hass = hass
# use the (source) entity_id as player_id for now, to be improved later with unique_id ?
self.player_id = entity_id
self.entity_id = entity_id
# grab a reference to the underlying entity
# we perform all logic directly on the entity instance and bypass the state machine
entity_comp = hass.data.get(DATA_INSTANCES, {}).get(MP_DOMAIN)
self.entity: MediaPlayerEntity = entity_comp.get_entity(entity_id)
manufacturer = "Home Assistant"
model = entity_id
if reg_entry := self.entity.registry_entry:
# grab device entry
if reg_entry.device_id:
dev_reg = dr.async_get(hass)
device = dev_reg.async_get(reg_entry.device_id)
manufacturer = device.manufacturer
model = device.model
self._attr_device_info = DeviceInfo(manufacturer=manufacturer, model=model)
self._attr_powered = False
self._attr_current_url = ""
self._attr_elapsed_time = 0
self.update_attributes()
@property
def name(self) -> str:
"""Return player name."""
if reg_entry := self.entity.registry_entry:
return reg_entry.name or self.entity.name
return self.entity_id
@property
def support_power(self) -> bool:
"""Return if this player supports power commands."""
return bool(self.entity.supported_features & SUPPORT_TURN_ON) and bool(
self.entity.supported_features & SUPPORT_TURN_OFF
)
@property
def powered(self) -> bool:
"""Return bool if this player is currently powered on."""
if not self.available:
return False
if self.use_mute_as_power:
return not self.volume_muted
if self.support_power:
return self.entity.state not in OFF_STATES
return self._attr_powered
@property
def elapsed_time(self) -> float:
"""Return elapsed time of current playing media in seconds."""
if not self.available:
return 0
# we need to return the corrected time here
extra_attr = self.entity.extra_state_attributes or {}
media_position = extra_attr.get(
"media_position_mass", self.entity.media_position
)
last_upd = self.entity.media_position_updated_at
if last_upd is None or media_position is None:
return 0
diff = (utcnow() - last_upd).seconds
return media_position + diff
@property
def current_url(self) -> str:
"""Return URL that is currently loaded in the player."""
return self.entity.media_content_id or self._attr_current_url
@property
def state(self) -> PlayerState:
"""Return current state of player."""
if not self.available:
return PlayerState.OFF
if not self.powered:
return PlayerState.OFF
if self.entity.state == PlayerState.OFF and self.powered:
return PlayerState.IDLE
return STATE_MAPPING.get(self.entity.state, PlayerState.OFF)
@property
def volume_level(self) -> int:
"""Return current volume level of player (scale 0..100)."""
if not self.available:
return 0
if self.is_group:
return get_group_volume(self)
if self.entity.support_volume_set:
return (self.entity.volume_level or 0) * 100
return 100
@property
def volume_muted(self) -> bool:
"""Return current mute mode of player."""
if not self.available:
return False
if self.entity.support_volume_mute:
return self.entity.is_volume_muted
return self._attr_volume_muted
@property
def supported_content_types(self) -> Tuple[ContentType]:
"""Return the content types this player supports."""
if not self.is_group:
return self._attr_supported_content_types
# return contenttypes that are supported by all child players
return tuple(
content_type
for content_type in ContentType
if all(
(
content_type in child_player.supported_content_types
for child_player in get_child_players(self, False, False)
)
)
)
@property
def supported_sample_rates(self) -> Tuple[int]:
"""Return the sample rates this player supports."""
if not self.is_group:
return self._attr_supported_sample_rates
return tuple(
sample_rate
for sample_rate in (44100, 48000, 88200, 96000)
if all(
(
sample_rate in child_player.supported_sample_rates
for child_player in get_child_players(self, False, False)
)
)
)
@callback
def on_hass_event(self, event: Event) -> None:
"""Call on Home Assistant event."""
self.update_attributes()
if event.event_type == "state_changed":
old_state = event.data.get("old_state")
new_state = event.data.get("new_state")
if old_state and new_state:
self.on_state_changed(old_state, new_state)
self.update_state()
@callback
def on_state_changed(self, old_state: State, new_state: State) -> None:
"""Call when state changes from HA player."""
LOGGER.debug(
"[%s] state_changed - old: %s - new: %s",
self.entity_id,
old_state.state,
new_state.state,
)
if new_state.state in OFF_STATES:
self._attr_current_url = None
@callback
def update_attributes(self) -> None:
"""Update attributes of this player."""
self._attr_available = self.entity.available
async def play_url(self, url: str) -> None:
"""Play the specified url on the player."""
# a lot of players do not power on at playback request so send power on from here
if not self.powered:
await self.power(True)
LOGGER.debug("[%s] play_url: %s", self.entity_id, url)
self._attr_current_url = url
if self.use_mute_as_power:
await self.volume_mute(False)
await self.entity.async_play_media(
MEDIA_TYPE_MUSIC,
url,
)
async def stop(self) -> None:
"""Send STOP command to player."""
LOGGER.debug("[%s] stop", self.entity_id)
self._attr_current_url = None
await self.entity.async_media_stop()
async def play(self) -> None:
"""Send PLAY/UNPAUSE command to player."""
LOGGER.debug("[%s] play", self.entity_id)
await self.entity.async_media_play()
async def pause(self) -> None:
"""Send PAUSE command to player."""
if not self.entity.support_pause:
LOGGER.warning(
"[%s] pause not supported, sending STOP instead...", self.entity_id
)
await self.stop()
return
LOGGER.debug("[%s] pause", self.entity_id)
await self.entity.async_media_pause()
async def power(self, powered: bool) -> None:
"""Send POWER command to player."""
LOGGER.debug("[%s] power: %s", self.entity_id, powered)
# send stop if this player is active queue
if not powered and self.active_queue.queue_id == self.player_id:
if self.state == PlayerState.PLAYING:
await self.active_queue.stop()
if self.use_mute_as_power:
await self.volume_mute(not powered)
elif powered and bool(self.entity.supported_features & SUPPORT_TURN_ON):
# regular turn_on command
await self.entity.async_turn_on()
elif not powered and bool(self.entity.supported_features & SUPPORT_TURN_OFF):
# regular turn_off command
await self.entity.async_turn_off()
# update local attribute anyway
# (for mute as power workaround and players without power support)
self._attr_powered = powered
self.update_state()
# check group power: power off group when last player powers down
if not powered:
self.check_group_power()
async def volume_set(self, volume_level: int) -> None:
"""Send volume level (0..100) command to player."""
LOGGER.debug("[%s] volume_set: %s", self.entity_id, volume_level)
if self.entity.support_volume_set:
await self.entity.async_set_volume_level(volume_level / 100)
async def volume_mute(self, muted: bool) -> None:
"""Send volume mute command to player."""
# for players that do not support mute, we fake mute with volume
if not bool(self.entity.supported_features & SUPPORT_VOLUME_MUTE):
await super().volume_mute(muted)
return
await self.entity.async_mute_volume(muted)
async def next_track(self) -> None:
"""Send next_track command to player."""
LOGGER.debug("[%s] next_track", self.entity_id)
await self.entity.async_media_next_track()
async def previous_track(self) -> None:
"""Send previous_track command to player."""
LOGGER.debug("[%s] previous_track", self.entity_id)
await self.entity.async_media_previous_track()
def check_group_power(self) -> None:
"""Check if groupplayer can be turned off when all childs are powered off."""
# convenience helper:
# power off group player if last child player turns off
for group_id in self.group_parents:
group_player = self.mass.players.get_player(group_id)
if not group_player:
continue
if not group_player.powered:
continue
powered_childs = set()
for child_player in get_child_players(group_player):
if child_player.player_id == self.player_id:
continue
if child_player.powered:
powered_childs.add(child_player.player_id)
if len(powered_childs) == 0:
self.mass.create_task(group_player.power(False))
if self.is_group:
# schedule update of the attributes to make it refresh group childs
self.mass.loop.call_later(5, self.update_state)
class SlimprotoPlayer(HassPlayer):
"""Representation of Hass player from Squeezebox Local integration."""
# TODO: read max sample rate and supported codecs from player
_attr_supported_content_types: Tuple[ContentType] = (
ContentType.FLAC,
ContentType.MP3,
ContentType.WAV,
ContentType.PCM_S16LE,
ContentType.PCM_S24LE,
)
_attr_supported_sample_rates: Tuple[int] = (
44100,
48000,
88200,
96000,
)
def __init__(self, *args, **kwargs) -> None:
"""Initialize player."""
super().__init__(*args, **kwargs)
self.slimserver = self.hass.data[SLIMPROTO_DOMAIN]
self._unsubs = [
self.hass.bus.async_listen(SLIMPROTO_EVENT, self.on_squeezebox_event)
]
@callback
def on_remove(self) -> None:
"""Call when player is about to be removed (cleaned up) from player manager."""
for unsub in self._unsubs:
unsub()
self._unsubs = []
@callback
def on_squeezebox_event(self, event: Event) -> None:
"""Handle special events from squeezebox players."""
if event.data["entity_id"] != self.entity_id:
return
cmd = event.data["command_str"]
if cmd == "playlist index +1":
self.hass.create_task(self.active_queue.next())
if cmd == "playlist index -1":
self.hass.create_task(self.active_queue.previous())
class ESPHomePlayer(HassPlayer):
"""Representation of Hass player from ESPHome integration."""
use_mute_as_power: bool = True
_attr_supported_content_types: Tuple[ContentType] = (ContentType.MP3,)
_attr_supported_sample_rates: Tuple[int] = (44100, 48000)
_attr_media_pos_updated_at: Optional[datetime] = None
@property
def elapsed_time(self) -> float:
"""Return elapsed time of current playing media in seconds."""
if self.state == PlayerState.PLAYING:
last_upd = self._attr_media_pos_updated_at
media_pos = self._attr_elapsed_time
if last_upd is None or media_pos is None:
return 0
diff = (utcnow() - last_upd).seconds
return media_pos + diff
if self.state == PlayerState.PAUSED:
return self._attr_elapsed_time
return 0
@callback
def on_state_changed(self, old_state: State, new_state: State) -> None:
"""Call when state changes from HA player."""
super().on_state_changed(old_state, new_state)
# TEMP! This needs to be fixed upstream in the ESPHome integration
old_state = old_state.state
new_state = new_state.state
if old_state == STATE_PAUSED and new_state == STATE_PLAYING:
self._attr_media_pos_updated_at = utcnow()
elif new_state == STATE_PAUSED:
last_upd = self._attr_media_pos_updated_at
media_pos = self._attr_elapsed_time
if last_upd is not None and media_pos is not None:
diff = (utcnow() - last_upd).seconds
self._attr_elapsed_time = media_pos + diff
elif old_state != STATE_PLAYING and new_state == STATE_PLAYING:
self._attr_media_pos_updated_at = utcnow()
self._attr_elapsed_time = 0
async def play_url(self, url: str) -> None:
"""Play the specified url on the player."""
await super().play_url(url)
self._attr_media_pos_updated_at = utcnow()
self._attr_elapsed_time = 0
class ATVPlayer(HassPlayer):
"""Representation of Hass player from ATV/Airplay integration."""
_attr_supported_content_types: Tuple[ContentType] = (ContentType.MP3,)
_attr_supported_sample_rates: Tuple[int] = (44100, 48000)
class CastPlayer(HassPlayer):
"""Representation of Hass player from cast integration."""
_attr_supported_sample_rates: Tuple[int] = (44100, 48000, 88200, 96000)
def __init__(self, *args, **kwargs) -> None:
"""Initialize cast player control."""
super().__init__(*args, **kwargs)
self.cast_uuid = self.entity.registry_entry.unique_id
if self._attr_device_info.model == "Google Cast Group":
# this is a cast group
self._attr_is_group = True
self.update_attributes()
@callback
def update_attributes(self) -> None:
"""Update attributes of this player."""
super().update_attributes()
self.use_mute_as_power = not (self.is_group or len(self.group_parents) == 0)
if not self._attr_is_group:
return
# this is a bit hacky to get the group members
# TODO: create PR to add these as state attributes to the cast integration
# pylint: disable=protected-access
if CAST_MULTIZONE_MANAGER_KEY not in self.hass.data:
return
mz_mgr = self.hass.data[CAST_MULTIZONE_MANAGER_KEY]
if self.cast_uuid not in mz_mgr._groups:
return
mz_ctrl = mz_mgr._groups[self.cast_uuid]["listener"]._mz
child_players = []
ent_reg = er.async_get(self.hass)
for cast_uuid in mz_ctrl.members:
if entity_id := ent_reg.entities.get_entity_id(
(MP_DOMAIN, CAST_DOMAIN, cast_uuid)
):
child_players.append(entity_id)
self._attr_group_childs = child_players
async def play_url(self, url: str) -> None:
"""Play the specified url on the player."""
self._attr_powered = True
if self.use_mute_as_power:
await self.volume_mute(False)
# pylint: disable=import-outside-toplevel,protected-access
from homeassistant.components.cast.media_player import quick_play
cast = self.entity._chromecast
app_data = {
"media_id": url,
"media_type": f"audio/{self.active_queue.settings.stream_type.value}",
"enqueue": False,
"title": f" Streaming from {DEFAULT_NAME}",
}
await self.hass.async_add_executor_job(
quick_play, cast, "default_media_receiver", app_data
)
# enqueue second item to allow on-player control of next
# (or shout next track from google assistant)
await asyncio.sleep(1)
if self.active_queue.stream and self.active_queue.stream.is_alert:
return
if self.active_queue.stream and len(self.active_queue.items) < 2:
return
enqueue_data = {**app_data}
enqueue_data["enqueue"] = True
enqueue_data["media_id"] = self.mass.streams.get_control_url(
self.active_queue.queue_id
)
await self.hass.async_add_executor_job(
quick_play, cast, "default_media_receiver", enqueue_data
)
class SonosPlayer(HassPlayer):
"""Representation of Hass player from Sonos integration."""
_attr_supported_sample_rates: Tuple[int] = (44100, 48000)
_sonos_paused: bool = False
def __init__(self, *args, **kwargs) -> None:
"""Initialize."""
self._last_info_fetch = 0
self._paused = False
super().__init__(*args, **kwargs)
@property
def state(self) -> PlayerState:
"""Return current PlayerState of player."""
# a sonos player is always either playing or paused
# consider idle if nothing is playing and we did not pause
if self.entity.state == STATE_PAUSED and not self._sonos_paused:
return PlayerState.IDLE
return super().state
@callback
def on_state_changed(self, old_state: State, new_state: State) -> None:
"""Call when state changes from HA player."""
super().on_state_changed(old_state, new_state)
self.hass.create_task(self.poll_sonos(True))
@property
def elapsed_time(self) -> float:
"""Return elapsed time of current playing media in seconds."""
# elapsed_time is read by the queue controller every second while playing
# add the poll task here to make sure we have accurate info
self.hass.create_task(self.poll_sonos())
return super().elapsed_time
async def play(self) -> None:
"""Send PLAY/UNPAUSE command to player."""
self._sonos_paused = False
await super().play()
async def pause(self) -> None:
"""Send PAUSE command to player."""
self._sonos_paused = True
await super().pause()
async def play_url(self, url: str) -> None:
"""Play the specified url on the player."""
self._sonos_paused = False
self._attr_powered = True
if self.use_mute_as_power:
await self.volume_mute(False)
def _play_url():
soco = self.entity.coordinator.soco
meta = (
'<DIDL-Lite xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:upnp="urn:schemas-upnp-org:metadata-1-0/upnp/" xmlns="urn:schemas-upnp-org:metadata-1-0/DIDL-Lite/" xmlns:dlna="urn:schemas-dlna-org:metadata-1-0/">'
'<item id="1" parentID="0" restricted="1">'
"<dc:title>Streaming from Music Assistant</dc:title>"
"<dc:creator></dc:creator>"
"<upnp:album></upnp:album>"
"<upnp:channelName>Music Assistant</upnp:channelName>"
"<upnp:channelNr>0</upnp:channelNr>"
"<upnp:class>object.item.audioItem.audioBroadcast</upnp:class>"
f'<res protocolInfo="http-get:*:audio/flac:DLNA.ORG_OP=00;DLNA.ORG_CI=0;DLNA.ORG_FLAGS=0d500000000000000000000000000000">{url}</res>'
"</item>"
"</DIDL-Lite>"
)
soco.play_uri(url, meta=meta, force_radio=False)
await self.hass.loop.run_in_executor(None, _play_url)
await self.poll_sonos(True)
async def poll_sonos(self, force: Optional[bool] = None) -> None:
"""Call when the PlayerQueue polls the player for accurate info."""
def poll_sonos():
if self.entity.speaker.is_coordinator:
self.entity.media.poll_media()
LOGGER.debug("poll sonos")
if force is None:
force = (
self.entity.media_position is None
and self.entity.state == STATE_PLAYING
)
if force or (time() - self._last_info_fetch) > 30:
await self.hass.loop.run_in_executor(None, poll_sonos)
self._last_info_fetch = time()
class DlnaPlayer(HassPlayer):
"""Representation of Hass player from DLNA integration."""
_attr_supported_sample_rates: Tuple[int] = (44100, 48000)
_attr_supported_content_types: Tuple[ContentType] = (
ContentType.MP3,
ContentType.FLAC,
)
async def play_url(self, url: str) -> None:
"""Play the specified url on the player."""
self._attr_powered = True
if self.use_mute_as_power:
await self.volume_mute(False)
# pylint: disable=protected-access
device = self.entity._device
didl_metadata = (
'<DIDL-Lite xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:upnp="urn:schemas-upnp-org:metadata-1-0/upnp/" xmlns="urn:schemas-upnp-org:metadata-1-0/DIDL-Lite/" xmlns:dlna="urn:schemas-dlna-org:metadata-1-0/">'
'<item id="1" parentID="0" restricted="1">'
"<dc:title>Streaming from Music Assistant</dc:title>"
"<dc:creator></dc:creator>"
"<upnp:album></upnp:album>"
"<upnp:channelName>Music Assistant</upnp:channelName>"
"<upnp:channelNr>0</upnp:channelNr>"
"<upnp:class>object.item.audioItem.audioBroadcast</upnp:class>"
f'<res protocolInfo="http-get:*:audio/flac:DLNA.ORG_OP=00;DLNA.ORG_CI=0;DLNA.ORG_FLAGS=0d500000000000000000000000000000">{url}</res>'
"</item>"
"</DIDL-Lite>"
)
if device.can_stop:
await self.entity.async_media_stop()
# Queue media
await device.async_set_transport_uri(
url, "Streaming from Music Assistant", didl_metadata
)
if self.state == PlayerState.PLAYING:
return
await device.async_wait_for_can_play()
await self.entity.async_media_play()
class HassGroupPlayer(HassPlayer):
"""Mapping from Home Assistant Grouped Mediaplayer to Music Assistant Player."""
def __init__(self, *args, **kwargs) -> None:
"""Initialize player."""
super().__init__(*args, **kwargs)
self._attr_is_group = True
self._attr_use_multi_stream = True
self._attr_current_url = ""
self._attr_device_info = DeviceInfo(
manufacturer="Home Assistant", model="Media Player Group"
)
self.update_attributes()
@property
def support_power(self) -> bool:
"""Return if this player supports power commands."""
return False
@property
def state(self) -> PlayerState:
"""Return the state of the grouped player."""
# grab details from first (powered) group child
for child_player in get_child_players(self, True):
if not child_player.current_url:
continue
return child_player.state
return super().state
@property
def current_url(self) -> str:
"""Return the current_url of the grouped player."""
# grab details from first (powered) group child
for child_player in get_child_players(self, True):
if not child_player.current_url:
continue
return child_player.current_url
return super().state
@property
def elapsed_time(self) -> float:
"""Return the corrected/precise elsapsed time of the grouped player."""
# grab details from first (powered) group child
for child_player in get_child_players(self, True):
if not child_player.current_url:
continue
return child_player.elapsed_time
return 0
async def stop(self) -> None:
"""Send STOP command to player."""
# redirect command to all child players
await asyncio.gather(*[x.stop() for x in get_child_players(self, True)])
async def play(self) -> None:
"""Send PLAY/UNPAUSE command to player."""
# redirect command to all child players
await asyncio.gather(*[x.play() for x in get_child_players(self, True)])
async def pause(self) -> None:
"""Send PAUSE command to player."""
# redirect command to all child players
await asyncio.gather(*[x.pause() for x in get_child_players(self, True)])
async def power(self, powered: bool) -> None:
"""Send POWER command to player."""
self._attr_powered = powered
self.update_state()
async def play_url(self, url: str) -> None:
"""Play the specified url on the player."""
self._attr_powered = True
self._attr_current_url = url
# redirect command to all child players
await asyncio.gather(*[x.play_url(url) for x in get_child_players(self, True)])
@callback
def on_hass_event(self, event: Event) -> None:
"""Call on Home Assistant event."""
self.update_attributes()
self.update_state()
@callback
def update_attributes(self) -> None:
"""Call when player state is about to be updated in the player manager."""
hass_state = self.hass.states.get(self.entity_id)
self._attr_available = hass_state.state != STATE_UNAVAILABLE
self._attr_name = hass_state.name
# collect the group childs, be prepared for the usecase where the user actually
# added a mass player to a group, translate that to the underlying entity.
group_childs = []
extra_attr = self.entity.extra_state_attributes or {}
for entity_id in extra_attr.get(ATTR_ENTITY_ID, []):
if source_id := self._get_source_entity_id(entity_id):
group_childs.append(source_id)
self._attr_group_childs = group_childs
def on_child_update(self, player_id: str, changed_keys: set) -> None:
"""Call when one of the child players of a playergroup updates."""
# resume queue if a child player turns on while this queue is playing
if (
"powered" in changed_keys
and self.active_queue.active
and self.state == PlayerState.PLAYING
):
if child_player := self.mass.players.get_player(player_id):
if child_player.powered:
self.mass.create_task(self.active_queue.resume())
return
self.update_state(skip_forward=True)
def _get_source_entity_id(self, entity_id: str) -> str | None:
"""Return source entity_id from child entity_id."""
if hass_state := self.hass.states.get(entity_id):
# if entity is actually already mass entity, return the source entity
if source_id := hass_state.attributes.get(ATTR_SOURCE_ENTITY_ID):
return source_id
return entity_id
return None
PLAYER_MAPPING = {
ATV_DOMAIN: ATVPlayer,
CAST_DOMAIN: CastPlayer,
DLNA_DOMAIN: DlnaPlayer,
SLIMPROTO_DOMAIN: SlimprotoPlayer,
ESPHOME_DOMAIN: ESPHomePlayer,
SONOS_DOMAIN: SonosPlayer,
GROUP_DOMAIN: HassGroupPlayer,
}
async def async_register_player_control(
hass: HomeAssistant, mass: MusicAssistant, entity_id: str
) -> HassPlayer | None:
"""Register hass media_player entity as player control on Music Assistant."""
# check for existing player first if already registered
if player := mass.players.get_player(entity_id):
return player
entity_comp = hass.data.get(DATA_INSTANCES, {}).get(MP_DOMAIN)
if not entity_comp:
return None
entity: MediaPlayerEntity = entity_comp.get_entity(entity_id)
if not entity:
return None
player = None
# Integration specific player controls
entry_platform = entity.platform.platform_name
if entry_platform == DOMAIN:
# this is already a Music assistant player
return
# load player specific mapping or generic one
player_cls = PLAYER_MAPPING.get(entry_platform, HassPlayer)
player = player_cls(hass, entity_id)
await mass.players.register_player(player)
return player
async def async_register_player_controls(
hass: HomeAssistant, mass: MusicAssistant, entry: ConfigEntry
):
"""Register hass entities as player controls on Music Assistant."""
allowed_entities = entry.options.get(CONF_PLAYER_ENTITIES, [])
async def async_hass_state_event(event: Event) -> None:
"""Handle hass state-changed events to update registered PlayerControls."""
entity_id: str = event.data[ATTR_ENTITY_ID]
if not entity_id.startswith(MP_DOMAIN):
return
# handle existing source player
if source_player := mass.players.get_player(entity_id):
source_player.on_hass_event(event)
return
# entity not (yet) registered
if entity_id in allowed_entities:
await async_register_player_control(hass, mass, entity_id)
# register event listener
entry.async_on_unload(
hass.bus.async_listen(EVENT_STATE_CHANGED, async_hass_state_event)
)
# register all current entities
def register_all():
for entity in hass.states.async_all(MEDIA_PLAYER_DOMAIN):
if entity.entity_id not in allowed_entities:
continue
hass.create_task(
async_register_player_control(hass, mass, entity.entity_id)
)
register_all()
# schedule register a few minutes after startup to catch any slow loading platforms
hass.loop.call_later(30, register_all)
hass.loop.call_later(120, register_all)
pylint: disable=protected-access
(or shout next track from google assistant)
consider idle if nothing is playing and we did not pause
add the poll task here to make sure we have accurate info
added a mass player to a group, translate that to the underlying entity.
https://github.com/pkissling/home-assistant/blob/825ef13edbee9913666bfef4d9afe101aed9c526/custom_components/mass/player_controls.py#L482