Chainlit / literalai-python

https://docs.getliteral.ai/
Apache License 2.0

fix(event-processor): flush doesn't wait for the internal batch to be empty #70

Dam-Buty closed 5 months ago

Dam-Buty commented 5 months ago

In the event processor's main loop, events are popped from the queue into a local batch variable. However, the flush method only checks the queue before returning, not the batch variable (which isn't in its scope anyway). The failure was highly timing-dependent and could leave up to 4 (batch_size - 1) events forgotten.

The fix changes batch from a local variable to an instance attribute, and makes the flush method check that attribute as well; a sketch of the pattern follows.
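A minimal sketch of the pattern, assuming a simplified event processor (the class name, methods, and timings here are illustrative, not the SDK's actual internals):

```
import queue
import threading
import time

class EventProcessor:
    def __init__(self, batch_size=5):
        self.batch_size = batch_size
        self.event_queue = queue.Queue()
        # The fix: the batch is an instance attribute, so flush() can see it.
        self.batch = []
        self.worker = threading.Thread(target=self._process_events, daemon=True)
        self.worker.start()

    def add_event(self, event):
        self.event_queue.put(event)

    def _process_events(self):
        while True:
            # Block briefly for the first event, then drain up to batch_size.
            try:
                self.batch.append(self.event_queue.get(timeout=0.5))
            except queue.Empty:
                continue
            while len(self.batch) < self.batch_size:
                try:
                    self.batch.append(self.event_queue.get_nowait())
                except queue.Empty:
                    break
            self._send(self.batch)
            self.batch = []

    def _send(self, batch):
        pass  # network call in the real SDK

    def flush(self):
        # Wait for BOTH the pending queue and the in-flight batch to drain.
        # The buggy version only checked self.event_queue.empty().
        while not self.event_queue.empty() or self.batch:
            time.sleep(0.01)
```

With the batch visible to flush, the worst case shrinks from batch_size - 1 lost events to the brief hand-off between the queue and the batch, which a real implementation would also guard.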

linear[bot] commented 5 months ago
ENG-1323 🐛 Should be Run Step

## Description

Should be a Run step instead of an unknown one.

[image.png](https://uploads.linear.app/6a41a374-76e1-40c2-a405-51050761590c/8518d1f3-779b-490f-8b2e-a09237244b06/084a84d4-81a4-47db-a963-423268c56c02)

## Steps to reproduce

API keys

```
# from literalai import LiteralClient
# from dotenv import load_dotenv

# load_dotenv()

# literal_client = LiteralClient()

# with literal_client.thread(name="hello") as thread:
#     user_query = "Is this a healthy meal?"
#     user_image = "https://www.eatthis.com/wp-content/uploads/sites/4/2021/05/healthy-plate.jpg"
#     user_step = literal_client.message(content=user_query, type="user_message", name="User")

#     def encode_image(image_url):
#         import requests
#         response = requests.get(image_url)
#         return response.content

#     literal_client.api.create_attachment(
#         thread_id=thread.id,
#         step_id=user_step.id,
#         name="meal_image",
#         content=encode_image(user_image),
#     )

import os
from literalai import LiteralClient
from openai import OpenAI
import base64
import requests
import time
from dotenv import load_dotenv

load_dotenv()

openai_client = OpenAI()
literal_client = LiteralClient()
literal_client.instrument_openai()


@literal_client.step(type="run")
def generate_answer(user_query, image_url):
    completion = openai_client.chat.completions.create(
        model="gpt-4-vision-preview",
        messages=[
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": user_query},
                    {
                        "type": "image_url",
                        "image_url": {"url": image_url},
                    },
                ],
            },
        ],
        max_tokens=300,
    )
    literal_client.flush()
    time.sleep(1)
    literal_client.message(
        content=completion.choices[0].message.content,
        type="assistant_message",
        name="My Assistant",
    )


def encode_image(url):
    return base64.b64encode(requests.get(url).content)


def main():
    with literal_client.thread(name="Meal Analyzer") as thread:
        welcome_message = "Welcome to the meal analyzer, please upload an image of your plate!"
        literal_client.message(content=welcome_message, type="assistant_message", name="My Assistant")

        user_query = "Is this a healthy meal?"
        user_image = "https://www.eatthis.com/wp-content/uploads/sites/4/2021/05/healthy-plate.jpg"
        user_step = literal_client.message(content=user_query, type="user_message", name="User")

        literal_client.flush()
        time.sleep(1)

        literal_client.api.create_attachment(
            thread_id=thread.id,
            step_id=user_step.id,
            name="meal_image",
            content=encode_image(user_image),
        )
        literal_client.flush()
        time.sleep(1)

        answer = generate_answer(user_query, user_image)


main()

# Network requests by the SDK are performed asynchronously.
# Invoke flush_and_stop() to guarantee the completion of all requests prior to process termination.
# WARNING: If you run a continuous server, you should not use this method.
# literal_client.flush_and_stop()
```
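As a quick, illustrative check of the flush fix (again against the sketch above, not the SDK's test suite): enqueue fewer events than batch_size, which is exactly the window the PR describes, and verify that flush() does not return before they are sent.

```
sent = []

class RecordingProcessor(EventProcessor):
    def _send(self, batch):
        sent.extend(batch)

processor = RecordingProcessor(batch_size=5)
for i in range(4):  # batch_size - 1 events, previously at risk of being dropped
    processor.add_event(i)
processor.flush()
assert sent == [0, 1, 2, 3]
```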