Closed NevermoreN closed 1 month ago
甚至略微有些尴尬
这个需要emby的report插件处理,他数据库不存媒体库名,只有文件名,根据文件反推媒体库太麻烦了
加个v吧大佬 这是emby插件的查询示例是否可以用呢 15180119962 SELECT date(DateCreated) AS Date, SUM(PlayDuration) AS PlayTime FROM PlaybackActivity WHERE ItemType = 'Movie' GROUP BY date(DateCreated) ORDER BY date(DateCreated) ASC
加个v吧大佬 这是emby插件的查询示例是否可以用呢 15180119962 SELECT date(DateCreated) AS Date, SUM(PlayDuration) AS PlayTime FROM PlaybackActivity WHERE ItemType = 'Movie' GROUP BY date(DateCreated) ORDER BY date(DateCreated) ASC
emby插件的播放记录就没有存储这条播放记录所在的媒体库名 处理不了
要不加个v讨论一下呗
我刚才改了一点但是没起作用 import os
from app.core.config import settings from app.plugins import _PluginBase from typing import Any, List, Dict, Tuple, Optional from app.log import logger from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.triggers.cron import CronTrigger
from app.schemas import NotificationType from pathlib import Path
import random from io import BytesIO from PIL import Image from PIL import ImageFont from PIL import ImageDraw import pytz from cacheout import Cache from datetime import datetime, timedelta
from app.utils.http import RequestUtils from app.utils.string import StringUtils
cache = Cache()
class EmbyReporter(_PluginBase):
plugin_name = "Emby观影报告"
# 插件描述
plugin_desc = "推送Emby观影报告,需Emby安装Playback Report 插件。"
# 插件图标
plugin_icon = "Pydiocells_A.png"
# 插件版本
plugin_version = "1.8"
# 插件作者
plugin_author = "thsrite"
# 作者主页
author_url = "https://github.com/thsrite"
# 插件配置项ID前缀
plugin_config_prefix = "embyreporter_"
# 加载顺序
plugin_order = 30
# 可使用的用户级别
auth_level = 1
# 私有属性
_enabled: bool = False
_onlyonce: bool = False
_res_dir = None
_cron = None
_days = None
_type = None
_cnt = None
_mp_host = None
_emby_host = None
_emby_api_key = None
show_time = True
_scheduler: Optional[BackgroundScheduler] = None
PLAYBACK_REPORTING_TYPE_MOVIE = "ItemName"
PLAYBACK_REPORTING_TYPE_TVSHOWS = "substr(ItemName,0, instr(ItemName, ' - '))"
host = None
api_key = None
def init_plugin(self, config: dict = None):
self.host = f"http://{settings.EMBY_HOST}" if not str(settings.EMBY_HOST).startswith(
"http") else settings.EMBY_HOST
self.api_key = settings.EMBY_API_KEY
# 停止现有任务
self.stop_service()
if config:
self._enabled = config.get("enabled")
self._onlyonce = config.get("onlyonce")
self._cron = config.get("cron")
self._res_dir = config.get("res_dir")
self._days = config.get("days") or 7
self._cnt = config.get("cnt") or 10
self._type = config.get("type") or "tg"
self._mp_host = config.get("mp_host")
self.show_time = config.get("show_time")
self._emby_host = config.get("emby_host")
self._emby_api_key = config.get("emby_api_key")
library_ids = config.get("library_ids", "")
self.library_ids_list = [lib.strip() for lib in library_ids.split(',')] if library_ids else None
if self._emby_host and self._emby_api_key:
self.host = f"http://{self._emby_host}" if not str(self._emby_host).startswith(
"http") else self._emby_host
self.api_key = self._emby_api_key
if self._enabled or self._onlyonce:
# 定时服务
self._scheduler = BackgroundScheduler(timezone=settings.TZ)
# 立即运行一次
if self._onlyonce:
logger.info(f"Emby观影报告服务启动,立即运行一次")
self._scheduler.add_job(self.__report, 'date',
run_date=datetime.now(
tz=pytz.timezone(settings.TZ)) + timedelta(seconds=3),
name="Emby观影报告")
# 关闭一次性开关
self._onlyonce = False
# 保存配置
self.__update_config()
# 周期运行
if self._cron:
try:
self._scheduler.add_job(func=self.__report,
trigger=CronTrigger.from_crontab(self._cron),
name="Emby观影报告")
except Exception as err:
logger.error(f"定时任务配置错误:{err}")
# 推送实时消息
self.systemmessage.put(f"执行周期配置错误:{err}")
# 启动任务
if self._scheduler.get_jobs():
self._scheduler.print_jobs()
self._scheduler.start()
def __report(self):
"""
发送Emby观影报告
"""
# 本地路径转为url
if not self._mp_host:
return
if not self._type:
return
# 获取当前时间并格式化
current_time = datetime.now().strftime("%Y%m%d%H%M%S")
# 获取数据
success, movies = self.get_report(types=self.PLAYBACK_REPORTING_TYPE_MOVIE, days=int(self._days),
limit=int(self._cnt), library_ids=self.library_ids_list)
if not success:
exit(movies)
logger.info(f"获取到电影 {movies}")
success, tvshows = self.get_report(types=self.PLAYBACK_REPORTING_TYPE_TVSHOWS, days=int(self._days),
limit=int(self._cnt), library_ids=self.library_ids_list)
if not success:
exit(tvshows)
logger.info(f"获取到电视剧 {tvshows}")
# 绘制海报
report_path = self.draw(res_path=self._res_dir,
movies=movies,
tvshows=tvshows,
show_time=self.show_time)
if not report_path:
logger.error("生成海报失败")
return
# 示例调用
self.__split_image_by_height(report_path, "/public/report", [250, 330, 335])
# 分块推送
for i in range(2, 4):
report_path_part = f"/public/report_part_{i}.jpg"
report_url = self._mp_host + report_path_part.replace("/public", "") + f"?_timestamp={current_time}"
mtype = NotificationType.MediaServer
if self._type:
mtype = NotificationType.__getitem__(str(self._type)) or NotificationType.MediaServer
self.post_message(
title=f'Movies 近{self._days}日观影排行' if i == 2 else f'TV Shows 近{self._days}日观影排行',
mtype=mtype,
image=report_url)
logger.info(f"Emby观影记录推送成功 {report_url}")
@staticmethod
def __split_image_by_height(image_path, output_path_prefix, heights):
# 打开原始图像
img = Image.open(image_path)
img_width, img_height = img.size
# 如果图像是 RGBA 模式,转换为 RGB 模式
if img.mode == 'RGBA':
img = img.convert('RGB')
# 分割图像的起始位置
top = 0
# 按指定高度分割图像
for i, height in enumerate(heights):
# 确保不会超出图像边界
if top + height > img_height:
height = img_height - top
bottom = top + height
# 裁剪图像
box = (0, top, img_width, bottom)
part = img.crop(box)
# 保存图像部分
part.save(f"{output_path_prefix}_part_{i + 1}.jpg")
# 更新下一个部分的上边界
top = bottom
# 如果已经到达图像底部,停止
if top >= img_height:
break
print("图片按照指定高度分割完成!")
def __update_config(self):
self.update_config({
"enabled": self._enabled,
"onlyonce": self._onlyonce,
"cron": self._cron,
"days": self._days,
"cnt": self._cnt,
"type": self._type,
"mp_host": self._mp_host,
"show_time": self.show_time,
"emby_host": self._emby_host,
"emby_api_key": self._emby_api_key,
"res_dir": self._res_dir,
"library_ids": ','.join(self.library_ids_list) if self.library_ids_list else ""
})
def get_state(self) -> bool:
return self._enabled
@staticmethod
def get_command() -> List[Dict[str, Any]]:
pass
def get_api(self) -> List[Dict[str, Any]]:
pass
def get_form(self) -> Tuple[List[dict], Dict[str, Any]]:
"""
拼装插件配置页面,需要返回两块数据:1、页面配置;2、数据结构
"""
MsgTypeOptions = []
for item in NotificationType:
MsgTypeOptions.append({
"title": item.value,
"value": item.name
})
# 编历 NotificationType 枚举,生成消息类型选项
return [
{
'component': 'VForm',
'content': [
{
'component': 'VRow',
'content': [
{
'component': 'VCol',
'props': {
'cols': 12,
'md': 6
},
'content': [
{
'component': 'VSwitch',
'props': {
'model': 'enabled',
'label': '启用插件',
}
}
]
},
{
'component': 'VCol',
'props': {
'cols': 12,
'md': 6
},
'content': [
{
'component': 'VSwitch',
'props': {
'model': 'onlyonce',
'label': '立即运行一次',
}
}
]
}
]
},
{
'component': 'VRow',
'content': [
{
'component': 'VCol',
'props': {
'cols': 12,
'md': 6
},
'content': [
{
'component': 'VTextField',
'props': {
'model': 'cron',
'label': '执行周期',
'placeholder': '5位cron表达式,留空自动'
}
}
]
},
{
'component': 'VCol',
'props': {
'cols': 12,
'md': 6
},
'content': [
{
'component': 'VTextField',
'props': {
'model': 'res_dir',
'label': '素材路径',
'placeholder': '本地素材路径'
}
}
]
},
]
},
{
'component': 'VRow',
'content': [
{
'component': 'VCol',
'props': {
'cols': 12,
'md': 6
},
'content': [
{
'component': 'VTextField',
'props': {
'model': 'days',
'label': '报告天数',
'placeholder': '向前获取数据的天数'
}
}
]
},
{
'component': 'VCol',
'props': {
'cols': 12,
'md': 6
},
'content': [
{
'component': 'VTextField',
'props': {
'model': 'cnt',
'label': '观影记录数量',
'placeholder': '获取观影数据数量,默认10'
}
}
]
}
]
},
{
'component': 'VRow',
'content': [
{
'component': 'VCol',
'props': {
'cols': 12,
'md': 6
},
'content': [
{
'component': 'VTextField',
'props': {
'model': 'mp_host',
'label': 'MoviePilot域名',
'placeholder': '必填,末尾不带/'
}
}
]
},
{
'component': 'VCol',
'props': {
'cols': 12,
'md': 6
},
'content': [
{
'component': 'VSelect',
'props': {
'multiple': False,
'chips': True,
'model': 'type',
'label': '推送方式',
'items': MsgTypeOptions
}
}
]
},
]
},
{
'component': 'VRow',
'content': [
{
'component': 'VCol',
'props': {
'cols': 12,
'md': 6
},
'content': [
{
'component': 'VSelect',
'props': {
'model': 'show_time',
'label': '是否显示观看时长',
'items': [
{'title': '是', 'value': True},
{'title': '否', 'value': False}
]
}
}
]
},
]
},
{
'component': 'VRow',
'content': [
{
'component': 'VCol',
'props': {
'cols': 12,
'md': 6
},
'content': [
{
'component': 'VTextField',
'props': {
'model': 'emby_host',
'label': '自定义emby host',
'placeholder': 'IP:PORT'
}
}
]
},
{
'component': 'VCol',
'props': {
'cols': 12,
'md': 6
},
'content': [
{
'component': 'VTextField',
'props': {
'model': 'emby_api_key',
'label': '自定义emby apiKey'
}
}
]
}
]
},
{
'component': 'VRow',
'content': [
{
'component': 'VCol',
'props': {
'cols': 12,
'md': 6
},
'content': [
{
'component': 'VTextField',
'props': {
'model': 'library_ids',
'label': '影音库 ID 列表',
'placeholder': '用逗号分隔的库 ID'
}
}
]
}
]
},
{
'component': 'VRow',
'content': [
{
'component': 'VCol',
'props': {
'cols': 12,
},
'content': [
{
'component': 'VAlert',
'props': {
'type': 'info',
'variant': 'tonal',
'text': '如生成观影报告有空白记录,可酌情调大观影记录数量。'
}
}
]
}
]
},
{
'component': 'VRow',
'content': [
{
'component': 'VCol',
'props': {
'cols': 12,
},
'content': [
{
'component': 'VAlert',
'props': {
'type': 'info',
'variant': 'tonal',
'text': '如未设置自定义emby配置,则读取环境变量emby配置。'
}
}
]
}
]
}
]
}
], {
"enabled": False,
"onlyonce": False,
"cron": "5 1 * * *",
"res_dir": "",
"days": 7,
"cnt": 10,
"emby_host": "",
"emby_api_key": "",
"mp_host": "",
"show_time": True,
"type": "",
"library_ids": "" # 新增字段
}
def get_page(self) -> List[dict]:
pass
def stop_service(self):
"""
退出插件
"""
try:
if self._scheduler:
self._scheduler.remove_all_jobs()
if self._scheduler.running:
self._scheduler.shutdown()
self._scheduler = None
except Exception as e:
logger.error("退出插件失败:%s" % str(e))
def draw(self, res_path, movies, tvshows, show_time=True):
# 默认路径 默认图
if not res_path:
res_path = os.path.join(Path(__file__).parent, "res")
# 绘图文件路径初始化
bg_path = os.path.join(res_path, "bg")
mask_path = os.path.join(res_path, "cover-ranks-mask-2.png")
font_path = os.path.join(res_path, "PingFang Bold.ttf")
# 随机调取背景, 路径: res/ranks/bg/...
bg_list = os.listdir(bg_path)
bg_path = os.path.join(bg_path, bg_list[random.randint(0, len(bg_list) - 1)])
# 初始绘图对象
bg = Image.open(bg_path)
mask = Image.open(mask_path)
bg.paste(mask, (0, 0), mask)
font = ImageFont.truetype(font_path, 18)
font_small = ImageFont.truetype(font_path, 14)
font_count = ImageFont.truetype(font_path, 8)
exists_movies = []
for i in movies:
try:
# 榜单项数据
user_id, item_id, item_type, name, count, duration = tuple(i)
print(item_type, item_id, name, count, StringUtils.str_secends(int(duration)))
# 封面图像获取
success, data = self.primary(item_id)
if not success:
continue
exists_movies.append(i)
except Exception:
continue
logger.info(f"过滤后未删除电影 {len(exists_movies)} 部")
# 合并绘制
if len(exists_movies) < 5:
for i in range(5 - len(exists_movies) + 1):
exists_movies.append({"item_id": i})
if len(exists_movies) > 5:
exists_movies = exists_movies[:5]
exists
你们加个v 讨论一下吧 @thsrite
是啊 加个v吧 现在那些哥们儿尬片冲榜 没办法啊 又不想给家人看到 头大
你可以自己改改看看,根据emby report插件的itemid反推获取媒体库id
加个v吧大佬
[Emby观影报告 v2.0 的库选择问题: 我有一些库吧 不是很想加入排行 具体原因你懂的 能否加入一个选库功能呢 大哥 因为那个吧 我有家人会看到 确实不太好
插件v2.1已支持,需要更新MoviePilot 2.0
[Emby观影报告 v2.0 的库选择问题: 我有一些库吧 不是很想加入排行 具体原因你懂的 能否加入一个选库功能呢 大哥 因为那个吧 我有家人会看到 确实不太好
插件v2.1已支持,需要更新MoviePilot 2.0
解决了那个问题吗
[Emby观影报告 v2.0 的库选择问题: 我有一些库吧 不是很想加入排行 具体原因你懂的 能否加入一个选库功能呢 大哥 因为那个吧 我有家人会看到 确实不太好