import os
import re
import sys
import json
import time
import requests
from lxml import etree
from loguru import logger
from urllib.parse import unquote
import socket
import socks
from multiprocessing.connection import wait
import multiprocessing
def byte2size(value):
    """Format a byte count as a human-readable size string.

    The count is divided by 1024 until it drops below 1024, then rendered
    with two decimals followed by the matching unit (B, KB, MB, GB, TB, PB).

    :param value: number of bytes; anything accepted by ``int()``
    :return: formatted string such as ``"1.50KB"``
    """
    value = int(value)
    units = ("B", "KB", "MB", "GB", "TB", "PB")
    step = 1024.0
    for unit in units[:-1]:
        if value / step < 1:
            return "%.2f%s" % (value, unit)
        value = value / step
    # Anything that is still >= 1024 after reaching the largest unit is
    # reported in that unit instead of falling through and returning None.
    return "%.2f%s" % (value, units[-1])
import os
import re
import sys
import json
import time
import requests
from lxml import etree
from loguru import logger
from urllib.parse import unquote
import socket
import socks
from multiprocessing.connection import wait
import multiprocessing

# Force-disable the insecure-request warnings emitted by urllib3
from requests.packages.urllib3.exceptions import InsecureRequestWarning
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

# Route every socket created by this process through the local SOCKS5 proxy
# at localhost:10800 by replacing the stdlib socket class.
socks.set_default_proxy(socks.SOCKS5, "localhost", 10800)
socket.socket = socks.socksocket
import threading
# ======================= CONFIG ===================
# Version
VERSION = "V1.0.5"
# To download private videos, put your own cookie here
cookie = ""
# Set to True to enable DEBUG mode
DEBUG = False
# Custom download path
ROOT_DIR = r""
# Thread count
THREAD_NUM = 1

# Log config
level = "DEBUG" if DEBUG else "INFO"
# Remove the default handler
logger.remove()
# Console output
logger.add(sys.stderr, level=level)
# Log files live in a "log" directory next to this script
log_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "log")
# Log file output: one general log at the configured level, and a second
# file that only receives ERROR records.
logger.add(
    os.path.join(log_path, "{time}.log"),
    encoding="utf-8",
    rotation="00:00",
    enqueue=True,
    level=level,
)
logger.add(
    os.path.join(log_path, "[ERROR video] {time}.log"),
    encoding="utf-8",
    rotation="00:00",
    enqueue=True,
    level="ERROR",
)
# ======================= CONFIG ===================

# Retry list for videos that failed the integrity check
# [{"args": args, "count": count}, {}...]
RETRY_COUNT = 3
ERR_TASK_LIST = []
def check_video(resp, path) -> bool:
    """Check that a downloaded file has the size announced by the server.

    :param resp: response object; its ``headers["content-length"]`` entry
        is read as the expected size in bytes
    :param path: path of the local video file
    :return: True when the local size equals the announced size, else False

    A missing ``content-length`` header is not handled here; the failed
    header lookup propagates to the caller.
    """
    local_size = int(os.path.getsize(path))
    resp_size = int(resp.headers["content-length"])
    logger.debug(f"{local_size} {resp_size}")
    return local_size == resp_size
def byte2size(value):
    """Format a byte count as a human-readable size string.

    The count is divided by 1024 until it drops below 1024, then rendered
    with two decimals followed by the matching unit (B, KB, MB, GB, TB, PB).

    :param value: number of bytes; anything accepted by ``int()``
    :return: formatted string such as ``"1.50KB"``
    """
    value = int(value)
    units = ("B", "KB", "MB", "GB", "TB", "PB")
    step = 1024.0
    for unit in units[:-1]:
        if value / step < 1:
            return "%.2f%s" % (value, unit)
        value = value / step
    # Anything that is still >= 1024 after reaching the largest unit is
    # reported in that unit instead of falling through and returning None.
    return "%.2f%s" % (value, units[-1])
class IwaraDownloader:
    """Downloader for iwara videos.

    Currently supported, according to the original author's notes:
    1. Downloading all videos of an iwara user, stored in a single
       ``<user name>`` folder.
    2. Downloading a single iwara video, stored in the ``down`` folder.

    An iwara address such as
    https://ecchi.iwara.tv/videos/v2km9s5npeterzlad
    corresponds to the API endpoint
    https://ecchi.iwara.tv/api/video/v2km9s5npeterzlad
    """

    def __init__(self, links):
        """Set the site host used by the downloader.

        :param links: accepted for the caller's convenience; the code
            visible here does not store or read it
        """
        self.iwara_host = "https://ecchi.iwara.tv"

    # Keep the previous spelling callable so code that invoked
    # ``obj.init(links)`` explicitly continues to work.
    init = __init__
# Script entry point: print the welcome banner and the project link.
if __name__ == '__main__':
    logger.success(f"=== 欢迎使用< iwara下载脚本 {VERSION} > ===")
    logger.success(f"Github地址: https://github.com/WriteCode-ChangeWorld/Tools 欢迎Star~")