Casvt / Plex-scripts

Plex, the arr's and tautulli scripts coming from user requests
GNU General Public License v3.0
339 stars 30 forks source link

playlist_to_collection.py: Copy poster from playlist to collection #212

Closed AverageDave93 closed 3 weeks ago

AverageDave93 commented 4 months ago

In what file should the feature be added? playlist_to_collection.py

What is the feature request? copying the poster from the playlist to the collection.

AverageDave93 commented 3 months ago

I have changed playlist_to_collection.py to also copy the playlist's poster to the collection. You just need to install plexapi, just as you did requests.

#!/usr/bin/python3
# -*- coding: utf-8 -*-

"""
The use case of this script is the following:
    Get the content of a playlist and put it in a collection
Requirements (python3 -m pip install [requirement]):
    requests
    plexapi
Setup:
    Fill the variables below firstly, then run the script with -h to see the arguments that you need to give.
"""

import requests
import plexapi
from plexapi.server import PlexServer
import argparse
import logging
from os import getenv

# Configuration
# Connection details are read from environment variables of the same name;
# each falls back to an empty string when the variable is not set.
plex_ip = getenv('plex_ip', '')
plex_port = getenv('plex_port', '')
plex_api_token = getenv('plex_api_token', '')
# Root URL every request in this script is built on (plain http).
base_url = f"http://{plex_ip}:{plex_port}"

# Setup logging
# INFO level so the progress messages emitted by the script are shown.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Returns a (status message, playlist thumb path) pair; see the docstring.
def playlist_to_collection(ssn, library_name: str, playlist_name: str, remove_playlist: bool=False):
    """Copy the items of a playlist into a new collection with the same title.

    An existing collection in the target library that has the same title as
    the playlist is deleted before the new collection is created.

    Args:
        ssn: Session object used for all HTTP calls (``get``, ``post`` and
            ``delete`` are used); expected to send the Plex token and to
            request JSON responses.
        library_name: Title of the library the collection is created in.
        playlist_name: Title of the source playlist; also used as the title
            of the collection.
        remove_playlist: When true, delete the source playlist after the
            collection has been created.

    Returns:
        A ``(message, thumb)`` tuple. On success ``message`` is
        ``'Operation completed successfully'`` and ``thumb`` is the
        playlist's ``thumb`` value. On failure ``thumb`` is ``None`` and
        ``message`` is one of ``'Playlist not found'``,
        ``'Playlist is empty'``, ``'Library not found'``,
        ``'Library type not supported'``, ``'HTTP Request failed'`` or
        ``'An error occurred'``.
    """
    try:
        # Fetch playlists
        logger.info("Fetching playlists...")
        playlists_response = ssn.get(f'{base_url}/playlists')
        playlists_response.raise_for_status()
        playlists = playlists_response.json()['MediaContainer']

        # Find the target playlist; the first playlist with a matching title wins.
        # A missing 'Metadata' key means the server has no playlists at all.
        for playlist in playlists.get('Metadata', []):
            if playlist['title'] == playlist_name:
                break
        else:
            return 'Playlist not found', None

        playlist_id = playlist['ratingKey']
        logger.info(f"Found playlist '{playlist_name}' with ID {playlist_id}")

        # Fetch the entries of the playlist
        entries_response = ssn.get(f'{base_url}/playlists/{playlist_id}/items')
        entries_response.raise_for_status()
        entries = entries_response.json()['MediaContainer'].get('Metadata')
        if not entries:
            # Covers both a missing 'Metadata' key and an empty list.
            return 'Playlist is empty', None
        # str() because the keys are joined into the collection uri below.
        playlist_entries = [str(entry['ratingKey']) for entry in entries]

        # A playlist without a 'thumb' key raises KeyError here, which the
        # generic handler at the bottom reports as 'An error occurred'.
        collection_thumb_url = playlist['thumb']

        # Fetch library sections
        logger.info("Fetching library sections...")
        sections_response = ssn.get(f'{base_url}/library/sections')
        sections_response.raise_for_status()
        sections = sections_response.json()['MediaContainer']['Directory']

        # Find the target library
        for lib in sections:
            if lib['title'] == library_name:
                lib_id = lib['key']
                lib_type = lib['type']
                break
        else:
            return 'Library not found', None

        # Map library type to the numeric type sent when creating the collection
        lib_type_map = {'movie': '1', 'show': '4', 'artist': '10', 'photo': '13'}
        if lib_type not in lib_type_map:
            return 'Library type not supported', None
        lib_type = lib_type_map[lib_type]

        # Remove existing collection with the same name
        logger.info(f"Checking for existing collections named '{playlist_name}'...")
        collections_response = ssn.get(f'{base_url}/library/sections/{lib_id}/collections')
        collections_response.raise_for_status()
        collections = collections_response.json()['MediaContainer']
        for collection in collections.get('Metadata', []):
            if collection['title'] == playlist_name:
                logger.info(f"Deleting existing collection '{playlist_name}' with ID {collection['ratingKey']}")
                delete_collection_response = ssn.delete(f'{base_url}/library/collections/{collection["ratingKey"]}')
                delete_collection_response.raise_for_status()
                break

        # Create new collection
        logger.info(f"Creating new collection '{playlist_name}'...")
        machine_id_response = ssn.get(f'{base_url}/')
        machine_id_response.raise_for_status()
        machine_id = machine_id_response.json()['MediaContainer']['machineIdentifier']
        create_response = ssn.post(f'{base_url}/library/collections', params={
            'type': lib_type,
            'title': playlist_name,
            'smart': '0',
            'sectionId': lib_id,
            'uri': f'server://{machine_id}/com.plexapp.plugins.library/library/metadata/{",".join(playlist_entries)}'
        })
        # Stop on a failed creation so success is never reported, and the
        # source playlist never removed, without a collection replacing it.
        create_response.raise_for_status()

        # Remove playlist if specified
        if remove_playlist:
            logger.info(f"Deleting playlist '{playlist_name}' with ID {playlist_id}")
            delete_playlist_response = ssn.delete(f'{base_url}/playlists/{playlist_id}')
            # A failed delete is only logged: the collection already exists.
            if delete_playlist_response.status_code == 200:
                logger.info(f"Successfully deleted playlist '{playlist_name}'")
            else:
                logger.error(f"Failed to delete playlist '{playlist_name}'. Status code: {delete_playlist_response.status_code}")

        return 'Operation completed successfully', collection_thumb_url
    except requests.RequestException as e:
        logger.error(f'HTTP Request failed: {e}')
        return 'HTTP Request failed', None
    except Exception as e:
        # Boundary handler: unexpected response shapes (missing keys etc.)
        # are reported to the caller as a message instead of a traceback.
        logger.error(f'An error occurred: {e}')
        return 'An error occurred', None

if __name__ == '__main__':
    # Setup session: every request asks for JSON and carries the Plex token
    # as a query parameter.
    ssn = requests.Session()
    ssn.headers.update({'Accept': 'application/json'})
    ssn.params.update({'X-Plex-Token': plex_api_token})

    # Setup argument parsing
    parser = argparse.ArgumentParser(description='Get the content of a playlist and put it in a collection')
    parser.add_argument('-l', '--LibraryName', type=str, help='Name of target library', required=True)
    parser.add_argument('-p', '--PlaylistName', type=str, help='Name of target playlist', required=True)
    parser.add_argument('-r', '--RemovePlaylist', help='Remove source playlist afterwards', action='store_true')

    args = parser.parse_args()

    # Call function and process result
    response, collection_thumb_url = playlist_to_collection(ssn=ssn, library_name=args.LibraryName, playlist_name=args.PlaylistName, remove_playlist=args.RemovePlaylist)
    if response != 'Operation completed successfully':
        # parser.error() prints the message and exits, so nothing below runs.
        parser.error(response)
    logger.info(response)

    # Collection information: the collection carries the playlist's title.
    COLLECTION_TITLE = args.PlaylistName

    # Only copy the poster when the playlist actually provided one;
    # otherwise the URL below would not point at an image.
    if collection_thumb_url:
        POSTER_URL = f"http://{plex_ip}:{plex_port}{collection_thumb_url}?X-Plex-Token={plex_api_token}"

        # Connect to Plex server
        plex = PlexServer(base_url, plex_api_token)

        # Retrieve the collection that was just created
        collection = plex.library.section(args.LibraryName).collection(COLLECTION_TITLE)

        # Set the poster image for the collection
        collection.uploadPoster(POSTER_URL)

        print(f"Poster set successfully for collection: {COLLECTION_TITLE}")
    else:
        print(f"No poster found for collection: {COLLECTION_TITLE}. Poster not set.")
AverageDave93 commented 3 months ago

playlist_to_collection_including_poster.zip

AverageDave93 commented 3 months ago

Not sure if anyone is even using/wanting this lol, but there was an issue with the script where, if the playlist didn't have a poster set, it wouldn't copy it across. Below is the new script that will work.

#!/usr/bin/python3
# -*- coding: utf-8 -*-

"""
The use case of this script is the following:
    Get the content of a playlist and put it in a collection
Requirements (python3 -m pip install [requirement]):
    requests
    plexapi
Setup:
    Fill the variables below firstly, then run the script with -h to see the arguments that you need to give.
"""

import requests
from os import getenv
import argparse
import logging
from plexapi.server import PlexServer

# Configuration
# Connection details are read from environment variables of the same name;
# each falls back to an empty string when the variable is not set.
plex_ip = getenv('plex_ip', '')
plex_port = getenv('plex_port', '')
plex_api_token = getenv('plex_api_token', '')
# Root URL every request in this script is built on (plain http).
base_url = f"http://{plex_ip}:{plex_port}"

# Setup logging
# INFO level so the progress messages emitted by the script are shown.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Returns a (status message, playlist thumb path) pair; see the docstring.
def playlist_to_collection(ssn, library_name: str, playlist_name: str, remove_playlist: bool=False):
    """Copy the items of a playlist into a new collection with the same title.

    An existing collection in the target library that has the same title as
    the playlist is deleted before the new collection is created.

    Args:
        ssn: Session object used for all HTTP calls (``get``, ``post`` and
            ``delete`` are used); expected to send the Plex token and to
            request JSON responses.
        library_name: Title of the library the collection is created in.
        playlist_name: Title of the source playlist; also used as the title
            of the collection.
        remove_playlist: When true, delete the source playlist after the
            collection has been created.

    Returns:
        A ``(message, thumb)`` tuple. On success ``message`` is
        ``'Operation completed successfully'`` and ``thumb`` is the
        playlist's ``thumb`` value, or ``''`` when the playlist has none.
        On failure ``thumb`` is ``None`` and ``message`` is one of
        ``'Playlist not found'``, ``'Playlist is empty'``,
        ``'Library not found'``, ``'Library type not supported'``,
        ``'HTTP Request failed'`` or ``'An error occurred'``.
    """
    try:
        # Fetch playlists
        logger.info("Fetching playlists...")
        playlists_response = ssn.get(f'{base_url}/playlists')
        playlists_response.raise_for_status()
        playlists = playlists_response.json()['MediaContainer']

        # Find the target playlist; the first playlist with a matching title wins.
        # A missing 'Metadata' key means the server has no playlists at all.
        for playlist in playlists.get('Metadata', []):
            if playlist['title'] == playlist_name:
                break
        else:
            return 'Playlist not found', None

        playlist_id = playlist['ratingKey']
        logger.info(f"Found playlist '{playlist_name}' with ID {playlist_id}")

        # Fetch the entries of the playlist
        entries_response = ssn.get(f'{base_url}/playlists/{playlist_id}/items')
        entries_response.raise_for_status()
        entries = entries_response.json()['MediaContainer'].get('Metadata')
        if not entries:
            # Covers both a missing 'Metadata' key and an empty list.
            return 'Playlist is empty', None
        # str() because the keys are joined into the collection uri below.
        playlist_entries = [str(entry['ratingKey']) for entry in entries]

        # Not every playlist has a poster; fall back to '' so the caller can
        # tell "no poster" apart from a failure (which returns None).
        collection_thumb_url = playlist.get('thumb', '')

        # Fetch library sections
        logger.info("Fetching library sections...")
        sections_response = ssn.get(f'{base_url}/library/sections')
        sections_response.raise_for_status()
        sections = sections_response.json()['MediaContainer']['Directory']

        # Find the target library
        for lib in sections:
            if lib['title'] == library_name:
                lib_id = lib['key']
                lib_type = lib['type']
                break
        else:
            return 'Library not found', None

        # Map library type to the numeric type sent when creating the collection
        lib_type_map = {'movie': '1', 'show': '4', 'artist': '10', 'photo': '13'}
        if lib_type not in lib_type_map:
            return 'Library type not supported', None
        lib_type = lib_type_map[lib_type]

        # Remove existing collection with the same name
        logger.info(f"Checking for existing collections named '{playlist_name}'...")
        collections_response = ssn.get(f'{base_url}/library/sections/{lib_id}/collections')
        collections_response.raise_for_status()
        collections = collections_response.json()['MediaContainer']
        for collection in collections.get('Metadata', []):
            if collection['title'] == playlist_name:
                logger.info(f"Deleting existing collection '{playlist_name}' with ID {collection['ratingKey']}")
                delete_collection_response = ssn.delete(f'{base_url}/library/collections/{collection["ratingKey"]}')
                delete_collection_response.raise_for_status()
                break

        # Create new collection
        logger.info(f"Creating new collection '{playlist_name}'...")
        machine_id_response = ssn.get(f'{base_url}/')
        machine_id_response.raise_for_status()
        machine_id = machine_id_response.json()['MediaContainer']['machineIdentifier']
        create_response = ssn.post(f'{base_url}/library/collections', params={
            'type': lib_type,
            'title': playlist_name,
            'smart': '0',
            'sectionId': lib_id,
            'uri': f'server://{machine_id}/com.plexapp.plugins.library/library/metadata/{",".join(playlist_entries)}'
        })
        # Stop on a failed creation so success is never reported, and the
        # source playlist never removed, without a collection replacing it.
        create_response.raise_for_status()

        # Remove playlist if specified
        if remove_playlist:
            logger.info(f"Deleting playlist '{playlist_name}' with ID {playlist_id}")
            delete_playlist_response = ssn.delete(f'{base_url}/playlists/{playlist_id}')
            # A failed delete is only logged: the collection already exists.
            if delete_playlist_response.status_code == 200:
                logger.info(f"Successfully deleted playlist '{playlist_name}'")
            else:
                logger.error(f"Failed to delete playlist '{playlist_name}'. Status code: {delete_playlist_response.status_code}")

        return 'Operation completed successfully', collection_thumb_url
    except requests.RequestException as e:
        logger.error(f'HTTP Request failed: {e}')
        return 'HTTP Request failed', None
    except Exception as e:
        # Boundary handler: unexpected response shapes (missing keys etc.)
        # are reported to the caller as a message instead of a traceback.
        logger.error(f'An error occurred: {e}')
        return 'An error occurred', None

if __name__ == '__main__':
    # HTTP session shared by all requests: JSON responses, token as query parameter
    ssn = requests.Session()
    ssn.headers.update({'Accept': 'application/json'})
    ssn.params.update({'X-Plex-Token': plex_api_token})

    # Command line interface
    parser = argparse.ArgumentParser(
        description='Get the content of a playlist and put it in a collection'
    )
    parser.add_argument(
        '-l', '--LibraryName',
        type=str, help='Name of target library', required=True
    )
    parser.add_argument(
        '-p', '--PlaylistName',
        type=str, help='Name of target playlist', required=True
    )
    parser.add_argument(
        '-r', '--RemovePlaylist',
        help='Remove source playlist afterwards', action='store_true'
    )
    args = parser.parse_args()

    # Convert the playlist; anything but the success message aborts the script
    response, collection_thumb_url = playlist_to_collection(
        ssn=ssn,
        library_name=args.LibraryName,
        playlist_name=args.PlaylistName,
        remove_playlist=args.RemovePlaylist
    )
    if response == 'Operation completed successfully':
        logger.info(response)
    else:
        # parser.error() prints the message and exits
        parser.error(response)

    # The collection was created under the playlist's title
    COLLECTION_TITLE = args.PlaylistName

    if not collection_thumb_url:
        # The playlist had no poster, so there is nothing to copy
        print(f"No poster found for collection: {COLLECTION_TITLE}. Poster not set.")
    else:
        POSTER_URL = f"http://{plex_ip}:{plex_port}{collection_thumb_url}?X-Plex-Token={plex_api_token}"

        # Look the new collection up through plexapi and hand it the poster URL
        plex = PlexServer(base_url, plex_api_token)
        library = plex.library.section(args.LibraryName)
        collection = library.collection(COLLECTION_TITLE)
        collection.uploadPoster(POSTER_URL)

        print(f"Poster set successfully for collection: {COLLECTION_TITLE}")