agronholm / anyio

High level asynchronous concurrency and networking framework that works on top of either trio or asyncio
MIT License

Fixed `repr` of `MemoryObjectItemReceiver` #767

Closed: Danipulok closed this 1 month ago

Danipulok commented 1 month ago

Changes

Fixes an error that sometimes occurred when calling `print`, `str`, or `repr` on a `MemoryObjectItemReceiver`: `AttributeError: 'MemoryObjectItemReceiver' object has no attribute 'item'`

Here's the full traceback I receive:

Traceback (most recent call last):
  File "D:\project\test.py", line 305, in stream
    print("Tool tool_stream_sender", str(tool_stream_sender))
                                     ^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Program Files\Python311\Lib\dataclasses.py", line 240, in wrapper
    result = user_function(self)
             ^^^^^^^^^^^^^^^^^^^
  File "<string>", line 3, in __repr__
  File "C:\Program Files\Python311\Lib\dataclasses.py", line 240, in wrapper
    result = user_function(self)
             ^^^^^^^^^^^^^^^^^^^
  File "<string>", line 3, in __repr__
  File "C:\Program Files\Python311\Lib\dataclasses.py", line 240, in wrapper
    result = user_function(self)
             ^^^^^^^^^^^^^^^^^^^
  File "<string>", line 3, in __repr__
AttributeError: 'MemoryObjectItemReceiver' object has no attribute 'item'

During handling of the above exception, another exception occurred:

  + Exception Group Traceback (most recent call last):
  |   File "D:\project\mre.py", line 303, in <module>
  |     asyncio.run(main())
  |   File "C:\Program Files\Python311\Lib\asyncio\runners.py", line 190, in run
  |     return runner.run(main)
  |            ^^^^^^^^^^^^^^^^
  |   File "C:\Program Files\Python311\Lib\asyncio\runners.py", line 118, in run
  |     return self._loop.run_until_complete(task)
  |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  |   File "C:\Program Files\Python311\Lib\asyncio\base_events.py", line 654, in run_until_complete
  |     return future.result()
  |            ^^^^^^^^^^^^^^^
  |   File "D:\project\mre.py", line 296, in main
  |     await check()  # ok
  |     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  |   File "D:\project\mre.py", line 281, in check_agent
  |     async for index, chunk in asyncstdlib.enumerate(agent.stream(payload)):
  |   File "D:\project\.venv\Lib\site-packages\asyncstdlib\builtins.py", line 359, in enumerate
  |     async for item in item_iter:
  |   File "D:\project\test.py", line 294, in stream
  |     async with create_task_group() as tg:
  |   File "D:\project\.venv\Lib\site-packages\anyio\_backends\_asyncio.py", line 680, in __aexit__
  |     raise BaseExceptionGroup(
  | ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
  +-+---------------- 1 ----------------
    | Traceback (most recent call last):
    |   File "D:\project\test.py", line 305, in stream
    |     print("Tool tool_stream_sender", str(tool_stream_sender))
    |                                      ^^^^^^^^^^^^^^^^^^^^^^^
    |   File "C:\Program Files\Python311\Lib\dataclasses.py", line 240, in wrapper
    |     result = user_function(self)
    |              ^^^^^^^^^^^^^^^^^^^
    |   File "<string>", line 3, in __repr__
    |   File "C:\Program Files\Python311\Lib\dataclasses.py", line 240, in wrapper
    |     result = user_function(self)
    |              ^^^^^^^^^^^^^^^^^^^
    |   File "<string>", line 3, in __repr__
    |   File "C:\Program Files\Python311\Lib\dataclasses.py", line 240, in wrapper
    |     result = user_function(self)
    |              ^^^^^^^^^^^^^^^^^^^
    |   File "<string>", line 3, in __repr__
    | AttributeError: 'MemoryObjectItemReceiver' object has no attribute 'item'
    +------------------------------------
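
To illustrate the failure mode, here is a minimal sketch that does not use anyio at all. It assumes the receiver is a dataclass whose `item` field is declared with `field(init=False)` and no default, so the attribute does not exist until something assigns it. The three stacked `__repr__` frames in the traceback suggest that the `str()` call on the send stream passes through nested dataclass reprs before reaching the receiver. All class names below (`BrokenReceiver`, `SafeReceiver`, `StreamState`) and the `describe` helper are hypothetical stand-ins, not the library's actual classes.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass(eq=False)
class BrokenReceiver:
    # Declared as a field but excluded from __init__ and given no default,
    # so the attribute does not exist until something assigns it.
    item: Any = field(init=False)


@dataclass(eq=False)
class SafeReceiver:
    item: Any = field(init=False)

    # A user-defined __repr__ is kept by @dataclass instead of being generated.
    def __repr__(self) -> str:
        # Fall back to None while no item has been assigned yet.
        item = getattr(self, "item", None)
        return f"{self.__class__.__name__}(item={item!r})"


@dataclass(eq=False)
class StreamState:
    # Hypothetical container: its generated repr calls repr() on each receiver.
    waiting_receivers: list[Any] = field(default_factory=list)


def describe(receiver: Any) -> str:
    """Return the repr of a state holding the receiver, or the error text."""
    try:
        return repr(StreamState(waiting_receivers=[receiver]))
    except AttributeError as exc:
        return f"AttributeError: {exc}"


if __name__ == "__main__":
    print(describe(BrokenReceiver()))  # the nested repr raises AttributeError
    print(describe(SafeReceiver()))    # the nested repr succeeds
```

The `getattr` fallback is only one possible way to make such a repr safe; giving the field a default value would be another.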

Unfortunately, I cannot provide a full MRE or the complete code, since part of it is production code. Here are the key parts (IMO):

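import asyncio
import math
from typing import Any

import asyncstdlib
from anyio import create_memory_object_stream, create_task_group
from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream

def get_tool_streams(
    tool_streams: dict[str, tuple[MemoryObjectSendStream[Any], MemoryObjectReceiveStream[Any]]],
    tool_name: str,
) -> tuple[MemoryObjectSendStream[Any], MemoryObjectReceiveStream[Any]]:
    # Lazily create one unbounded (send, receive) stream pair per tool name
    if tool_name not in tool_streams:
        tool_streams[tool_name] = create_memory_object_stream(
            max_buffer_size=math.inf,
        )
    return tool_streams[tool_name]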

class Agent:
    async def stream(self):
        async with create_task_group() as tg:
            async for chunk in generic_llm_output_stream:
                    tool_stream_sender, tool_stream_receiver = get_tool_streams(_tool_streams, tool_call.name)
                    print("Tool tool_stream_sender", str(tool_stream_sender))  # AttributeError
                    print("Tool tool_stream_sender", repr(tool_stream_sender))  # AttributeError
                    if tool_call.name not in _started_tools:
                        tg.start_soon(stream_to_task, tool, tool_stream_receiver)
                    tool_stream_sender.send_nowait(tool_call.input)

async def check_agent():
    payload = get_agent_input()
    async for index, chunk in asyncstdlib.enumerate(agent.stream(payload)):
        results_generic.append(chunk)

async def main() -> None:
    await check_agent()

if __name__ == "__main__":
    asyncio.run(main())

Checklist

If this is a user-facing code change, like a bugfix or a new feature, please ensure that you've fulfilled the following conditions (where applicable):

Danipulok commented 1 month ago

I checked locally that, with the changes from this PR, the code no longer fails on the print calls.

Danipulok commented 1 month ago

@agronholm I added the test and checked that it works properly.

Danipulok commented 1 month ago

Thanks to you too! I hope I've helped at least a little bit.