Open AstraLuma opened 7 years ago
My work-around for this is:
import threading
class ScheduleThread(threading.Thread):
def __init__(self, *pargs, **kwargs):
super().__init__(*pargs, daemon=True, name="scheduler", **kwargs)
import sched
import time
self.sched = sched.scheduler(time.time, time.sleep)
def run(self):
import schedule
import time
while True:
schedule.run_pending()
nextsched = self.sched.run(False)
if nextsched is None:
nextsched = float('inf')
try:
nextschedule = schedule.idle_seconds()
except Exception:
nextschedule = float('inf')
if nextsched == nextschedule == float('inf'):
time.sleep(1) # Finish init
else:
time.sleep(min(nextsched, nextschedule))
def every(self):
import schedule
return schedule.every()
def when(self, time, callable, *pargs, **kwargs):
self.sched.enterabs(time, 0, callable, pargs, kwargs)
def delay(self, delay, callable, *pargs, **kwargs):
self.sched.enter(delay, 0, callable, pargs, kwargs)
schedule = ScheduleThread()
schedule.start()
Which I'm not in love with, but it seems to be working.
EDIT: The latest version of this is https://github.com/astronouth7303/xontrib-schedule/blob/master/xontrib/schedule.py
I know that to do this kind of thing "properly", you need a bunch of other stuff.
To be honest, I'm just hoping to have some stuff added so that I don't need to run both
schedule
andsched
in the same process.This would be really handy for in-memory objects or other resources that are tied to process lifecycle. (My specific use case is remote access credentials that expire. I get the timestamp of exactly when they expire, and I don't care if the process restarts because I just reauthenticate at start.)