genai-impact / ecologits

🌱 EcoLogits tracks the energy consumption and environmental footprint of using generative AI models through APIs.
https://ecologits.ai/
Mozilla Public License 2.0

Support for stream and async functions #16

Closed samuelrince closed 2 months ago

samuelrince commented 3 months ago

Description

We currently have no support for the "advanced" chat completion functions, i.e. async and/or streaming.

Solution

Maybe we can look at what openllmetry does (again). 😄

Additional context

Examples for OpenAI SDK:

Streaming:

from openai import OpenAI

client = OpenAI()

stream = client.chat.completions.create(
    model="gpt-3.5-turbo",
    messages=[{"role": "user", "content": "Say this is a test"}],
    stream=True,
)
for chunk in stream:
    print(chunk.choices[0].delta.content or "", end="")

Async:

import os
import asyncio
from openai import AsyncOpenAI

client = AsyncOpenAI(
    # This is the default and can be omitted
    api_key=os.environ.get("OPENAI_API_KEY"),
)

async def main() -> None:
    chat_completion = await client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": "Say this is a test"}]
    )

asyncio.run(main())

Async + Streaming:

import asyncio
from openai import AsyncOpenAI

client = AsyncOpenAI()

async def main():
    stream = await client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": "Say this is a test"}],
        stream=True,
    )
    async for chunk in stream:
        print(chunk.choices[0].delta.content or "", end="")

asyncio.run(main())

adrienbanse commented 3 months ago

I can have a look at that 👍

LucBERTON commented 3 months ago

I also started working on async clients. My work is on branch feat/async-tracers

I added tracers for AsyncOpenAI, AsyncAnthropic and MistralAsyncClient

I managed to test Anthropic properly, but I have no free tier for OpenAI or Mistral AI.

It would be nice if someone could test OpenAI and Mistral AI to confirm that their tracers are functional and record a VCR cassette for the unit tests.

Thanks

samuelrince commented 3 months ago

Thanks @LucBERTON! I've had a quick look and fixed one thing for Mistral, and it now works. You can run it on your system; I have recorded the tests.

Quick feedback: I think we can merge the sync and async tracers for each provider. It would be better to have one module per provider named _tracer.py (and the same applies to instrumentors: one per provider that wraps both sync and async functions).
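
As a rough illustration of "one wrapper for both sync and async functions", here is a minimal sketch. It does not reflect the actual tracer code: `trace`, `compute_impacts`, `fake_create` and `fake_acreate` are hypothetical names, and the responses are plain dicts standing in for SDK objects.

```python
import asyncio
import functools
import inspect


def compute_impacts(response):
    # Hypothetical stand-in for the real impact computation:
    # it only echoes the output token count.
    return {"output_tokens": response["usage"]["output_tokens"]}


def trace(func):
    """Wrap a sync or async callable so its response carries an "impacts" entry."""
    if inspect.iscoroutinefunction(func):
        @functools.wraps(func)
        async def async_wrapper(*args, **kwargs):
            response = await func(*args, **kwargs)
            response["impacts"] = compute_impacts(response)
            return response
        return async_wrapper

    @functools.wraps(func)
    def sync_wrapper(*args, **kwargs):
        response = func(*args, **kwargs)
        response["impacts"] = compute_impacts(response)
        return response
    return sync_wrapper


# Fake provider calls (assumptions), used instead of a real SDK client.
def fake_create(**kwargs):
    return {"usage": {"output_tokens": 12}}


async def fake_acreate(**kwargs):
    return {"usage": {"output_tokens": 7}}


sync_response = trace(fake_create)(model="demo")
async_response = asyncio.run(trace(fake_acreate)(model="demo"))
```

With this shape, a single provider module can expose one decorator and apply it to both the sync and the async client methods.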

samuelrince commented 3 months ago

Reopening for stream functions.

adrienbanse commented 3 months ago

Thanks @LucBERTON. I had very little time to check the streams, but it seems that, in openllmetry, they sum the value for each message. All tests for streams contain something like

for _ in stream:
    pass

before actually printing the metrics.

I'll dig deeper soon

LucBERTON commented 3 months ago

I worked a bit on streams with the Anthropic SDK.

I tried replicating for client.messages.stream the same behavior that we have for client.messages.create, but the response object that we pass to the compute_impacts_and_return_response function is completely different for streams (see below).

My commit is on branch feat/stream-tracers if anyone wants to work on it.

Maybe we can plan a pair programming session if anyone is available (ping @samuelrince @adrienbanse)

Here is the response object with messages.create:

{'id': 'msg_01DgCLXUnK27P6bYAZspcFqy', 'content': [ContentBlock(text="Hello! It's nice to meet you.", type='text')], 'model': 'claude-3-haiku-20240307', 'role': 'assistant', 'stop_reason': 'end_turn', 'stop_sequence': None, 'type': 'message', 'usage': Usage(input_tokens=10, output_tokens=12)}

Here is the response object with messages.stream:

{'_MessageStreamManager__stream': None, '_MessageStreamManager__api_request': functools.partial(<bound method SyncAPIClient.post of <anthropic.Anthropic object at 0x7f13e7ee8220>>, '/v1/messages', body={'max_tokens': 1024, 'messages': [{'role': 'user', 'content': 'Hello'}], 'model': 'claude-3-haiku-20240307', 'metadata': NOT_GIVEN, 'stop_sequences': NOT_GIVEN, 'system': NOT_GIVEN, 'temperature': NOT_GIVEN, 'top_k': NOT_GIVEN, 'top_p': NOT_GIVEN, 'stream': True}, options={'headers': {'X-Stainless-Stream-Helper': 'messages', 'X-Stainless-Custom-Event-Handler': 'false'}}, cast_to=<class 'anthropic.types.message.Message'>, stream=True, stream_cls=<class 'anthropic.lib.streaming._messages.MessageStream'>)}

adrienbanse commented 3 months ago

Yes let's organize a session together @LucBERTON @samuelrince

One idea for a solution: we wrap the iterator, and each time it advances, the impacts of the MessageStreamManager are updated.

Then

c = 0
for _ in stream:
    if c > 2:
        break
    c += 1
print(stream.impacts)

wouldn't output the same as

for _ in stream: 
    pass
print(stream.impacts)

if the stream yields more chunks than the first loop consumes before breaking.

I'll work on implementing this. I think it's cleaner than what they do in openllmetry (where they just compute it at the end).
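
A minimal sketch of the wrapped-iterator idea, to make the difference between the two loops concrete. Everything here is an assumption for illustration: `TracedStream`, `Impacts` and `ENERGY_PER_CHUNK` are hypothetical names, the chunks are plain strings, and the per-chunk value is made up.

```python
from dataclasses import dataclass
from typing import Iterable, Iterator

ENERGY_PER_CHUNK = 0.5  # made-up constant, for illustration only


@dataclass
class Impacts:
    # Hypothetical container for the running totals.
    chunks: int = 0
    energy: float = 0.0


class TracedStream:
    """Wrap an iterable and increment `impacts` each time a chunk is pulled."""

    def __init__(self, stream: Iterable[str]) -> None:
        self._iterator = iter(stream)
        self.impacts = Impacts()

    def __iter__(self) -> Iterator[str]:
        return self

    def __next__(self) -> str:
        chunk = next(self._iterator)  # StopIteration propagates at the end
        self.impacts.chunks += 1
        self.impacts.energy += ENERGY_PER_CHUNK
        return chunk


# Early exit: the loop breaks on the fourth chunk, which has already been pulled.
partial = TracedStream(["a", "b", "c", "d", "e"])
c = 0
for _ in partial:
    if c > 2:
        break
    c += 1

# Full consumption of the same five chunks.
full = TracedStream(["a", "b", "c", "d", "e"])
for _ in full:
    pass

print(partial.impacts, full.impacts)
```

In this sketch the early-exit stream reports four chunks and the fully consumed one reports five, so the impacts reflect only what was actually pulled from the stream.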

samuelrince commented 3 months ago

If I understand correctly, you are proposing a cumulative update of the impacts, @adrienbanse? I like that idea! For each chunk, we compute the impacts given the current chunk and all the previous ones.

adrienbanse commented 3 months ago

@samuelrince Exactly, we just "increment" the impacts for each chunk

samuelrince commented 3 months ago

@adrienbanse @LucBERTON I've just pushed an implementation of the stream and async stream for OpenAI. It was not obvious, to say the least, but it now works. I'll let you see whether you can reproduce it for Anthropic and Mistral.

PS: I am not that familiar with async Python and I discovered that there are some weird behaviors with async generators... There is a lot of duplication, but we cannot work around that AFAIK; it's due to Python limitations / async design.
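
To illustrate the kind of duplication meant here (a sketch, not the pushed implementation): `async for` is only valid inside an `async def`, so a sync stream wrapper and an async one end up as two nearly identical generators. `count_tokens`, `trace_stream`, `trace_async_stream` and `fake_async_stream` are hypothetical names, and chunks are plain strings.

```python
import asyncio


def count_tokens(chunk: str) -> int:
    # Hypothetical stand-in: counts whitespace-separated words.
    return len(chunk.split())


def trace_stream(stream, impacts):
    """Sync generator: update `impacts` for each chunk, then pass it on."""
    for chunk in stream:
        impacts["tokens"] += count_tokens(chunk)
        yield chunk


async def trace_async_stream(stream, impacts):
    """Async generator: same body, but it must use `async for`."""
    async for chunk in stream:
        impacts["tokens"] += count_tokens(chunk)
        yield chunk


async def fake_async_stream():
    # Fake async source (assumption) standing in for an SDK stream.
    for chunk in ["Say this", "is a", "test"]:
        await asyncio.sleep(0)
        yield chunk


sync_impacts = {"tokens": 0}
sync_chunks = list(trace_stream(["Say this", "is a", "test"], sync_impacts))


async def consume():
    impacts = {"tokens": 0}
    chunks = [c async for c in trace_async_stream(fake_async_stream(), impacts)]
    return chunks, impacts


async_chunks, async_impacts = asyncio.run(consume())
```

The shared per-chunk logic can be factored into a helper such as `count_tokens`, but the two loops themselves stay separate.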

adrienbanse commented 2 months ago

Solved by https://github.com/dataforgoodfr/12_genai_impact/pull/28