goodboy / tractor

A distributed, structured concurrent runtime for Python (and friends)
GNU Affero General Public License v3.0
258 stars 12 forks source link

Type annotating `Context.open_stream()` (probably with `msgspec.Struct` subtype(s)) :sunglasses: #365

Open goodboy opened 11 months ago

goodboy commented 11 months ago

After having seen this style in anyio's latest release:

https://github.com/agronholm/anyio/pull/599/files#diff-1fb03ea452beca16c76e364dc7f8ffe3dfcbb20be7e732cc5a552cba32fd771aR104

which it turns out, trio also already supports this :boom: (well via trio_typing), https://github.com/python-trio/trio/blob/348713a7ae73c1afaef79969c90c57e12f4f098b/trio/_channel.py#L93

ANNDDD apparently since it's being proposed more generally for typing functions! https://peps.python.org/pep-0718/

I think doing the same (at least) for our MsgStreams would be super nice (and) as it pertains to eventual plans for static msg typing in #36 and #196 :surfer:!


proto API

typed msging API would probably be something like:

# theoretical API exposing `msgspec.Struct` subtype B)
from tractor.msg import Msg

# only this msg is allowed to be sent from `Context.started()`
class StartupMsg(Msg):  # a `msgspec.Struct` subtype
    allowed_startup_attr: str

# only this msg is allowed to be sent from `Context.send()`
class StreamMsg(Msg):  # a `msgspec.Struct` subtype
    key: str
    value: float

@tractor.context[StartupMsg]  # is this useful?
async def stream_ctx_er_sumthin(

    ctx: tractor.Context[StartupMsg],  # which is better (see above)?

) -> None:
    await ctx.started(StartupMsg(allowed_startup_attr='yeye'))

    async with ctx.open_stream[StreamMsg]() as stream:
        await stream.send(StreamMsg(key='yo', value=69))

        # should error immediately here or on rx end?
        await stream.send(dict(key='yo', value=69))