JulianKropp / stream_pipeline

Modular pipeline for processing data streams and measuring each step
MIT License
0 stars 0 forks source link

Create own thread pool #13

Status: Open — JulianKropp opened this issue 1 month ago.

JulianKropp commented 1 month ago:
import threading
import queue
from concurrent.futures import Future

class DynamicThreadPoolExecutor:
    """Thread pool whose worker count can be changed while it is running.

    Tasks are submitted with :meth:`submit` and run in FIFO order by worker
    threads that all pull from one shared queue. Each submission returns a
    :class:`concurrent.futures.Future` that receives the task's result or
    exception.

    Workers are retired by enqueuing a private stop sentinel. Whichever worker
    dequeues a sentinel exits, so exactly one thread stops per sentinel.
    Because the queue is FIFO, a sentinel only takes effect after the tasks
    queued ahead of it have been picked up.
    """

    # Queue marker telling the worker that dequeues it to exit.
    _STOP = object()

    def __init__(self, max_workers: int) -> None:
        """Start ``max_workers`` worker threads.

        Args:
            max_workers: Number of worker threads to start. ``0`` is allowed;
                queued tasks then wait until workers are added with
                :meth:`set_max_workers`.

        Raises:
            ValueError: If ``max_workers`` is negative.
        """
        if max_workers < 0:
            raise ValueError("max_workers must be >= 0")
        self.max_workers = max_workers
        self.tasks = queue.Queue()
        self.workers = []
        # Serialises submit / resize / shutdown so no task can be enqueued
        # behind the shutdown sentinels and no resize races a shutdown.
        self.lock = threading.Lock()
        self.shutdown_flag = False
        self._create_workers(max_workers)

    def _create_workers(self, num_workers: int) -> None:
        """Start ``num_workers`` new worker threads and track them.

        Callers other than ``__init__`` must hold ``self.lock``.
        """
        # Drop threads that already exited so the list does not grow without
        # bound across repeated shrink/grow cycles.
        self.workers = [w for w in self.workers if w.is_alive()]
        for _ in range(num_workers):
            worker = threading.Thread(target=self._worker)
            worker.start()
            self.workers.append(worker)

    def _worker(self) -> None:
        """Run queued tasks until this thread dequeues a stop sentinel."""
        while True:
            # Block until work (or a sentinel) arrives; no polling needed.
            item = self.tasks.get()
            try:
                if item is self._STOP:
                    return
                fn, args, kwargs, future = item
                # Skip tasks whose future was cancelled while queued.
                if not future.set_running_or_notify_cancel():
                    continue
                try:
                    result = fn(*args, **kwargs)
                except BaseException as exc:
                    # Mirror concurrent.futures: every failure goes to the
                    # future instead of killing the worker thread.
                    future.set_exception(exc)
                else:
                    future.set_result(result)
            finally:
                # Always balance get(), including for sentinels and
                # cancelled tasks, so Queue.join() cannot hang.
                self.tasks.task_done()

    def submit(self, fn, *args, **kwargs) -> Future:
        """Schedule ``fn(*args, **kwargs)`` and return its Future.

        Raises:
            RuntimeError: If the pool has been shut down.
        """
        with self.lock:
            if self.shutdown_flag:
                raise RuntimeError("Cannot submit task after shutdown")
            future = Future()
            self.tasks.put((fn, args, kwargs, future))
        return future

    def set_max_workers(self, new_max_workers: int) -> None:
        """Grow or shrink the pool to ``new_max_workers`` threads.

        Growing starts new threads immediately. Shrinking enqueues one stop
        sentinel per surplus worker. Those workers exit when they reach a
        sentinel, i.e. after the tasks already queued ahead of it are taken.

        Raises:
            ValueError: If ``new_max_workers`` is negative.
            RuntimeError: If the pool has already been shut down.
        """
        if new_max_workers < 0:
            raise ValueError("max_workers must be >= 0")
        with self.lock:
            if self.shutdown_flag:
                raise RuntimeError("Cannot resize pool after shutdown")
            delta = new_max_workers - self.max_workers
            if delta > 0:
                self._create_workers(delta)
            else:
                # Retire only the surplus workers; the pool stays usable.
                for _ in range(-delta):
                    self.tasks.put(self._STOP)
            self.max_workers = new_max_workers

    def shutdown(self, wait: bool = True) -> None:
        """Stop accepting tasks and let the workers exit.

        Tasks submitted before shutdown still run, because the stop
        sentinels are queued behind them. Calling this more than once is
        harmless.

        Args:
            wait: If true, block until every worker thread has exited.
        """
        with self.lock:
            if not self.shutdown_flag:
                self.shutdown_flag = True
                # Live workers not already slated for retirement equal
                # max_workers, so one sentinel each stops them all.
                for _ in range(self.max_workers):
                    self.tasks.put(self._STOP)
            workers = list(self.workers)
        if wait:
            for worker in workers:
                worker.join()

# Usage example
import time

def example_task(n):
    """Simulate work by sleeping ``n`` seconds, then return ``n`` unchanged."""
    duration = n
    time.sleep(duration)
    return duration

if __name__ == '__main__':
    # Demo: start with two workers, queue four tasks that sleep 0..3 seconds,
    # then widen the pool to four workers part-way through.
    executor = DynamicThreadPoolExecutor(max_workers=2)
    try:
        futures = [executor.submit(example_task, i) for i in range(4)]

        time.sleep(5)
        executor.set_max_workers(4)

        for future in futures:
            print(future.result())
    finally:
        # Always stop the worker threads (created without daemon=True), even
        # if a task or the resize raised; otherwise they keep the process alive.
        executor.shutdown()