beanlab / quest_framework

A Python framework for fault-tolerant workflows
0 stars 0 forks source link

Async input #20

Open ABCambridge opened 4 months ago

ABCambridge commented 4 months ago

Another small task to chew on: I want an async version of input (call it ainput). Can it be done? This would be used for creating console UIs for testing quest applications.

byubean commented 4 months ago

https://stackoverflow.com/questions/58454190/python-async-waiting-for-stdin-input-while-doing-other-stuff

Avoid threads.

byubean commented 4 months ago

asyncio.StreamReader

byubean commented 4 months ago
async def _read_stream(stream: asyncio.StreamReader, timeout: float, output_limit: int):
    """
    Reads the stream until the end of the current content
    Stops waiting for content after `timeout` seconds
    Returns decoded content (i.e. str not bytes)
    """
    buffer = []

    while True:
        try:
            token = await asyncio.wait_for(stream.read(1), timeout)
            if not token:
                # stream.read() returns an empty byte when EOF is reached
                break
            token = token.decode()
            buffer.append(token)
            if len(buffer) > output_limit:
                break

        except asyncio.TimeoutError:
            # No bytes have been written for at least `timeout` seconds
            break

    return ''.join(buffer)