cfahlgren1 / observers

A Lightweight Library for AI Observability
190 stars 20 forks

[FEAT] store info to `duckdb` and sync to other stores with efficient delay #12

Open davidberenstein1957 opened 1 week ago

davidberenstein1957 commented 1 week ago

Ideally we want to first store the info to duckdb locally and then sync it periodically to other stores to be more fault-tolerant and scalable.

cfahlgren1 commented 5 days ago

Would be nice if it could work something like this.

```python
from observers.observers.models.openai import wrap_openai
from observers.stores import DuckDBStore
from observers.syncs import DatasetsSync
from openai import OpenAI

store = DuckDBStore()
sync = DatasetsSync(repo_name="helloworld", store=store, private=True)

openai_client = OpenAI()
client = wrap_openai(openai_client, store=store)

response = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Tell me a joke."}],
)

print(response.choices[0].message.content)

sync.sync()  # manually sync since it's short-lived
```

However, something to think about: the CommitScheduler inside DatasetsSync uses its own scheduler, so we would have to run a separate scheduler that exports unsynced DuckDB data before the commit scheduler fires. Unless we manually sync along with the existing commit scheduler.
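To make the ordering concern concrete, here is a minimal sketch of a scheduler that runs an export step before each commit step. `SimpleScheduler`, `tick`, and the callback names are all illustrative stand-ins, not the real CommitScheduler API:

```python
class SimpleScheduler:
    """Illustrative stand-in for a scheduler that guarantees the DuckDB
    export runs before the hub commit on every tick."""

    def __init__(self, export_fn, commit_fn):
        self.export_fn = export_fn
        self.commit_fn = commit_fn

    def tick(self):
        # Export unsynced rows first, so the commit always pushes a
        # complete export rather than a partial one.
        self.export_fn()
        self.commit_fn()


calls = []
scheduler = SimpleScheduler(
    export_fn=lambda: calls.append("export"),
    commit_fn=lambda: calls.append("commit"),
)
scheduler.tick()
print(calls)  # export always precedes commit
```

In the real library this ordering would have to be enforced inside (or around) the CommitScheduler rather than in a hand-rolled class like this.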

cfahlgren1 commented 5 days ago

In that case I wonder if it's just better to have the DatasetStore as is and not introduce complexity? 🤔

davidberenstein1957 commented 5 days ago

@cfahlgren1 I agree. I wouldn't use another class to handle DatasetsSync. I think it would be great to store the info in DuckDB by default and, whenever we have another store, send batched versions to those stores. The current implementation is fine for Datasets, but Argilla is rather slow if you upload per record. Perhaps for now we can just create a basic ArgillaScheduler that queues records and sends them afterwards to deal with this. I think an atexit hook instead of sync.sync() might be nice?
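The atexit idea could look something like the sketch below. `DummySync` is a stand-in for whatever sync object the library ends up with; the only real API used here is the standard library's `atexit.register`:

```python
import atexit


class DummySync:
    """Stand-in for a hypothetical sync object; tracks whether sync ran."""

    def __init__(self):
        self.synced = False

    def sync(self):
        # In the real library this would flush unsynced DuckDB rows
        # to the remote store.
        self.synced = True


sync = DummySync()

# Register the sync as an exit hook: short-lived scripts then flush
# automatically on interpreter shutdown instead of calling sync.sync()
# by hand at the end of the script.
atexit.register(sync.sync)
```

One caveat with atexit is that hooks are skipped on hard kills (e.g. SIGKILL), so a periodic sync would still be needed for long-running processes.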

davidberenstein1957 commented 2 days ago
```python
from observers.stores import DatasetsStore, DuckdbStore, sync_stores

existing_store = DuckdbStore()
new_store = DatasetsStore()
# `from` is a reserved word in Python, so the kwargs would need other names
sync_stores(source=existing_store, destination=new_store)
```
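A hypothetical sketch of what such a `sync_stores` helper could do: copy over only the records the destination has not seen yet. `MemoryStore` and the record shape are illustrative stand-ins, not the library's real store API:

```python
class MemoryStore:
    """Illustrative in-memory store stand-in."""

    def __init__(self):
        self.records = []

    def add(self, record):
        self.records.append(record)


def sync_stores(source, destination):
    """Copy records from source whose ids are missing in destination."""
    seen = {r["id"] for r in destination.records}
    copied = 0
    for record in source.records:
        if record["id"] not in seen:
            destination.add(record)
            copied += 1
    return copied


existing_store = MemoryStore()
existing_store.add({"id": 1, "response": "..."})
existing_store.add({"id": 2, "response": "..."})

new_store = MemoryStore()
new_store.add({"id": 1, "response": "..."})  # already synced earlier

copied = sync_stores(existing_store, new_store)
```

A real implementation would likely track a synced flag or timestamp in DuckDB instead of comparing ids in memory.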
cfahlgren1 commented 2 days ago

Hmm, I was thinking that if DatasetsStore had the same path as DuckDBStore, it would use that DuckDB file automatically, which would achieve the same result (use that DuckDB database and sync to Datasets) without the need for sync_stores?
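The shared-path idea could be sketched like this. The constructors and the `path` parameter are assumptions for illustration, not the library's actual signatures:

```python
class DuckDBStore:
    """Illustrative stand-in: owns a DuckDB file at `path`."""

    def __init__(self, path="store.duckdb"):
        self.path = path


class DatasetsStore:
    """Illustrative stand-in: if given the same path, it reads the
    same DuckDB file and syncs its contents to the Hub dataset."""

    def __init__(self, path="store.duckdb"):
        self.path = path


local = DuckDBStore(path="observers.duckdb")
# Pointing DatasetsStore at the same file means records are written once,
# into DuckDB, and the Datasets sync picks them up from there.
hub = DatasetsStore(path=local.path)
```

This keeps a single source of truth (the DuckDB file) and avoids a separate sync_stores call, at the cost of coupling the two stores to one file layout.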

what do you think? @davidberenstein1957