Closed wwkk2580 closed 2 months ago
好的👌🏻这几天有空时我看一下
好的👌🏻这几天有空时我看一下 大佬,这是我之前自己写的,参考一下
import asyncio
import json
import logging
import time
from typing import Any, Dict

import aiohttp
import requests
# DingTalk robot webhook URL. The value after `access_token=` is blank here;
# presumably the caller's own robot token must be appended before use.
DINGTALK_WEBHOOK = 'https://oapi.dingtalk.com/robot/send?access_token='
# Maps anchor_id -> time.time() timestamp of the last notification sent for
# that anchor; used to rate-limit repeated "went live" messages.
last_notified_time = {}
async def get_huya_stream_status(session, anchor_id):
    """Return whether the Huya room ``anchor_id`` is currently live.

    Fetches the room page and looks for the literal marker ``"isOn":true``
    in the returned HTML.

    Args:
        session: An object exposing ``get(url, headers=...)`` as an async
            context manager (an ``aiohttp.ClientSession`` in this script).
        anchor_id: Room identifier appended to ``https://www.huya.com/``.

    Returns:
        ``True`` if the marker is present in the page text, else ``False``.

    Raises:
        Exception: If the page responds with a status other than 200.
    """
    url = f'https://www.huya.com/{anchor_id}'
    headers = {
        # Browser-like User-Agent sent with the page request.
        'User-Agent': (
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
            'AppleWebKit/537.36 (KHTML, like Gecko) '
            'Chrome/91.0.4472.124 Safari/537.36'
        )
    }
    async with session.get(url, headers=headers) as response:
        if response.status != 200:
            raise Exception('Failed to fetch the Huya page')
        text = await response.text()
    # The membership test already yields the boolean we want to return.
    return '"isOn":true' in text
def send_dingtalk_message(message, timeout=10):
    """Post a plain-text message to the DingTalk robot webhook.

    Args:
        message: Text placed in the ``text.content`` field of the payload.
        timeout: Seconds to wait for the HTTP request before giving up.
            Without a timeout ``requests`` can block indefinitely on a
            stalled connection.

    Raises:
        Exception: If the webhook answers with a status other than 200.
        requests.exceptions.RequestException: On connection errors or when
            the timeout elapses.
    """
    headers = {'Content-Type': 'application/json'}
    payload = {
        "msgtype": "text",
        "text": {
            "content": message,
        },
    }
    response = requests.post(
        DINGTALK_WEBHOOK,
        headers=headers,
        data=json.dumps(payload),
        timeout=timeout,
    )
    # NOTE(review): only the HTTP status is checked, as in the original; the
    # response body is not inspected for an application-level error code.
    if response.status_code != 200:
        raise Exception('Failed to send message to DingTalk')
async def check_and_push_status(session, anchor_id, cooldown_seconds=3600):
    """Send a DingTalk notification if ``anchor_id`` is live and not recently notified.

    Args:
        session: HTTP session passed through to ``get_huya_stream_status``.
        anchor_id: Huya room identifier to check.
        cooldown_seconds: Minimum number of seconds between two notifications
            for the same anchor (defaults to one hour).

    Side effects:
        On a successful send, records the send time in the module-level
        ``last_notified_time`` dict under ``anchor_id``.
    """
    is_live = await get_huya_stream_status(session, anchor_id)
    if not is_live:
        return

    current_time = time.time()
    last_sent = last_notified_time.get(anchor_id)
    if last_sent is not None and current_time - last_sent <= cooldown_seconds:
        # Already notified within the cooldown window.
        return

    message = f'主播 {anchor_id} 开播啦!快去观看!'
    # send_dingtalk_message performs a blocking HTTP request; run it in the
    # default executor so other anchors' checks are not stalled on the loop.
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(None, send_dingtalk_message, message)
    # Only record the time after the send succeeded, so a failed send is
    # retried on the next polling round.
    last_notified_time[anchor_id] = current_time
async def main(anchor_ids, poll_interval=60):
    """Poll every anchor forever and push a notification when one goes live.

    Args:
        anchor_ids: Iterable of Huya room identifiers to monitor. It is
            re-read on every round.
        poll_interval: Seconds to sleep between polling rounds.

    A failure while checking one anchor is logged and does not stop the
    loop or the checks of the other anchors.
    """
    log = logging.getLogger(__name__)
    async with aiohttp.ClientSession() as session:
        while True:
            current_ids = list(anchor_ids)
            # return_exceptions=True keeps one failed check (e.g. a transient
            # HTTP error) from propagating and terminating the monitor.
            results = await asyncio.gather(
                *(check_and_push_status(session, anchor_id) for anchor_id in current_ids),
                return_exceptions=True,
            )
            for anchor_id, result in zip(current_ids, results):
                if isinstance(result, Exception):
                    log.error('Check failed for anchor %s: %r', anchor_id, result)
            await asyncio.sleep(poll_interval)
# Script entry point: start the monitor only when run directly, not on import.
if __name__ == '__main__':
    anchor_ids = ['934372', '28811']  # Replace with your own list of anchor (streamer) IDs
    asyncio.run(main(anchor_ids))
已实现&构建完毕,可以拉镜像试试看🎉
增加虎牙、斗鱼直播