OpenAgentsInc / queenbee

GPUtopia ai fan out hub, sends requests to workers
GNU Affero General Public License v3.0
11 stars 6 forks source link

Worker persistent stats #9

Closed BitcoinCoderBob closed 1 year ago

BitcoinCoderBob commented 1 year ago

Please let me know if this is on the right track before I fill out the rest of the db calls needed to update rows

earonesty commented 1 year ago

i see you really want to break out by bin, which is fine:

if this was the base class, it would be trivial to write a driver, and the caller would not need row ids and the

class StatsStore:
    def __init__(self):
        self.update_queue = UniqueQueue(key=lambda el: el[0])
        self.update_thread = Thread(daemon=True, target=self.update_loop)
        self.update_thread.start()

    def stop(self, join=True):
        self.update_queue.put([None, None])
        if join:
            self.update_thread.join()

    def update_loop(self):
        while True:
            try:
                key, bnum, val = self.update_queue.get(timeout=4)
                if key is None:
                    break
                self._update(key, bnum, val)
            except Empty:
                pass

    def update(self, key: str, bin_num: int, val: dict):
        self.update_queue.put([key, bin, val])

    def _update(self, key, bin_num: int, val):
        raise NotImplementedError

    def get(self, key: str) -> dict:
        raise NotImplementedError
earonesty commented 1 year ago
class MySqlStatsStore(StatsStore):
    def _update(self, key: str, bin_num: int, val):
        ...

    def get(self, key: str) -> dict:
        ...
earonesty commented 1 year ago
    def bump(self, key, msize, usage, secs):
        msize_bin = StatsWorker.get_bin_from_msize(msize)
        key = self.key_func(key)
        self.worker_stats[key].bump(msize, usage, secs)

        if self.store:
            if isinstance(key, str) and "<" not in key:
                self.store.update(key, msize_bin, self.worker_stats[key].dump())

        self.all.bump(msize, usage, secs)

        if self.store:
            self.store.update(self.ALL_KEY, msize_bin, self.worker_stats[key].dump())
earonesty commented 1 year ago

this results in minimal cross-talk between "storage" specialization and the statistics collection.

no need for "row id" loading into subclasses

BitcoinCoderBob commented 1 year ago

happy to squash all commits at the end