total=len(self.callable_tasks),
)
def get_new_data(self, key):
    """Fetch ``key`` from redis and return its decoded JSON value if unseen.

    Args:
        key: The redis key to read via ``self.redis.get``.

    Returns:
        The JSON-decoded value, or ``None`` when the key has no value, the
        value cannot be decoded as JSON, or the decoded value was already
        recorded as seen by ``__data_is_new``. Each ``None`` case is logged
        at debug level.
    """
    raw = self.redis.get(key)
    if raw is None:
        self.logger.debug(f"[yellow]Could not get value from key: '{key}'")
        return None

    # TODO: This shouldn't be returning invalid JSON?
    # Not sure why but every 8th poll returns a value that isn't None, but isn't JSON.
    # Also, JSONDecodeError is actually thrown as ValueError. Which is weird
    # labels: Enhancement
    # NOTE: json.JSONDecodeError is a subclass of ValueError, so catching
    # ValueError here covers malformed JSON.
    try:
        payload = json.loads(raw)
    except ValueError:
        self.logger.debug(f"[yellow]Could not decode value from key {key}")
        return None

    # Happy path first: hand back anything we have not seen before.
    if self.__data_is_new(payload):
        return payload

    self.logger.debug(
        f"Fetching redis key: '{key}' returned stale data:\n{payload}"
    )
    return None
def __data_is_new(self, data):
    """Report whether ``data`` is unseen, recording it when it is.

    Args:
        data: A decoded value to look up in ``self.prior_data``.

    Returns:
        ``False`` if ``data`` is already in ``self.prior_data``; otherwise
        appends it there and returns ``True``.
    """
    # TODO: There has got to be a better way!
    # We're storing all the values of every key, once a second in memory.
    already_seen = data in self.prior_data
    if not already_seen:
        # Remember the value so an identical later poll counts as stale.
        self.prior_data.append(data)
    return not already_seen
def handle_task_event(self, key):
data = self.get_new_data(key)
if data == None:
return
# Is this one of our tasks, or another queuers?
if self.group_id == data["group_id"]:
This shouldn't be returning invalid JSON, should it?
Not sure why, but every 8th poll returns a value that isn't None yet isn't valid JSON either.
Also, JSONDecodeError is raised as a ValueError, which looks odd at first but is expected: json.JSONDecodeError is a subclass of ValueError.
https://github.com/in03/Resolve-Proxy-Encoder/blob/95843700d66e23e83d9cf82a6e82831abb29753d/resolve_proxy_encoder/app/broker.py#L150
55f8f62de9792ccf236feb237c637de62b90af84