happyleavesaoc / python-snapcast

Python API for controlling Snapcast, a multi-room synchronous audio solution.
MIT License
70 stars 28 forks source link

Request: Support stream commands (play, pause, next etc) #72

Open Chreece opened 4 months ago

Chreece commented 4 months ago

Is it possible to add support for these functions?

Documentation

Chreece commented 4 months ago

I'm no programmer, so please don't kill me... I just wanted to see how good ChatGPT is:

stream.py:

"""Snapcast stream."""

import requests
import json

class Snapstream():
    """Represents a snapcast stream.

    Wraps the stream dictionary reported by the Snapcast server and adds
    playback control by POSTing JSON-RPC requests to
    ``http://<host>:<port>/jsonrpc``.

    NOTE(review): Snapcast usually serves HTTP JSON-RPC on port 1780, while
    1705 is the raw TCP control port. The default is kept for backward
    compatibility -- confirm which port the target server expects.
    """

    # Seconds to wait for the server, so that an unreachable host cannot
    # block the caller forever.
    REQUEST_TIMEOUT = 10

    # Friendly repeat modes accepted by repeat() -> Snapcast "loopStatus".
    _LOOP_STATUS = {'off': 'none', 'one': 'track', 'all': 'playlist'}

    def __init__(self, data, host, port=1705):
        """Initialize.

        Args:
            data: Stream dictionary as reported by the server.
            host: Host name or address of the Snapcast server.
            port: Port used to build the JSON-RPC URL.
        """
        self.update(data)
        self._callback_func = None
        self.host = host
        self.port = port
        self.url = f"http://{self.host}:{self.port}/jsonrpc"

    def _uri_part(self, key):
        """Return ``uri[key]``, or None when the stream has no such entry."""
        uri = self._stream.get('uri') or {}
        return uri.get(key)

    @property
    def identifier(self):
        """Get stream id."""
        return self._stream.get('id')

    @property
    def status(self):
        """Get stream status."""
        return self._stream.get('status')

    @property
    def name(self):
        """Get stream name, or None when the stream URI carries no name."""
        query = self._uri_part('query') or {}
        return query.get('name')

    @property
    def friendly_name(self):
        """Get friendly name: the stream name, else the identifier."""
        # `or` also covers a missing (None) name, not only the empty string.
        return self.name or self.identifier

    @property
    def metadata(self):
        """Get metadata.

        Reads ``properties['metadata']`` when the stream has properties and
        falls back to the legacy ``meta`` key otherwise.
        """
        if 'properties' in self._stream:
            return self._stream['properties'].get('metadata')
        return self._stream.get('meta')

    @property
    def meta(self):
        """Get metadata. Deprecated."""
        return self.metadata

    @property
    def properties(self):
        """Get properties."""
        return self._stream.get('properties')

    @property
    def path(self):
        """Get stream path, or None when the stream has no URI path."""
        return self._uri_part('path')

    def update(self, data):
        """Update stream."""
        self._stream = data

    def update_meta(self, data):
        """Update stream metadata. Alias of update_metadata()."""
        self.update_metadata(data)

    def update_metadata(self, data):
        """Update stream metadata.

        Writes the legacy ``meta`` key and, when present, the
        ``properties['metadata']`` entry so both views stay in sync.
        """
        if 'properties' in self._stream:
            self._stream['properties']['metadata'] = data
        self._stream['meta'] = data

    def update_properties(self, data):
        """Update stream properties."""
        self._stream['properties'] = data

    def __repr__(self):
        """Return string representation."""
        return f'Snapstream ({self.name})'

    def callback(self):
        """Run callback, passing this stream, if a callable one is set."""
        if self._callback_func and callable(self._callback_func):
            self._callback_func(self)

    def set_callback(self, func):
        """Set callback."""
        self._callback_func = func

    @staticmethod
    def _build_payload(method, params=None):
        """Build the JSON-RPC 2.0 request body for *method*."""
        return {
            "jsonrpc": "2.0",
            "method": method,
            "params": params if params else [],
            "id": 1,
        }

    def _control_params(self, command, params=None):
        """Build the params object of a ``Stream.Control`` request."""
        return {
            "id": self.identifier,
            "command": command,
            "params": params if params is not None else {},
        }

    def _property_params(self, name, value):
        """Build the params object of a ``Stream.SetProperty`` request."""
        return {"id": self.identifier, "property": name, "value": value}

    def _send_command(self, method, params=None):
        """POST a JSON-RPC request and return the decoded JSON response.

        This is a blocking call; it waits at most REQUEST_TIMEOUT seconds.
        """
        # json= serializes the payload and sets the JSON content type.
        response = requests.post(
            self.url,
            json=self._build_payload(method, params),
            timeout=self.REQUEST_TIMEOUT,
        )
        return response.json()

    def play(self):
        """Play the stream."""
        return self._send_command(
            "Stream.Control", self._control_params("play"))

    def pause(self):
        """Pause the stream."""
        return self._send_command(
            "Stream.Control", self._control_params("pause"))

    def next(self):
        """Next track in the stream."""
        return self._send_command(
            "Stream.Control", self._control_params("next"))

    def previous(self):
        """Previous track in the stream."""
        return self._send_command(
            "Stream.Control", self._control_params("previous"))

    def shuffle(self, enable=True):
        """Enable or disable shuffle."""
        return self._send_command(
            "Stream.SetProperty",
            self._property_params("shuffle", bool(enable)))

    def repeat(self, mode):
        """Set repeat mode.

        Args:
            mode: 'off', 'one' or 'all'. These are translated to the
                server's loop status values; any other value is passed
                through unchanged for the server to validate.
        """
        loop_status = self._LOOP_STATUS.get(mode, mode)
        return self._send_command(
            "Stream.SetProperty",
            self._property_params("loopStatus", loop_status))
if __name__ == "__main__":
    # Manual demo: build one stream from sample data and issue every
    # control command against a server on localhost, printing each reply.
    demo_stream = Snapstream(
        {
            'id': 'example_stream_id',
            'status': 'playing',
            'uri': {
                'query': {'name': 'Example Stream'},
                'path': '/example/path',
            },
            'properties': {'metadata': 'Example Metadata'},
            'meta': 'Example Meta',
        },
        'localhost',
    )

    # (printed label, command to run, positional arguments), in run order.
    demo_steps = (
        ("Play response:", demo_stream.play, ()),
        ("Pause response:", demo_stream.pause, ()),
        ("Next response:", demo_stream.next, ()),
        ("Previous response:", demo_stream.previous, ()),
        ("Shuffle response:", demo_stream.shuffle, (True,)),
        ("Repeat response:", demo_stream.repeat, ('all',)),
    )
    for label, command, arguments in demo_steps:
        print(label, command(*arguments))

`__init__.py`:

"""Snapcast control for Snapcast 0.11.1."""

from snapcast.control.server import Snapserver, CONTROL_PORT
from snapcast.control.stream import Snapstream  # Ensure you import the updated Snapstream class

async def create_server(loop, host, port=CONTROL_PORT, reconnect=False):
    """Build a Snapserver, start it and return it.

    The arguments are forwarded unchanged to the Snapserver constructor.
    """
    snapserver = Snapserver(loop, host, port, reconnect)
    await snapserver.start()
    return snapserver

class ExtendedSnapserver(Snapserver):
    """Snapserver variant that keeps a registry of controllable streams.

    Streams are stored in ``self.streams`` keyed by stream id, and each
    ``*_stream`` method forwards to the matching Snapstream command.
    Every ``*_stream`` method returns None when the id is not registered.

    NOTE(review): this relies on the base class exposing ``host`` and
    ``port`` attributes and allowing ``streams`` to be assigned -- confirm
    against Snapserver.
    """

    def __init__(self, loop, host, port=CONTROL_PORT, reconnect=False):
        """Initialize the base server and start with an empty registry."""
        super().__init__(loop, host, port, reconnect)
        self.streams = {}

    async def start(self):
        """Start the server, then wrap each registered entry in a Snapstream."""
        await super().start()
        # Only values are replaced, so the key set is stable while iterating.
        for key in self.streams:
            self.streams[key] = Snapstream(
                self.streams[key], self.host, self.port)

    def add_stream(self, stream_data):
        """Register a stream built from *stream_data* under its 'id'."""
        new_stream = Snapstream(stream_data, self.host, self.port)
        self.streams[stream_data['id']] = new_stream

    def play_stream(self, stream_id):
        """Play a stream."""
        if stream_id not in self.streams:
            return None
        return self.streams[stream_id].play()

    def pause_stream(self, stream_id):
        """Pause a stream."""
        if stream_id not in self.streams:
            return None
        return self.streams[stream_id].pause()

    def next_stream(self, stream_id):
        """Skip to the next track of a stream."""
        if stream_id not in self.streams:
            return None
        return self.streams[stream_id].next()

    def previous_stream(self, stream_id):
        """Go back to the previous track of a stream."""
        if stream_id not in self.streams:
            return None
        return self.streams[stream_id].previous()

    def shuffle_stream(self, stream_id, enable=True):
        """Turn shuffle on or off for a stream."""
        if stream_id not in self.streams:
            return None
        return self.streams[stream_id].shuffle(enable)

    def repeat_stream(self, stream_id, mode):
        """Set the repeat mode of a stream."""
        if stream_id not in self.streams:
            return None
        return self.streams[stream_id].repeat(mode)
# Example usage of ExtendedSnapserver
async def main():
    """Demonstrate ExtendedSnapserver against a server on localhost.

    Starts the server, registers one sample stream and issues every stream
    control command in turn, printing each response.

    NOTE(review): the stream commands look synchronous; if they perform
    blocking HTTP requests they will stall the event loop while they run.
    """
    # Imported locally: the module otherwise only imports asyncio inside its
    # ``__main__`` guard, so calling main() from elsewhere raised NameError.
    import asyncio

    # Inside a coroutine the running loop is the one to hand to the server.
    loop = asyncio.get_running_loop()
    server = ExtendedSnapserver(loop, 'localhost')
    await server.start()

    # Add a sample stream
    sample_stream_data = {
        'id': 'example_stream_id',
        'status': 'playing',
        'uri': {
            'query': {
                'name': 'Example Stream'
            },
            'path': '/example/path'
        },
        'properties': {
            'metadata': 'Example Metadata'
        },
        'meta': 'Example Meta'
    }
    server.add_stream(sample_stream_data)

    # Play the stream
    play_response = server.play_stream('example_stream_id')
    print("Play response:", play_response)

    # Pause the stream
    pause_response = server.pause_stream('example_stream_id')
    print("Pause response:", pause_response)

    # Next track
    next_response = server.next_stream('example_stream_id')
    print("Next response:", next_response)

    # Previous track
    previous_response = server.previous_stream('example_stream_id')
    print("Previous response:", previous_response)

    # Enable shuffle
    shuffle_response = server.shuffle_stream('example_stream_id', True)
    print("Shuffle response:", shuffle_response)

    # Set repeat mode to 'all'
    repeat_response = server.repeat_stream('example_stream_id', 'all')
    print("Repeat response:", repeat_response)

# Script entry point: run the demo coroutine when executed directly.
# Importing asyncio here also binds the name at module level.
if __name__ == "__main__":
    import asyncio
    asyncio.run(main())

Can someone use them?