epics-base / p4p

Python bindings for the PVAccess network client and server.
BSD 3-Clause "New" or "Revised" License

Allow asynchronous iteration of `Subscription` #127

Closed jembishop closed 7 months ago

jembishop commented 1 year ago

It would be more idiomatic to implement async iteration for the subscription object when the client is async. This would allow code such as:

async def set(ctx: Context, pv: str, readback: str, setpoint: float):
    # Set a PV value, then wait until the readback matches the setpoint to within a tolerance
    await ctx.put(pv, setpoint)
    with ctx.monitor(readback) as sub:
        async for new_value in sub:
            if abs(new_value - setpoint) < 1e-3:
                return

I can contribute this if you are happy with that.

mdavidsaver commented 1 year ago

I agree that async iteration would be a useful convenience. I have thought a little about something like this, but I am left uncertain about how to handle the edge cases, e.g. when the first non-error subscription update should be handled specially. And of course there is the annoying practical question of communicating Disconnected, which can happen multiple times during the life of a subscription.

E.g. in your example: what if the readback is initially disconnected? What if it becomes disconnected before "settling"?

> I can contribute this if you are happy with that.

I would be happy to see a more concrete example of what you have in mind.

jembishop commented 1 year ago

Thanks for your response. I would imagine that, to communicate the disconnect, you would include these events in the async stream. E.g. my example, revised, would be:

async def set(ctx: Context, pv: str, readback: str, setpoint: float):
    # Set a PV value, then wait until the readback matches the setpoint to within a tolerance
    await ctx.put(pv, setpoint)
    with ctx.monitor(readback) as sub:
        async for result in sub:
            if isinstance(result, p4p.Value):
                if abs(result - setpoint) < 1e-3:
                    return
            elif isinstance(result, p4p.Disconnected):
                print("It's disconnected")
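
To make the idea above concrete without a live PV, here is a minimal self-contained sketch. `Disconnected`, `fake_monitor` and `wait_settled` are names invented for illustration and are not p4p API; the stream simply yields either floats or `Disconnected` instances, as proposed.

```python
import asyncio


class Disconnected(Exception):
    """Stand-in for a disconnect event (hypothetical, not the p4p class)."""


async def fake_monitor(events):
    # Hypothetical stand-in for an async-iterable subscription:
    # yields plain floats and Disconnected instances in order.
    for event in events:
        await asyncio.sleep(0)  # give other tasks a chance to run
        yield event


async def wait_settled(stream, setpoint, tolerance=1e-3):
    """Return the number of disconnects seen before the readback settles."""
    disconnects = 0
    async for result in stream:
        if isinstance(result, Disconnected):
            disconnects += 1  # keep waiting; the PV may reconnect
        elif abs(result - setpoint) < tolerance:
            return disconnects
    raise RuntimeError("stream ended before the readback settled")


events = [Disconnected(), 0.5, Disconnected(), 0.9995, 1.0]
seen = asyncio.run(wait_settled(fake_monitor(events), setpoint=1.0))
print(seen)
```

In this sketch a disconnect is just another item in the stream, so the caller decides whether to keep waiting, give up, or wrap the loop in a timeout.
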

I'm not sure exactly what you are referring to with your other point about handling the first update specially, but the first event can be extracted separately using this interface with:


    await anext(sub)

rosstitmarsh commented 1 year ago

A surprisingly minimal working example.

import asyncio

from asyncio import Queue, get_running_loop
from functools import partial

from p4p.client.asyncio import Context as AsyncioContext, Subscription

class Context(AsyncioContext):
    def monitor_async(
        self, name, request=None, notify_disconnect=False
    ) -> "AsyncSubscription":
        sub = AsyncSubscription(name, notify_disconnect=notify_disconnect)
        cb = partial(get_running_loop().call_soon_threadsafe, sub._E.set)
        sub._S = super(AsyncioContext, self).monitor(name, cb, request)
        return sub

class AsyncSubscription(Subscription):
    def __init__(self, name, notify_disconnect=False):
        self._queue = Queue()

        async def cb(V):
            self._queue.put_nowait(V)

        super().__init__(name, cb, notify_disconnect=notify_disconnect)

    def __aiter__(self):
        return self

    async def __anext__(self):
        return await self._queue.get()

async def main():
    context = Context("pva")
    with context.monitor_async("ross:epm1:horizontal:position") as sub:
        async for value in sub:
            print(value)

if __name__ == "__main__":
    asyncio.run(main())

mdavidsaver commented 10 months ago

> A surprisingly minimal working example.

As ever, there is some trouble in the details. @rosstitmarsh, an added Queue needs to be bounded in size, with proper overflow behavior; otherwise a user application will be unbounded in memory and latency when it is unable to keep up (e.g. when the time to process each update exceeds the interval between updates).

A PVA and CA subscription queue conventionally will squash/discard the previous most recent update when at capacity. This bounds queue depth while ensuring that the most recent update remains in the queue when/if the client catches up. ("squash" means combining two deltas)
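
The squash-on-overflow idea can be sketched in a few lines of plain Python. This is only an illustration under assumptions, not how PVXS implements it: each update is modelled as a dict of changed fields, and `SquashingQueue` is a hypothetical name.

```python
from collections import deque


class SquashingQueue:
    """Bounded FIFO that merges into the newest entry when full (illustrative only)."""

    def __init__(self, maxsize):
        self._items = deque()
        self._maxsize = maxsize

    def push(self, delta):
        if len(self._items) >= self._maxsize:
            # At capacity: squash the new delta into the most recent entry,
            # so changed fields from both survive and newer values win.
            self._items[-1] = {**self._items[-1], **delta}
        else:
            self._items.append(dict(delta))

    def pop(self):
        # Return the oldest entry, or None when the queue is empty.
        return self._items.popleft() if self._items else None


q = SquashingQueue(maxsize=2)
q.push({"value": 1})
q.push({"value": 2})
q.push({"alarm": "MINOR"})  # queue full: merged into the newest entry
q.push({"value": 4})        # still full: merged again
drained = [q.pop(), q.pop(), q.pop()]
print(drained)
```

The queue depth never exceeds `maxsize`, and the newest entry always reflects the latest state once the consumer catches up.
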

This overflow behavior is provided by the queue in the underlying PVXS Subscription object. So I think the best path forward is to avoid adding another queue and to make use of the p4p.client.raw API directly. In the raw API, the subscription callback function has a different meaning: it is called when the internal subscription queue becomes not-empty.

I hope the following pseudo-code will give some idea of what I am describing:


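import asyncio
from asyncio import get_running_loop
from functools import partial

from . import raw

class Subscription:
    _sub: raw.Subscription
    _notempty: asyncio.Event

    def __init__(self, name, notify_disconnect=False):
        self.name = name
        self._notify_disconnect = notify_disconnect
        self._sub = None  # filled in by Context.monitor_async()
        self._notempty = asyncio.Event()

    def __aiter__(self):
        return self

    async def __anext__(self):
        while True:
            # First pop() from the Subscription queue.  No not-empty callback will be (re)delivered until a None is "seen"
            E = self._sub.pop()
            if E is not None:
                return E
            # Queue drained: wait for the next not-empty callback, then re-arm the event
            await self._notempty.wait() # assumes only one consumer per Subscription
            self._notempty.clear()

class Context(raw.Context):
    def monitor_async(self, name, request=None, notify_disconnect=False):
        R = Subscription(name, notify_disconnect=notify_disconnect)
        # The raw callback only signals "queue not empty"; hand it over to the event loop thread
        cb = partial(get_running_loop().call_soon_threadsafe, R._notempty.set)

        # calling raw.Context.monitor()
        R._sub = super(Context, self).monitor(name, cb, request)
        return R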
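
For experimenting with this wake-up pattern without p4p installed, here is a small runnable simulation. `FakeRawSubscription` is a hypothetical stand-in that only mimics the "callback when the queue becomes not-empty" behavior described above; it makes no claim about the real raw API.

```python
import asyncio
import threading
from collections import deque


class FakeRawSubscription:
    """Hypothetical stand-in: a thread-safe queue whose callback fires
    when the queue goes from empty to not-empty."""

    def __init__(self, on_not_empty):
        self._items = deque()
        self._lock = threading.Lock()
        self._on_not_empty = on_not_empty

    def push(self, item):  # called from a producer thread
        with self._lock:
            was_empty = not self._items
            self._items.append(item)
        if was_empty:
            self._on_not_empty()

    def pop(self):
        # Return the oldest item, or None when the queue is empty.
        with self._lock:
            return self._items.popleft() if self._items else None


async def consume(count):
    loop = asyncio.get_running_loop()
    not_empty = asyncio.Event()
    # The callback runs on the producer thread, so hop to the loop thread.
    sub = FakeRawSubscription(lambda: loop.call_soon_threadsafe(not_empty.set))

    def produce():
        for i in range(count):
            sub.push(i)

    producer = threading.Thread(target=produce)
    producer.start()

    received = []
    while len(received) < count:
        item = sub.pop()
        if item is None:
            await not_empty.wait()  # single consumer assumed
            not_empty.clear()       # re-arm before polling again
        else:
            received.append(item)
    producer.join()
    return received


received = asyncio.run(consume(5))
print(received)
```

Because the consumer always calls `pop()` again after clearing the event, a notification that arrives between the wait and the clear cannot be lost in this sketch.
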

jembishop commented 10 months ago

@mdavidsaver This is possible with, say, Queue(maxsize=1024), no? Then use put instead of put_nowait. It should block the coroutine until the queue has a free slot.
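
A short sketch of what a bounded `asyncio.Queue` does at capacity, using only the standard library (nothing here is p4p-specific): `await queue.put(...)` suspends the calling coroutine while the queue is full, whereas `put_nowait` raises `asyncio.QueueFull`.

```python
import asyncio


async def demo():
    queue = asyncio.Queue(maxsize=2)
    log = []

    async def producer():
        for i in range(4):
            await queue.put(i)  # suspends while the queue is full
            log.append(f"put {i}")

    task = asyncio.create_task(producer())
    await asyncio.sleep(0.01)   # let the producer run until it blocks
    blocked_after = list(log)   # only two puts have completed so far

    try:
        queue.put_nowait(99)    # the non-blocking variant raises instead
        raised = False
    except asyncio.QueueFull:
        raised = True

    received = [await queue.get() for _ in range(4)]
    await task
    return blocked_after, raised, received


blocked_after, raised, received = asyncio.run(demo())
print(blocked_after, raised, received)
```

Note that this applies backpressure to whichever coroutine calls `put()`; it does not by itself discard or combine older updates.
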