Closed evgenii-moriakhin closed 5 months ago
The problem occurs because multiple pubsub daemons are created when start() is called concurrently. The solution is as follows:
class PubSubMultiplexer:
    def __init__(self, pubsub: PubSub, prefix: str):
        ...
        self._daemon_creation_lock = asyncio.Lock()

    async def start(self) -> None:
        # Check and create the daemon under a lock so concurrent
        # start() calls can't each spawn their own daemon task.
        async with self._daemon_creation_lock:
            if not self._daemon_task:
                await self.pubsub.psubscribe(f"{self.prefix}*")
                self._daemon_task = asyncio.create_task(self._daemon())
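The guard above is the standard check-under-lock pattern. A minimal, self-contained sketch (using stand-in names, not saq's actual classes) showing that concurrent callers end up creating only one daemon task:

```python
import asyncio


class Multiplexer:
    """Stand-in for PubSubMultiplexer: start() must spawn the daemon once."""

    def __init__(self) -> None:
        self._daemon_task: asyncio.Task | None = None
        self._daemon_creation_lock = asyncio.Lock()
        self.subscribe_calls = 0  # counts how many times we "subscribed"

    async def _daemon(self) -> None:
        await asyncio.sleep(3600)  # placeholder for the real message loop

    async def start(self) -> None:
        # The check happens under the lock, so even if many coroutines
        # call start() at once, only the first one sees _daemon_task as
        # None; the rest wait on the lock and then skip creation.
        async with self._daemon_creation_lock:
            if not self._daemon_task:
                self.subscribe_calls += 1  # stands in for pubsub.psubscribe
                self._daemon_task = asyncio.create_task(self._daemon())


async def main() -> int:
    mux = Multiplexer()
    await asyncio.gather(*(mux.start() for _ in range(10)))
    mux._daemon_task.cancel()
    return mux.subscribe_calls


print(asyncio.run(main()))  # 1 — only one daemon was created
```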
@barakalon
Whoops - yeah, that looks like a good solution.
@evgenii-moriakhin do you want to contribute this fix? I'm happy to if you don't want to.
Also, check out Queue.map. Might be a better method for what you're trying to do.
Yes, I'd be glad to publish my first PR.
I'm trying to set up saq for my work needs, and I've encountered this problem:
if I try to execute asyncio.gather for job.refresh() calls with until_complete set, an error is raised.
Here's the code that reproduces the problem:
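The original reproduction code isn't preserved above, but the underlying race can be demonstrated with plain asyncio (a sketch, not saq's actual code): without the lock, a start() that awaits something before recording the daemon task lets concurrent callers all pass the "is it already running?" check:

```python
import asyncio


class UnguardedMultiplexer:
    """Sketch of the buggy shape: start() awaits before it records the
    daemon task, so concurrent callers race past the None check."""

    def __init__(self) -> None:
        self._daemon_task = None
        self.daemons_started = 0

    async def _subscribe(self) -> None:
        await asyncio.sleep(0)  # stands in for `await pubsub.psubscribe(...)`

    async def start(self) -> None:
        if not self._daemon_task:       # both callers pass this check...
            await self._subscribe()     # ...then yield to the event loop,
            self.daemons_started += 1   # so each one spawns a "daemon"
            self._daemon_task = asyncio.create_task(asyncio.sleep(3600))


async def main() -> int:
    mux = UnguardedMultiplexer()
    # Mirrors gathering several job.refresh(until_complete=...) calls,
    # each of which triggers start() on the shared multiplexer:
    await asyncio.gather(mux.start(), mux.start())
    mux._daemon_task.cancel()
    return mux.daemons_started


print(asyncio.run(main()))  # 2 — two daemons for one multiplexer
```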