traceloop / openllmetry

Open-source observability for your LLM application, based on OpenTelemetry
https://www.traceloop.com/openllmetry
Apache License 2.0
3.5k stars 673 forks source link

πŸ› Bug Report: Support AsyncGenerators in aworkflow #2290

Closed eliasecchig closed 2 weeks ago

eliasecchig commented 2 weeks ago

Which component is this bug for?

Traceloop SDK

πŸ“œ Description

Current aentity method

The current implementation of aentity_method decorator incorrectly handles async generators by attempting to await them through _ahandle_generator, resulting in the error:

TypeError: object async_generator can't be used in 'await' expression

This prevents the decorator from being used with async generator functions, particularly in contexts like FastAPI endpoints where asyncio.run() cannot be used.

πŸ‘Ÿ Reproduction steps

Run the following sample code:

from traceloop.sdk.decorators import workflow, aworkflow, base 
from traceloop.sdk import Instruments, Traceloop
from app.utils.tracing import CloudTraceLoggingSpanExporter
import asyncio

Traceloop.init(
    app_name="Sample",
    disable_batch=False,
    exporter=CloudTraceLoggingSpanExporter(),
    instruments={Instruments.VERTEXAI, Instruments.LANGCHAIN},
)
@aworkflow()
async def stream_numbers():
    for i in range(3):
        yield i
        await asyncio.sleep(0.1)

async def stream_numbers_no_decorator():
    for i in range(3):
        yield i
        await asyncio.sleep(0.1)

# Separate cells for testing each function
# Cell 1 - This should work:
async for num in stream_numbers_no_decorator():
    print(num)

# Cell 2 - This should fail with the TypeError: object async_generator can't be used in 'await' expression
async for num in await stream_numbers():
    print(num)

πŸ‘ Expected behavior

Expected Behavior

The decorator should preserve the async generator and wrap it to handle span lifecycle, allowing for proper streaming of results:

if isinstance(res, types.AsyncGeneratorType):
    async def wrapped_generator():
        try:
            async for item in res:
                yield item
        finally:
            span.end()
            context_api.detach(ctx_token)
    return wrapped_generator()

See this example for full implementation

πŸ‘Ž Actual Behavior with Screenshots

# Current implementation
if isinstance(res, types.AsyncGeneratorType):
    return await _ahandle_generator(span, ctx_token, res)

The decorator attempts to await an async generator, which is not valid Python behavior. I get: TypeError: object async_generator can't be used in 'await' expression

πŸ€– Python Version

No response

πŸ“ƒ Provide any additional context for the Bug.

By overriding the aentity_method I managed to solve all errors and have all the spans properly flowing. See this link

πŸ‘€ Have you spent some time to check if this bug has been raised before?

Are you willing to submit PR?

None

dosubot[bot] commented 2 weeks ago

The issue you're encountering with the aentity_method decorator in the Traceloop SDK is due to the incorrect handling of async generators. The current implementation attempts to await an async generator, which is not valid in Python. To fix this, you can modify the handling of async generators to wrap them in a function that manages the span lifecycle correctly.

Here's a suggested modification based on your expected behavior:

if isinstance(res, types.AsyncGeneratorType):
    async def wrapped_generator():
        try:
            async for item in res:
                yield item
        finally:
            span.end()
            context_api.detach(ctx_token)
    return wrapped_generator()

This approach ensures that the async generator is preserved and the span lifecycle is properly managed, allowing for the correct streaming of results. This modification should resolve the TypeError and allow the decorator to be used with async generator functions, such as in FastAPI endpoints.

To continue talking to Dosu, mention @dosu.

Charlesnorris509 commented 2 weeks ago

The issue with the aentity_method decorator in the Traceloop SDK arises because the decorator currently tries to handle async generators by incorrectly attempting to await them, which is not allowed in Python. Async generators, which are functions that yield results asynchronously, cannot be awaited directlyβ€”they must be iterated over using async for.

Potential Solution To address this, the decorator should:

Detect if the function result (res) is an async generator. If it is, wrap the generator in another async function that: Manages the span lifecycle. Iterates over the items in the async generator using async for. Ensure that the span ends and context detaches once the generator has finished producing items.

@dosubot @eliasecchig. Assign me with the issue so I can create pull request and rectify the issue

eliasecchig commented 2 weeks ago

Don't think I have enough rights to assign the issue sorry!