CMHopeSunshine / LittlePaimon

小派蒙!基于Nonebot2的原神机器人,包括但不限于UID面板查询、抽卡记录分析、游戏攻略图鉴、实时便签、原石札记、群聊学习、群管等功能。/ LittlePamon! Genshin Impact multifunctional bot based on Nonebot2.
https://docs.paimon.cherishmoon.top
GNU Affero General Public License v3.0
1.15k stars 126 forks source link

[建议]新增定时发送节日祝福功能 #368

Closed xiaodongchu closed 1 year ago

xiaodongchu commented 1 year ago

当前不足

没有哦(^▽^)

建议内容

xiaodongchu commented 1 year ago

参考示例[可选]

# -*- encoding: utf-8 -*-
"""
@Modify Time          @Author      @Version    @Description
--------------      -----------    --------    ------------
2023/1/19 19:15     chuxiaodong      1.0           None
"""
import datetime
import json
import os
import random
import traceback

from pathlib import Path
from nonebot import on_command, require, get_bot
from nonebot.adapters.onebot.v11 import Message, GroupMessageEvent, ActionFailed
from nonebot.params import CommandArg
from nonebot.plugin import PluginMetadata

from LittlePaimon.utils import scheduler, logger, DRIVER

require("nonebot_plugin_apscheduler")

__plugin_meta__ = PluginMetadata(
    name='定时发送消息',
    description='定时发送消息',
    usage='定时发送 消息 yyyy-mm-dd-hh-mm-ss',
    extra={
        'author': '',
        'version': '0.1',
        'priority': 6,
    }
)

add_plan = on_command('定时发送', priority=6, state={
    'pm_name': '定时发送消息',
    'pm_description': '定时发送不含空格的文本消息',
    'pm_usage': '定时发送 消息 yyyy-mm-dd-hh-mm',
    'pm_priority': 1
})
json_path = Path(__file__).parent / 'text.json'
print(json_path)
greeting_path = Path(__file__).parent / 'greeting.json'
greeting_json = {"hello_group": [], "hello_word": [""]}
my_list = []

@DRIVER.on_startup
async def _():
    global my_list, greeting_json
    if not os.path.exists(json_path):
        write_json()
    else:
        f = open(json_path, 'r')
        my_list = json.load(f)['msg']
        f.close()
    for i in my_list:
        d = i['time'].split('-')
        d = datetime.datetime(int(d[0]), int(d[1]), int(d[2]), int(d[3]), int(d[4]), 0, 0)
        scheduler.add_job(func=my_send, trigger='date', next_run_time=d, args=(i, 0))
    if not os.path.exists(greeting_path):
        f = open(greeting_path, 'w')
        json.dump(greeting_json, f)
        f.close()
    else:
        f = open(greeting_path, 'r')
        greeting_json = json.load(f)
        f.close()

@add_plan.handle()
async def _(event: GroupMessageEvent, msg: Message = CommandArg()):
    global my_list
    msg = msg.extract_plain_text().strip().split(' ')
    d = msg[-1].split('-')
    try:
        if len(d) == 2:
            d1 = datetime.datetime.now()
            d = datetime.datetime(d1.year, d1.month, d1.day, int(d[0]), int(d[1]))
        elif len(d) == 5:
            d = datetime.datetime(int(d[0]), int(d[1]), int(d[2]), int(d[3]), int(d[4]), 0, 0)
        d1 = str(f"{d.year}-{d.month}-{d.day}-{d.hour}-{d.minute}")
        a = {'msg': str(msg[-2]), 'group': int(event.group_id), 'time': d1}
        scheduler.add_job(func=my_send, trigger='date', next_run_time=d, args=(a, 0))
        my_list.append(a)
        write_json()
    except Exception as e:
        logger.warning(traceback.format_exc())
        await add_plan.finish(f"添加计划失败{str(e)}{msg}")
    await add_plan.finish(f"将在{str(d)}向群{event.group_id}发送{msg[-2]}")

@scheduler.scheduled_job("cron", hour=0, minute=0)
async def my_hello():
    global greeting_json
    for i in greeting_json['hello_group']:
        msg = random.choice(greeting_json['hello_word'])
        await get_bot().call_api("send_group_msg", group_id=i, message=msg)

async def my_send(a, i):
    global my_list
    try:
        await get_bot().call_api("send_group_msg", group_id=a['group'], message=a['msg'])
    except ActionFailed as e:
        logger.warning(f"发送群 {a['group']} 消息{a['msg']}失败:{e}{traceback.format_exc()}")
    my_list.remove(a)
    write_json()

def write_json():
    global my_list
    f = open(json_path, 'w')
    json.dump({"msg": my_list}, f)
    f.close()

IMG_0150(20230121-035755)

CMHopeSunshine commented 1 year ago

目前暂不考虑集成到本Bot中了,后续可能会以插件形式开发,感谢