beanlab / quest_framework

A Python framework for fault-tolerant workflows

async generator of resource updates #12

Closed byubean closed 3 months ago

byubean commented 5 months ago
identity = 'ASDFASDF'
async for resources in manager.get_resource_stream('workflow_id', identity):
    await socket.send(resources)
byubean commented 4 months ago

Probably need:

byubean commented 4 months ago

Build a test first

kjstanding commented 4 months ago

Dr. Bean's Discord comment from 02/08:

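import asyncio

async def main():
    workflow = None  # placeholder for a real workflow object
    identity = 'foobar'

    # Option 1: poll for the current resources once per second
    while True:
        resources = workflow.get_resources(identity)
        # do something
        await asyncio.sleep(1)

    # Option 2 (instead of polling): async stream
    async for resources in workflow.stream_resources(identity):
        # respond to the latest resources
        pass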

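# Method on the workflow/manager: yields each update as it arrives
async def stream(self, identity):
    queue = self._get_resource_queue(identity)
    while True:
        yield await queue.get()

# Producer side: assuming an asyncio.Queue, put() is a coroutine and must be awaited
async def other(queue):
    await queue.put({'state': 'foo'})
    # stuff
    await asyncio.sleep(1)
    await queue.put({'state': 'bar'})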

I think the resource queue should only be created if someone is asking for the resource stream. So, when a resource is created, modified, etc., we check whether a queue for the corresponding identity exists and, if it does, put an event on it. There may need to be multiple queues per identity (multiple listeners for each identity). So when stream is called for an identity, a new queue is created and added to the list of queues associated with that identity.

The data would look something like:

self.resource_queues = {
  'foobar': [queue1, queue2],
  None: [queue3]  # None represents the global/public identity
}
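
A minimal sketch of the design described above, assuming plain asyncio.Queue objects. The ResourceStreams class and its stream and publish methods are hypothetical names for illustration, not part of quest_framework. Each listener gets its own queue, created only once it starts consuming the stream and removed when it stops:

```python
import asyncio
from collections import defaultdict


class ResourceStreams:
    """Hypothetical helper: fans resource updates out to per-identity listener queues."""

    def __init__(self):
        # identity -> list of listener queues; None is the global/public identity
        self.resource_queues = defaultdict(list)

    async def stream(self, identity):
        # The queue is created on first iteration, so it exists only while
        # someone is actually listening.
        queue = asyncio.Queue()
        self.resource_queues[identity].append(queue)
        try:
            while True:
                yield await queue.get()
        finally:
            # Runs when the generator is closed; drop the queue and any empty entry
            self.resource_queues[identity].remove(queue)
            if not self.resource_queues[identity]:
                del self.resource_queues[identity]

    async def publish(self, identity, resources):
        # dict.get does not create an entry, so identities without listeners are skipped
        for queue in list(self.resource_queues.get(identity, ())):
            await queue.put(resources)


async def demo():
    streams = ResourceStreams()
    received = []

    async def listen():
        stream = streams.stream('foobar')
        try:
            async for resources in stream:
                received.append(resources)
                if len(received) == 2:
                    break
        finally:
            # Close explicitly so the queue is removed right away
            await stream.aclose()

    listener = asyncio.create_task(listen())
    await asyncio.sleep(0)  # let the listener register its queue first
    await streams.publish('foobar', {'state': 'foo'})
    await streams.publish('someone-else', {'state': 'ignored'})  # no listener
    await streams.publish('foobar', {'state': 'bar'})
    await listener
    return received, dict(streams.resource_queues)


if __name__ == '__main__':
    print(asyncio.run(demo()))
```

One consequence of creating queues lazily: an update published before a listener starts iterating is not delivered to that listener. If that matters, the stream could yield the current resources once before waiting on the queue.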