beaston02 / CAM4Recorder

52 stars 75 forks source link

Updated the script #14

Closed ahsand97 closed 3 years ago

ahsand97 commented 3 years ago

Updated the script to include garbace collector, multithreaded list, threaded classes and moreover control, also ported livestreamer to streamlink since livestreamer is not being updated anymore, there's still a bug that I couldn't find how to solve it related with duplicated files, sometimes a Thread write 2 files at the same time.

In your script there was a memory leak if you had it running for hours, I tried to optimize as much as I could with the del statements and having control over the python's Garbage Collector and creation/stopping/deletion of Threads on every check.

ahsand97 commented 3 years ago

Also if models are duplicated in the wanted list, the Thread is only created once per model and the script prints the duplicated model. If a model that's being recorded gets deleted in the wanted list, the thread stops and gets removed.

senhor-R commented 3 years ago

@ahsand97 help me, dont work.

senhor-R commented 3 years ago

@ahsand97 cam4 updated site

ahsand97 commented 3 years ago

@ahsand97 cam4 updated site

yes, cam4 updated their site, this is the updated one.

Click to expand! ```python import time import datetime import os import threading import sys import configparser import subprocess import queue import requests import streamlink if os.name == 'nt': import ctypes kernel32 = ctypes.windll.kernel32 kernel32.SetConsoleMode(kernel32.GetStdHandle(-11), 7) mainDir = sys.path[0] Config = configparser.ConfigParser() setting = {} recording = [] hilos = [] def cls(): os.system('cls' if os.name == 'nt' else 'clear') def readConfig(): global setting Config.read(mainDir + '/config.conf') setting = { 'save_directory': Config.get('paths', 'save_directory'), 'wishlist': Config.get('paths', 'wishlist'), 'interval': int(Config.get('settings', 'checkInterval')), 'postProcessingCommand': Config.get('settings', 'postProcessingCommand'), } try: setting['postProcessingThreads'] = int(Config.get('settings', 'postProcessingThreads')) except ValueError: if setting['postProcessingCommand'] and not setting['postProcessingThreads']: setting['postProcessingThreads'] = 1 if not os.path.exists(f'{setting["save_directory"]}'): os.makedirs(f'{setting["save_directory"]}') def postProcess(): while True: while processingQueue.empty(): time.sleep(1) parameters = processingQueue.get() model = parameters['model'] path = parameters['path'] filename = os.path.split(path)[-1] directory = os.path.dirname(path) file = os.path.splitext(filename)[0] subprocess.call(setting['postProcessingCommand'].split() + [path, filename, directory, model, file, 'cam4']) class Modelo(threading.Thread): def __init__(self, modelo): super().__init__() self.modelo = modelo self._stopevent = threading.Event() self.file = None self.online = None self.lock = threading.Lock() def run(self): global recording, hilos isOnline = self.isOnline() if isOnline == False: self.online = False else: self.online = True self.file = os.path.join(setting['save_directory'], self.modelo, f'{datetime.datetime.fromtimestamp(time.time()).strftime("%Y.%m.%d_%H.%M.%S")}_{self.modelo}.mp4') try: session = streamlink.Streamlink() streams = session.streams(f'hlsvariant://{isOnline}') stream = streams['best'] fd = stream.open() if not isModelInListofObjects(self.modelo, recording): os.makedirs(os.path.join(setting['save_directory'], self.modelo), exist_ok=True) with open(self.file, 'wb') as f: self.lock.acquire() recording.append(self) for index, hilo in enumerate(hilos): if hilo.modelo == self.modelo: del hilos[index] break self.lock.release() while not (self._stopevent.isSet() or os.fstat(f.fileno()).st_nlink == 0): try: data = fd.read(1024) f.write(data) except: fd.close() break if setting['postProcessingCommand']: processingQueue.put({'model': self.modelo, 'path': self.file}) except Exception as e: with open('log.log', 'a+') as f: f.write(f'\n{datetime.datetime.now().strftime("%d/%m/%Y %H:%M:%S")} EXCEPTION: {e}\n') self.stop() finally: self.exceptionHandler() def exceptionHandler(self): self.stop() self.online = False self.lock.acquire() for index, hilo in enumerate(recording): if hilo.modelo == self.modelo: del recording[index] break self.lock.release() try: file = os.path.join(os.getcwd(), self.file) if os.path.isfile(file): if os.path.getsize(file) <= 1024: os.remove(file) except Exception as e: with open('log.log', 'a+') as f: f.write(f'\n{datetime.datetime.now().strftime("%d/%m/%Y %H:%M:%S")} EXCEPTION: {e}\n') def isOnline(self): try: resp = requests.get(f'https://www.cam4.com/rest/v1.0/profile/{self.modelo}/streamInfo') hls_url = '' if 'cdnURL' in resp.json(): hls_url = resp.json()['cdnURL'] if len(hls_url): return hls_url else: return False except: return False def stop(self): self._stopevent.set() class CleaningThread(threading.Thread): def __init__(self): super().__init__() self.interval = 0 self.lock = threading.Lock() def run(self): global hilos, recording while True: self.lock.acquire() new_hilos = [] for hilo in hilos: if hilo.is_alive() or hilo.online: new_hilos.append(hilo) hilos = new_hilos self.lock.release() for i in range(10, 0, -1): self.interval = i time.sleep(1) class AddModelsThread(threading.Thread): def __init__(self): super().__init__() self.wanted = [] self.lock = threading.Lock() self.repeatedModels = [] self.counterModel = 0 def run(self): global hilos, recording lines = open(setting['wishlist'], 'r').read().splitlines() self.wanted = (x for x in lines if x) self.lock.acquire() aux = [] for model in self.wanted: model = model.lower() if model in aux: self.repeatedModels.append(model) else: aux.append(model) self.counterModel = self.counterModel + 1 if not isModelInListofObjects(model, hilos) and not isModelInListofObjects(model, recording): thread = Modelo(model) thread.start() hilos.append(thread) for hilo in recording: if hilo.modelo not in aux: hilo.stop() self.lock.release() def isModelInListofObjects(obj, lista): result = False for i in lista: if i.modelo == obj: result = True break return result if __name__ == '__main__': readConfig() if setting['postProcessingCommand']: processingQueue = queue.Queue() postprocessingWorkers = [] for i in range(0, setting['postProcessingThreads']): t = threading.Thread(target=postProcess) postprocessingWorkers.append(t) t.start() cleaningThread = CleaningThread() cleaningThread.start() while True: try: readConfig() addModelsThread = AddModelsThread() addModelsThread.start() i = 1 for i in range(setting['interval'], 0, -1): cls() if len(addModelsThread.repeatedModels): print('The following models are more than once in wanted: [\'' + ', '.join(modelo for modelo in addModelsThread.repeatedModels) + '\']') print(f'{len(hilos):02d} alive Threads (1 Thread per non-recording model), cleaning dead/not-online Threads in {cleaningThread.interval:02d} seconds, {addModelsThread.counterModel:02d} models in wanted') print(f'Online Threads (models): {len(recording):02d}') print('The following models are being recorded:') for hiloModelo in recording: print(f' Model: {hiloModelo.modelo} --> File: {os.path.basename(hiloModelo.file)}') print(f'Next check in {i:02d} seconds\r', end='') time.sleep(1) addModelsThread.join() del addModelsThread, i except: break ```
senhor-R commented 3 years ago

God

senhor-R commented 3 years ago

please teach me how you made the script work or tell me what to study

ahsand97 commented 3 years ago

please teach me how you made the script work or tell me what to study

I updated the script in the past to work with multithreading and since cam4 changed their site, the urls changed too, I just took the new one to keep it working, remember that in order to make the script work, there's need to be a conf file just like the original script, also this new version creates a file log.log which contaings the logs when exceptions happens

senhor-R commented 3 years ago

how did you find the new urls ?

senhor-R commented 3 years ago

where do i see?

ahsand97 commented 3 years ago

how did you find the new urls ?

Inspecting the web page watching the requests it made whenever u open a model's cam with dev tools available in every browser

senhor-R commented 3 years ago

thanks

thelittlerocket commented 3 years ago

@ahsand97 cam4 updated site

yes, cam4 updated their site, this is the updated one.

import time
import datetime
import os
import threading
import sys
import configparser
import livestreamer
import subprocess
import queue
import requests

if os.name == 'nt':
    import ctypes
    kernel32 = ctypes.windll.kernel32
    kernel32.SetConsoleMode(kernel32.GetStdHandle(-11), 7)

mainDir = sys.path[0]
Config = configparser.ConfigParser()
setting = {}

recording = []

hilos = []

def cls():
    os.system('cls' if os.name == 'nt' else 'clear')

def readConfig():
    global setting

    Config.read(mainDir + '/config.conf')
    setting = {
        'save_directory': Config.get('paths', 'save_directory'),
        'wishlist': Config.get('paths', 'wishlist'),
        'interval': int(Config.get('settings', 'checkInterval')),
        'postProcessingCommand': Config.get('settings', 'postProcessingCommand'),
        }
    try:
        setting['postProcessingThreads'] = int(Config.get('settings', 'postProcessingThreads'))
    except ValueError:
        if setting['postProcessingCommand'] and not setting['postProcessingThreads']:
            setting['postProcessingThreads'] = 1

    if not os.path.exists(f'{setting["save_directory"]}'):
        os.makedirs(f'{setting["save_directory"]}')

def postProcess():
    while True:
        while processingQueue.empty():
            time.sleep(1)
        parameters = processingQueue.get()
        model = parameters['model']
        path = parameters['path']
        filename = os.path.split(path)[-1]
        directory = os.path.dirname(path)
        file = os.path.splitext(filename)[0]
        subprocess.call(setting['postProcessingCommand'].split() + [path, filename, directory, model,  file, 'cam4'])

class Modelo(threading.Thread):
    def __init__(self, modelo):
        super().__init__()
        self.modelo = modelo
        self._stopevent = threading.Event()
        self.file = None
        self.online = None
        self.lock = threading.Lock()

    def run(self):
        global recording, hilos
        isOnline = self.isOnline()
        if isOnline == False:
            self.online = False
        else:
            self.online = True
            self.file = os.path.join(setting['save_directory'], self.modelo, f'{datetime.datetime.fromtimestamp(time.time()).strftime("%Y.%m.%d_%H.%M.%S")}_{self.modelo}.mp4')
            try:
                session = livestreamer.Livestreamer()
                streams = session.streams(f'hlsvariant://{isOnline}')
                stream = streams['best']
                fd = stream.open()
                if not isModelInListofObjects(self.modelo, recording):
                    os.makedirs(os.path.join(setting['save_directory'], self.modelo), exist_ok=True)
                    with open(self.file, 'wb') as f:
                        self.lock.acquire()
                        recording.append(self)
                        for index, hilo in enumerate(hilos):
                            if hilo.modelo == self.modelo:
                                del hilos[index]
                                break
                        self.lock.release()
                        while not (self._stopevent.isSet() or os.fstat(f.fileno()).st_nlink == 0):
                            try:
                                data = fd.read(1024)
                                f.write(data)
                            except:
                                fd.close()
                                break
                    if setting['postProcessingCommand']:
                            processingQueue.put({'model': self.modelo, 'path': self.file})
            except Exception as e:
                with open('log.log', 'a+') as f:
                    f.write(f'\n{datetime.datetime.now().strftime("%d/%m/%Y %H:%M:%S")} EXCEPTION: {e}\n')
                self.stop()
            finally:
                self.exceptionHandler()

    def exceptionHandler(self):
        self.stop()
        self.online = False
        self.lock.acquire()
        for index, hilo in enumerate(recording):
            if hilo.modelo == self.modelo:
                del recording[index]
                break
        self.lock.release()
        try:
            file = os.path.join(os.getcwd(), self.file)
            if os.path.isfile(file):
                if os.path.getsize(file) <= 1024:
                    os.remove(file)
        except Exception as e:
            with open('log.log', 'a+') as f:
                f.write(f'\n{datetime.datetime.now().strftime("%d/%m/%Y %H:%M:%S")} EXCEPTION: {e}\n')

    def isOnline(self):
        try:
            resp = requests.get(f'https://www.cam4.com/rest/v1.0/profile/{self.modelo}/streamInfo')
            hls_url = ''
            if 'cdnURL' in resp.json():
                hls_url = resp.json()['cdnURL']
            if len(hls_url):
                return hls_url
            else:
                return False
        except:
            return False

    def stop(self):
        self._stopevent.set()

class CleaningThread(threading.Thread):
    def __init__(self):
        super().__init__()
        self.interval = 0
        self.lock = threading.Lock()

    def run(self):
        global hilos, recording
        while True:
            self.lock.acquire()
            new_hilos = []
            for hilo in hilos:
                if hilo.is_alive() or hilo.online:
                    new_hilos.append(hilo)
            hilos = new_hilos
            self.lock.release()
            for i in range(10, 0, -1):
                self.interval = i
                time.sleep(1)

class AddModelsThread(threading.Thread):
    def __init__(self):
        super().__init__()
        self.wanted = []
        self.lock = threading.Lock()
        self.repeatedModels = []
        self.counterModel = 0

    def run(self):
        global hilos, recording
        lines = open(setting['wishlist'], 'r').read().splitlines()
        self.wanted = (x for x in lines if x)
        self.lock.acquire()
        aux = []
        for model in self.wanted:
            model = model.lower()
            if model in aux:
                self.repeatedModels.append(model)
            else:
                aux.append(model)
                self.counterModel = self.counterModel + 1
                if not isModelInListofObjects(model, hilos) and not isModelInListofObjects(model, recording):
                    thread = Modelo(model)
                    thread.start()
                    hilos.append(thread)
        for hilo in recording:
            if hilo.modelo not in aux:
                hilo.stop()
        self.lock.release()

def isModelInListofObjects(obj, lista):
    result = False
    for i in lista:
        if i.modelo == obj:
            result = True
            break
    return result

if __name__ == '__main__':
    readConfig()
    if setting['postProcessingCommand']:
        processingQueue = queue.Queue()
        postprocessingWorkers = []
        for i in range(0, setting['postProcessingThreads']):
            t = threading.Thread(target=postProcess)
            postprocessingWorkers.append(t)
            t.start()
    cleaningThread = CleaningThread()
    cleaningThread.start()
    while True:
        try:
            readConfig()
            addModelsThread = AddModelsThread()
            addModelsThread.start()
            i = 1
            for i in range(setting['interval'], 0, -1):
                cls()
                if len(addModelsThread.repeatedModels): print('The following models are more than once in wanted: [\'' + ', '.join(modelo for modelo in addModelsThread.repeatedModels) + '\']')
                print(f'{len(hilos):02d} alive Threads (1 Thread per non-recording model), cleaning dead/not-online Threads in {cleaningThread.interval:02d} seconds, {addModelsThread.counterModel:02d} models in wanted')
                print(f'Online Threads (models): {len(recording):02d}')
                print('The following models are being recorded:')
                for hiloModelo in recording: print(f'  Model: {hiloModelo.modelo}  -->  File: {os.path.basename(hiloModelo.file)}')
                print(f'Next check in {i:02d} seconds\r', end='')
                time.sleep(1)
            addModelsThread.join()
            del addModelsThread, i
        except:
            break

Hi @ahsand97 Thanks for the updated script, would like to know that if you can confirm that the postProcessingCommand can work? It seems all script is working but not the postProcessingCommand, I set postProcessingCommand=/path/to/my/bash.sh but the postscript never been processed and there is no error or log, I am so lost in this. If you could point me in a direction will be appreciated. Thanks,

ahsand97 commented 3 years ago

Hi @ahsand97 Thanks for the updated script, would like to know that if you can confirm that the postProcessingCommand can work? It seems all script is working but not the postProcessingCommand, I set postProcessingCommand=/path/to/my/bash.sh but the postscript never been processed and there is no error or log, I am so lost in this. If you could point me in a direction will be appreciated. Thanks,

I'm not quite sure to be completely honest, I didn't touch any code related with the postProcessingcommand, was it working with the older script?

thelittlerocket commented 3 years ago

@ahsand97 cam4 updated site

Hi @ahsand97 Thanks for the updated script, would like to know that if you can confirm that the postProcessingCommand can work? It seems all script is working but not the postProcessingCommand, I set postProcessingCommand=/path/to/my/bash.sh but the postscript never been processed and there is no error or log, I am so lost in this. If you could point me in a direction will be appreciated. Thanks,

I'm not quite sure to be completely honest, I didn't touch any code related with the postProcessingcommand, was it working with the older script?

Hi @ahsand97 Yes, it was working with the old script, still working on making it work.